Implement observer runtime world synthesis engine
This commit is contained in:
parent
72c5a53610
commit
8f5b905015
63
docs/observer-runtime.md
Normal file
63
docs/observer-runtime.md
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
# Observer Runtime
|
||||
|
||||
The Observer Runtime is a lightweight agent responsible for synthesizing the operational world state of the homelab from raw events, logs, and state files.
|
||||
|
||||
## Architecture
|
||||
|
||||
The observer follows a filesystem-first approach, consuming append-only events and generating a normalized world model. It is designed to be idempotent, resumable, and resilient to intermittent node connectivity.
|
||||
|
||||
### Inputs
|
||||
- `/opt/homelab/events/`: Normalized JSON events.
|
||||
- `/opt/homelab/state/`: Deployment stage markers and internal observer checkpoint.
|
||||
- `/opt/homelab/logs/`: Detailed execution logs and diagnostics.
|
||||
- Repository Inventory: `inventory/topology.yaml` and `hosts/*/services.yaml`.
|
||||
|
||||
### World Model Output
|
||||
Generated under `/opt/homelab/world/`:
|
||||
- `nodes.json`: Current node availability, roles, and last seen timestamps.
|
||||
- `services.json`: Service health status and links to active incidents.
|
||||
- `deployments.json`: Tracking of active and historical deployment runs by `correlation_id`.
|
||||
- `incidents.json`: Correlated operational issues, including repeat failures and resolution status.
|
||||
- `runtime-summary.json`: High-level overview for dashboards and planner agents.
|
||||
|
||||
## Incident Lifecycle
|
||||
|
||||
The observer implements lightweight incident correlation:
|
||||
|
||||
1. **Detection**: When a `service_unhealthy` or `healthcheck_failed` event is consumed, a new incident is created or an existing active incident for that service is updated.
|
||||
2. **Correlation**: Multiple failure events for the same service on the same node are collapsed into a single incident, tracking the `occurrence_count`.
|
||||
3. **Diagnostics**: Deployment failures (`deployment_failed`) automatically attach references to diagnostic files if present in the event payload.
|
||||
4. **Resolution**: A `service_recovered` event for a service will transition any active incidents for that service to a `resolved` state.
|
||||
|
||||
### Example Incident JSON
|
||||
```json
|
||||
{
|
||||
"inc-1715518800-saturn-mosquitto": {
|
||||
"id": "inc-1715518800-saturn-mosquitto",
|
||||
"node": "saturn",
|
||||
"service": "mosquitto",
|
||||
"status": "resolved",
|
||||
"severity": "error",
|
||||
"started_at": "2026-05-12T12:05:00Z",
|
||||
"last_occurrence": "2026-05-12T12:06:00Z",
|
||||
"occurrence_count": 2,
|
||||
"events": [
|
||||
"2026-05-12T12:05:00Z",
|
||||
"2026-05-12T12:06:00Z"
|
||||
],
|
||||
"correlation_id": "hc-1",
|
||||
"resolved_at": "2026-05-12T12:10:00Z"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Runtime Behavior
|
||||
|
||||
### Idempotency
|
||||
The observer processes events in order. If the world state is lost, deleting the checkpoint file (`/opt/homelab/state/observer_checkpoint.json`) will cause the observer to re-process all events and rebuild the world state.
|
||||
|
||||
### Resumability
|
||||
The observer tracks the last processed event file in its checkpoint. Upon restart, it continues from the next available event.
|
||||
|
||||
### Deployment Tracking
|
||||
Deployments are tracked via `correlation_id`. The observer synthesizes the start, end, and status of each deployment run, providing a clear history of changes to the environment.
|
||||
308
scripts/observer/observer.py
Normal file
308
scripts/observer/observer.py
Normal file
|
|
@ -0,0 +1,308 @@
|
|||
import os
|
||||
import json
|
||||
import time
|
||||
import glob
|
||||
import logging
|
||||
import yaml
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# Constants and Paths
|
||||
RUNTIME_PATH = os.getenv("RUNTIME_PATH", "/opt/homelab")
|
||||
EVENTS_DIR = Path(RUNTIME_PATH) / "events"
|
||||
STATE_DIR = Path(RUNTIME_PATH) / "state"
|
||||
LOGS_DIR = Path(RUNTIME_PATH) / "logs"
|
||||
WORLD_DIR = Path(RUNTIME_PATH) / "world"
|
||||
OBSERVER_STATE_FILE = STATE_DIR / "observer_checkpoint.json"
|
||||
|
||||
REPO_ROOT = Path(__file__).parent.parent.parent
|
||||
INVENTORY_TOPOLOGY = REPO_ROOT / "inventory" / "topology.yaml"
|
||||
|
||||
# Logging setup
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger("observer")
|
||||
|
||||
class Observer:
|
||||
def __init__(self):
|
||||
self.last_processed_file = None
|
||||
self.world_state = {
|
||||
"nodes": {},
|
||||
"services": {},
|
||||
"deployments": {},
|
||||
"incidents": {},
|
||||
"summary": {
|
||||
"last_update": None,
|
||||
"status": "initializing",
|
||||
"active_incidents_count": 0
|
||||
}
|
||||
}
|
||||
self.inventory = self._load_inventory()
|
||||
self._ensure_dirs()
|
||||
self._load_checkpoint()
|
||||
|
||||
def _ensure_dirs(self):
|
||||
WORLD_DIR.mkdir(parents=True, exist_ok=True)
|
||||
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
LOGS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _load_inventory(self):
|
||||
inventory = {"nodes": {}, "services": {}}
|
||||
try:
|
||||
if INVENTORY_TOPOLOGY.exists():
|
||||
with open(INVENTORY_TOPOLOGY, "r") as f:
|
||||
topo = yaml.safe_load(f)
|
||||
for node_name, node_info in topo.get("nodes", {}).items():
|
||||
inventory["nodes"][node_name] = {
|
||||
"roles": node_info.get("roles", []),
|
||||
"connectivity": node_info.get("connectivity", {})
|
||||
}
|
||||
|
||||
# Load service assignments from hosts files
|
||||
hosts_dir = REPO_ROOT / "hosts"
|
||||
for host_dir in hosts_dir.iterdir():
|
||||
if host_dir.is_dir():
|
||||
svc_file = host_dir / "services.yaml"
|
||||
if svc_file.exists():
|
||||
with open(svc_file, "r") as f:
|
||||
svc_data = yaml.safe_load(f)
|
||||
host_name = svc_data.get("host")
|
||||
for svc_name, svc_info in svc_data.get("services", {}).items():
|
||||
if host_name not in inventory["services"]:
|
||||
inventory["services"][host_name] = {}
|
||||
inventory["services"][host_name][svc_name] = {
|
||||
"role": svc_info.get("role"),
|
||||
"exposure": svc_info.get("exposure")
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load inventory: {e}")
|
||||
return inventory
|
||||
|
||||
def _load_checkpoint(self):
|
||||
if OBSERVER_STATE_FILE.exists():
|
||||
try:
|
||||
with open(OBSERVER_STATE_FILE, "r") as f:
|
||||
checkpoint = json.load(f)
|
||||
self.last_processed_file = checkpoint.get("last_processed_file")
|
||||
# We might want to persist partial world state,
|
||||
# but for now we rebuild from events (idempotent)
|
||||
# or we can load existing world state files.
|
||||
self._load_world_from_disk()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load checkpoint: {e}")
|
||||
|
||||
def _load_world_from_disk(self):
|
||||
# Optional: Load existing state to resume faster
|
||||
files = {
|
||||
"nodes": WORLD_DIR / "nodes.json",
|
||||
"services": WORLD_DIR / "services.json",
|
||||
"deployments": WORLD_DIR / "deployments.json",
|
||||
"incidents": WORLD_DIR / "incidents.json",
|
||||
"summary": WORLD_DIR / "runtime-summary.json"
|
||||
}
|
||||
for key, path in files.items():
|
||||
if path.exists():
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
self.world_state[key] = json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load {key} state: {e}")
|
||||
|
||||
def _save_checkpoint(self):
|
||||
try:
|
||||
with open(OBSERVER_STATE_FILE, "w") as f:
|
||||
json.dump({"last_processed_file": self.last_processed_file}, f)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save checkpoint: {e}")
|
||||
|
||||
def _save_world(self):
|
||||
self.world_state["summary"]["last_update"] = datetime.now(timezone.utc).isoformat()
|
||||
active_incidents = [
|
||||
k for k, v in self.world_state["incidents"].items() if v.get("status") == "active"
|
||||
]
|
||||
self.world_state["summary"]["active_incidents_count"] = len(active_incidents)
|
||||
|
||||
if active_incidents:
|
||||
self.world_state["summary"]["status"] = "degraded"
|
||||
else:
|
||||
self.world_state["summary"]["status"] = "nominal"
|
||||
|
||||
files = {
|
||||
"nodes.json": self.world_state["nodes"],
|
||||
"services.json": self.world_state["services"],
|
||||
"deployments.json": self.world_state["deployments"],
|
||||
"incidents.json": self.world_state["incidents"],
|
||||
"runtime-summary.json": self.world_state["summary"]
|
||||
}
|
||||
for filename, data in files.items():
|
||||
try:
|
||||
with open(WORLD_DIR / filename, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save {filename}: {e}")
|
||||
|
||||
def process_event(self, event):
|
||||
etype = event.get("type")
|
||||
node = event.get("node")
|
||||
service = event.get("service")
|
||||
severity = event.get("severity")
|
||||
timestamp = event.get("timestamp")
|
||||
cid = event.get("correlation_id")
|
||||
payload = event.get("payload", {})
|
||||
|
||||
# 1. Update Node State
|
||||
if node not in self.world_state["nodes"]:
|
||||
self.world_state["nodes"][node] = {
|
||||
"status": "unknown",
|
||||
"last_seen": None,
|
||||
"roles": self.inventory["nodes"].get(node, {}).get("roles", [])
|
||||
}
|
||||
self.world_state["nodes"][node]["last_seen"] = timestamp
|
||||
|
||||
if etype == "node_online":
|
||||
self.world_state["nodes"][node]["status"] = "online"
|
||||
elif etype == "node_offline":
|
||||
self.world_state["nodes"][node]["status"] = "offline"
|
||||
|
||||
# 2. Update Service State
|
||||
if service and service != "all":
|
||||
svc_key = f"{node}/{service}"
|
||||
if svc_key not in self.world_state["services"]:
|
||||
self.world_state["services"][svc_key] = {
|
||||
"node": node,
|
||||
"service": service,
|
||||
"status": "unknown",
|
||||
"last_check": None,
|
||||
"incident_id": None
|
||||
}
|
||||
self.world_state["services"][svc_key]["last_check"] = timestamp
|
||||
|
||||
if etype == "service_recovered":
|
||||
self.world_state["services"][svc_key]["status"] = "healthy"
|
||||
self._resolve_incident(svc_key, timestamp)
|
||||
elif etype in ["service_unhealthy", "healthcheck_failed"]:
|
||||
self.world_state["services"][svc_key]["status"] = "unhealthy"
|
||||
self._handle_incident(svc_key, event)
|
||||
|
||||
# 3. Update Deployment State
|
||||
if etype.startswith("deployment_") and cid:
|
||||
if cid not in self.world_state["deployments"]:
|
||||
self.world_state["deployments"][cid] = {
|
||||
"node": node,
|
||||
"service": service,
|
||||
"status": "unknown",
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
"events": []
|
||||
}
|
||||
self.world_state["deployments"][cid]["events"].append({
|
||||
"type": etype,
|
||||
"timestamp": timestamp,
|
||||
"payload": payload
|
||||
})
|
||||
if etype == "deployment_started":
|
||||
self.world_state["deployments"][cid]["status"] = "in_progress"
|
||||
self.world_state["deployments"][cid]["started_at"] = timestamp
|
||||
elif etype == "deployment_completed":
|
||||
self.world_state["deployments"][cid]["status"] = "completed"
|
||||
self.world_state["deployments"][cid]["finished_at"] = timestamp
|
||||
elif etype == "deployment_failed":
|
||||
self.world_state["deployments"][cid]["status"] = "failed"
|
||||
self.world_state["deployments"][cid]["finished_at"] = timestamp
|
||||
# Deployment failure often creates an incident
|
||||
self._handle_deployment_failure(event)
|
||||
|
||||
def _handle_incident(self, svc_key, event):
|
||||
# Correlation: collapse repeated failures for the same service on the same node
|
||||
active_incident = self.world_state["services"][svc_key].get("incident_id")
|
||||
|
||||
if active_incident and active_incident in self.world_state["incidents"]:
|
||||
incident = self.world_state["incidents"][active_incident]
|
||||
if incident["status"] == "active":
|
||||
incident["last_occurrence"] = event["timestamp"]
|
||||
incident["occurrence_count"] = incident.get("occurrence_count", 1) + 1
|
||||
incident["events"].append(event["timestamp"])
|
||||
return
|
||||
|
||||
# Create new incident
|
||||
incident_id = f"inc-{int(time.time())}-{event.get('node')}-{event.get('service')}"
|
||||
self.world_state["incidents"][incident_id] = {
|
||||
"id": incident_id,
|
||||
"node": event.get("node"),
|
||||
"service": event.get("service"),
|
||||
"status": "active",
|
||||
"severity": event.get("severity"),
|
||||
"started_at": event.get("timestamp"),
|
||||
"last_occurrence": event.get("timestamp"),
|
||||
"occurrence_count": 1,
|
||||
"events": [event["timestamp"]],
|
||||
"correlation_id": event.get("correlation_id")
|
||||
}
|
||||
self.world_state["services"][svc_key]["incident_id"] = incident_id
|
||||
|
||||
def _resolve_incident(self, svc_key, timestamp):
|
||||
incident_id = self.world_state["services"][svc_key].get("incident_id")
|
||||
if incident_id and incident_id in self.world_state["incidents"]:
|
||||
if self.world_state["incidents"][incident_id]["status"] == "active":
|
||||
self.world_state["incidents"][incident_id]["status"] = "resolved"
|
||||
self.world_state["incidents"][incident_id]["resolved_at"] = timestamp
|
||||
self.world_state["services"][svc_key]["incident_id"] = None
|
||||
|
||||
def _handle_deployment_failure(self, event):
|
||||
# Specific logic for deployment failures
|
||||
svc_key = f"{event.get('node')}/{event.get('service')}"
|
||||
self._handle_incident(svc_key, event)
|
||||
|
||||
# Link diagnostics if available in payload
|
||||
incident_id = self.world_state["services"][svc_key].get("incident_id")
|
||||
if incident_id and incident_id in self.world_state["incidents"]:
|
||||
payload = event.get("payload", {})
|
||||
if "diagnostics_file" in payload:
|
||||
self.world_state["incidents"][incident_id]["diagnostics_ref"] = payload["diagnostics_file"]
|
||||
elif "error" in payload:
|
||||
self.world_state["incidents"][incident_id]["last_error"] = payload["error"]
|
||||
|
||||
def run_once(self):
|
||||
# Find all event files
|
||||
event_files = sorted(glob.glob(str(EVENTS_DIR / "*" / "*" / "*.json")))
|
||||
|
||||
new_files = []
|
||||
if self.last_processed_file:
|
||||
try:
|
||||
idx = event_files.index(self.last_processed_file)
|
||||
new_files = event_files[idx+1:]
|
||||
except ValueError:
|
||||
# If last_processed_file is gone or not in list, process all
|
||||
new_files = event_files
|
||||
else:
|
||||
new_files = event_files
|
||||
|
||||
if not new_files:
|
||||
return
|
||||
|
||||
logger.info(f"Processing {len(new_files)} new events")
|
||||
for file_path in new_files:
|
||||
try:
|
||||
with open(file_path, "r") as f:
|
||||
event = json.load(f)
|
||||
self.process_event(event)
|
||||
self.last_processed_file = file_path
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing {file_path}: {e}")
|
||||
|
||||
self._save_checkpoint()
|
||||
self._save_world()
|
||||
|
||||
def loop(self, interval=5):
|
||||
logger.info("Starting observer loop")
|
||||
while True:
|
||||
self.run_once()
|
||||
time.sleep(interval)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
observer = Observer()
|
||||
if "--run-once" in sys.argv:
|
||||
observer.run_once()
|
||||
else:
|
||||
observer.loop()
|
||||
83
scripts/observer/test_setup.sh
Normal file
83
scripts/observer/test_setup.sh
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
#!/usr/bin/env bash
|
||||
mkdir -p /tmp/homelab/events/2026-05-12/saturn
|
||||
mkdir -p /tmp/homelab/state
|
||||
mkdir -p /tmp/homelab/logs
|
||||
mkdir -p /tmp/homelab/world
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120000_node_online_1.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:00:00Z",
|
||||
"node": "saturn",
|
||||
"type": "node_online",
|
||||
"severity": "info",
|
||||
"source": "system",
|
||||
"service": "all",
|
||||
"correlation_id": "init",
|
||||
"payload": {}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120500_service_unhealthy_1.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:05:00Z",
|
||||
"node": "saturn",
|
||||
"type": "service_unhealthy",
|
||||
"severity": "error",
|
||||
"source": "healthcheck",
|
||||
"service": "mosquitto",
|
||||
"correlation_id": "hc-1",
|
||||
"payload": {"error": "connection refused"}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/120600_service_unhealthy_2.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:06:00Z",
|
||||
"node": "saturn",
|
||||
"type": "service_unhealthy",
|
||||
"severity": "error",
|
||||
"source": "healthcheck",
|
||||
"service": "mosquitto",
|
||||
"correlation_id": "hc-2",
|
||||
"payload": {"error": "connection refused"}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121000_service_recovered_1.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:10:00Z",
|
||||
"node": "saturn",
|
||||
"type": "service_recovered",
|
||||
"severity": "info",
|
||||
"source": "healthcheck",
|
||||
"service": "mosquitto",
|
||||
"correlation_id": "hc-3",
|
||||
"payload": {}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121500_deployment_started_1.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:15:00Z",
|
||||
"node": "saturn",
|
||||
"type": "deployment_started",
|
||||
"severity": "info",
|
||||
"source": "deploy_agent",
|
||||
"service": "mosquitto",
|
||||
"correlation_id": "deploy-1",
|
||||
"payload": {"version": "2.0.18"}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat <<EOF > /tmp/homelab/events/2026-05-12/saturn/121600_deployment_failed_1.json
|
||||
{
|
||||
"timestamp": "2026-05-12T12:16:00Z",
|
||||
"node": "saturn",
|
||||
"type": "deployment_failed",
|
||||
"severity": "error",
|
||||
"source": "deploy_agent",
|
||||
"service": "mosquitto",
|
||||
"correlation_id": "deploy-1",
|
||||
"payload": {"error": "container crash", "diagnostics_file": "/opt/homelab/logs/diagnostics-deploy-1.log"}
|
||||
}
|
||||
EOF
|
||||
Loading…
Reference in a new issue