# agent-system/scripts/supervisor/supervisor.py
# (source-viewer header: 293 lines, 10 KiB, Python; "Raw Normal View History")
# Last modified: 2026-05-12 17:34:42 +02:00
#!/usr/bin/env python3
import glob
import json
import os
import sys
import tempfile
import time
from collections import Counter
from pathlib import Path

import yaml
# --- Configuration -------------------------------------------------------
# Root of the observed world state; override with HOMELAB_WORLD_ROOT.
WORLD_STATE_PATH = Path(os.environ.get("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
# Desired-state inventory (hosts/<node>/services.yaml), resolved relative to
# the current working directory.
INVENTORY_PATH = Path("hosts")
# Event log: one JSON document per line, only ever appended to.
EVENT_LOG = Path("/tmp/agent-events.log")
# Summary of the previous run, overwritten on every run.
CHECKPOINT_FILE = Path("/tmp/supervisor-checkpoint.json")

# --- Reconcile event types -----------------------------------------------
RECONCILE_REQUIRED = "reconcile_required"
RECONCILE_RECOMMENDED = "reconcile_recommended"
RECONCILE_BLOCKED = "reconcile_blocked"

# --- Runtime summary states ----------------------------------------------
STATE_NOMINAL = "nominal"
STATE_DEGRADED = "degraded"
STATE_UNSTABLE = "unstable"
# NOTE(review): not selected by main() in the code visible here.
STATE_RECONCILING = "reconciling"
def emit_event(event_type, message, details=None):
    """Echo one reconciliation event as JSON and append it to EVENT_LOG.

    Args:
        event_type: value stored under ``type`` (e.g. a RECONCILE_* constant
            or ``"summary_state"``).
        message: human-readable text stored under ``message``.
        details: optional mapping stored under ``details``; falsy values are
            replaced by an empty dict.

    The JSON line is always printed to stdout. A failure to append it to the
    log file is reported on stderr and not raised.
    """
    record = json.dumps({
        "type": event_type,
        "message": message,
        "timestamp": time.time(),
        "details": details or {},
    })
    print(record)
    try:
        # "a" mode: existing events are never rewritten. Leaving the `with`
        # closes (and therefore flushes) the file.
        with open(EVENT_LOG, "a", encoding="utf-8") as log:
            log.write(f"{record}\n")
    except Exception as exc:
        print(f"Error writing to event log: {exc}", file=sys.stderr)
def load_desired_state():
    """Load desired state from the inventory tree ``hosts/*/services.yaml``.

    Each host directory is a node. Its ``services.yaml`` should be a mapping
    whose ``services`` value lists service names (strings) or service
    mappings carrying at least a ``name``. Every loaded service is tagged
    with ``node`` set to its host directory name, overriding any ``node`` key
    in the file.

    Returns:
        dict with ``services`` (list of service dicts, each with ``name`` and
        ``node``) and ``nodes`` (host names that contributed at least one
        service, in load order). Both are empty when INVENTORY_PATH does not
        exist.

    Unreadable or unparseable files and malformed entries are reported on
    stderr and skipped. One bad host file cannot abort the run, and a
    nameless entry cannot crash drift detection later.
    """
    desired = {"services": [], "nodes": []}
    if not INVENTORY_PATH.exists():
        return desired
    # Sorted so that load order (and hence drift/event order) does not depend
    # on filesystem directory ordering.
    pattern = str(INVENTORY_PATH / "*" / "services.yaml")
    for yaml_file in sorted(glob.glob(pattern)):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except (OSError, ValueError, yaml.YAMLError) as e:
            print(f"Error loading {yaml_file}: {e}", file=sys.stderr)
            continue
        if not isinstance(data, dict) or "services" not in data:
            # Empty file or no services section: nothing desired on this host.
            continue
        services = data["services"]
        # A list is the expected form. Iterating a mapping yields its keys
        # as names, as before. Anything else (None, a bare string, a number)
        # is rejected rather than iterated character by character.
        if not isinstance(services, (list, dict)):
            print(f"Error loading {yaml_file}: 'services' must be a list", file=sys.stderr)
            continue
        node_name = Path(yaml_file).parent.name
        for svc in services:
            if isinstance(svc, str):
                entry = {"name": svc}
            elif isinstance(svc, dict):
                # Copy so that adding "node" does not mutate the parsed document.
                entry = dict(svc)
            else:
                entry = {}
            if not entry.get("name"):
                print(
                    f"Error loading {yaml_file}: skipping service entry without a name: {svc!r}",
                    file=sys.stderr,
                )
                continue
            entry["node"] = node_name
            desired["services"].append(entry)
            if node_name not in desired["nodes"]:
                desired["nodes"].append(node_name)
    return desired
def load_world_state():
    """Load the observed world state from WORLD_STATE_PATH (filesystem-first).

    Layout: one directory per category (``services``, ``nodes``,
    ``deployments``, ``incidents``) and one ``<id>.json`` file per item. The
    file stem is used as the item id.

    Returns:
        dict mapping each category name to ``{id: parsed JSON object}``. A
        missing root or category directory yields an empty mapping.

    Files that cannot be read or parsed, or whose top-level JSON value is not
    an object, are reported on stderr and skipped. Downstream drift detection
    calls ``.get`` on every item, so a list or string would crash it.
    """
    categories = ("services", "nodes", "deployments", "incidents")
    world = {category: {} for category in categories}
    if not WORLD_STATE_PATH.exists():
        return world
    for category in categories:
        cat_path = WORLD_STATE_PATH / category
        # is_dir() is False for missing paths too, so no separate exists().
        if not cat_path.is_dir():
            continue
        # Sorted for a stable, filesystem-independent item order.
        for json_file in sorted(cat_path.glob("*.json")):
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    item = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                print(f"Error loading {json_file}: {e}", file=sys.stderr)
                continue
            if not isinstance(item, dict):
                print(
                    f"Error loading {json_file}: expected a JSON object, got {type(item).__name__}",
                    file=sys.stderr,
                )
                continue
            world[category][json_file.stem] = item
    return world
def detect_drift(desired, world):
    """Compare desired infrastructure state with observed runtime state.

    Args:
        desired: mapping with ``services`` (dicts carrying ``name`` and
            ``node``) and ``nodes`` (node names), as built by
            load_desired_state().
        world: mapping of category -> ``{id: item dict}`` for ``services``,
            ``nodes``, ``deployments`` and ``incidents``, as built by
            load_world_state().

    Returns:
        list of drift dicts. Each has a ``type`` of ``missing_service``,
        ``unhealthy_service``, ``offline_node``, ``failed_deployment`` or
        ``unresolved_incident`` plus type-specific fields. Drifts for desired
        services come first, then desired nodes, deployments and incidents.
    """
    healthy_service = ("ok", "healthy", "up")
    online_node = ("online", "ok", "up")
    drifts = []

    # Desired services that are absent from, or unhealthy in, world state.
    # Matching is by service name against world["services"] keys. An empty
    # record counts as missing.
    for d_svc in desired["services"]:
        name = d_svc.get("name")
        if not name:
            # A nameless entry cannot be matched. Skip it instead of raising
            # KeyError and aborting the whole run.
            continue
        a_svc = world["services"].get(name)
        if not a_svc:
            drifts.append({
                "type": "missing_service",
                "service": name,
                "node": d_svc.get("node"),
            })
        elif a_svc.get("status") not in healthy_service:
            drifts.append({
                "type": "unhealthy_service",
                "service": name,
                "status": a_svc.get("status"),
                # Prefer the node reported by world state; fall back to inventory.
                "node": a_svc.get("node") or d_svc.get("node"),
            })

    # Desired nodes that are missing from world state or not online.
    for node_name in desired["nodes"]:
        a_node = world["nodes"].get(node_name)
        if not a_node or a_node.get("status") not in online_node:
            drifts.append({
                "type": "offline_node",
                "node": node_name,
                "status": a_node.get("status") if a_node else "missing",
            })

    # Every deployment record whose status is "failed". There is no recency
    # filter: a record keeps being reported until its status changes or the
    # record is removed.
    for d_id, d_data in world["deployments"].items():
        if d_data.get("status") == "failed":
            drifts.append({
                "type": "failed_deployment",
                "deployment_id": d_id,
                "service": d_data.get("service"),
            })

    # Incidents that are neither resolved nor closed.
    for i_id, i_data in world["incidents"].items():
        if i_data.get("status") not in ("resolved", "closed"):
            drifts.append({
                "type": "unresolved_incident",
                "incident_id": i_id,
                "description": i_data.get("description"),
                "status": i_data.get("status"),
            })

    return drifts
def recommendation_engine(drifts, world):
    """Map each drift to a proposed action (recommendation mode: no mutation).

    Args:
        drifts: drift dicts as produced by detect_drift().
        world: world-state mapping. ``deployments`` is used to count failed
            deployments per service, and ``services`` for the dependency
            check before deploying a missing service.

    Returns:
        list of dicts with keys ``drift`` (the input drift), ``action``,
        ``message`` and ``type`` (a RECONCILE_* event type), in drift order.
        Drifts of an unrecognised type produce no recommendation.
    """
    healthy = ("ok", "healthy", "up")
    # Mock dependency map: service -> services that must be healthy before
    # it is deployed. Only webapp -> database is modelled for now.
    dependencies = {"webapp": ("database",)}
    services = world.get("services", {})
    # Count failed deployments per service once, instead of rescanning every
    # deployment for each failed_deployment drift (quadratic in failures).
    failure_counts = Counter(
        dep.get("service")
        for dep in world.get("deployments", {}).values()
        if dep.get("status") == "failed"
    )

    def dependencies_met(service):
        # True when every dependency of `service` exists and reports healthy.
        for dep_name in dependencies.get(service, ()):
            dep = services.get(dep_name)
            if not dep or dep.get("status") not in healthy:
                return False
        return True

    recommendations = []
    for drift in drifts:
        kind = drift.get("type")
        if kind == "unhealthy_service":
            recommendations.append({
                "drift": drift,
                "action": "redeploy",
                "message": f"Service {drift['service']} is unhealthy. Recommend redeploy.",
                "type": RECONCILE_REQUIRED,
            })
        elif kind == "failed_deployment":
            service = drift.get("service")
            # "Repeated" means more than two (i.e. three or more) failed
            # deployments for the same service: stop retrying, diagnose.
            if failure_counts[service] > 2:
                recommendations.append({
                    "drift": drift,
                    "action": "diagnostics",
                    "message": f"Repeated deployment failures for {service}. Recommend diagnostics.",
                    "type": RECONCILE_BLOCKED,
                })
            else:
                recommendations.append({
                    "drift": drift,
                    "action": "redeploy",
                    "message": f"Deployment failed for {service}. Recommend retry.",
                    "type": RECONCILE_REQUIRED,
                })
        elif kind == "offline_node":
            recommendations.append({
                "drift": drift,
                "action": "failover_review",
                "message": f"Node {drift['node']} is offline. Recommend failover review.",
                "type": RECONCILE_REQUIRED,
            })
        elif kind == "missing_service":
            if dependencies_met(drift["service"]):
                recommendations.append({
                    "drift": drift,
                    "action": "deploy",
                    "message": f"Service {drift['service']} is missing. Recommend deployment.",
                    "type": RECONCILE_REQUIRED,
                })
            else:
                recommendations.append({
                    "drift": drift,
                    "action": "delayed_deployment",
                    "message": f"Dependency unavailable for {drift['service']}. Recommend delayed deployment.",
                    "type": RECONCILE_RECOMMENDED,
                })
        elif kind == "unresolved_incident":
            # Fall back to the incident id so the message never reads "None".
            label = drift.get("description") or drift.get("incident_id")
            recommendations.append({
                "drift": drift,
                "action": "review",
                "message": f"Unresolved incident: {label}. Recommend review.",
                "type": RECONCILE_RECOMMENDED,
            })
    return recommendations
def save_checkpoint(state):
    """Persist ``state`` plus the current time as the run checkpoint.

    Writes ``{"last_run": <epoch seconds>, "state": state}`` as JSON to
    CHECKPOINT_FILE. The data is first written to a temporary file in the
    same directory, which is then renamed over the checkpoint. A crash or a
    serialization error mid-write therefore leaves the previous checkpoint
    intact instead of a truncated file. Failures are reported on stderr, not
    raised.

    Note: the temporary file is created owner-read/write only, so the
    resulting checkpoint has those permissions.

    NOTE(review): in the code visible here the checkpoint is only read back
    to print the previous run time; it does not yet drive de-duplication.
    """
    payload = {"last_run": time.time(), "state": state}
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            dir=CHECKPOINT_FILE.parent,
            prefix=CHECKPOINT_FILE.name + ".",
            suffix=".tmp",
            delete=False,
        ) as f:
            tmp_path = f.name
            json.dump(payload, f)
        # Same directory -> same filesystem, so os.replace is an atomic rename
        # on POSIX and readers never observe a half-written checkpoint.
        os.replace(tmp_path, CHECKPOINT_FILE)
        tmp_path = None
    except Exception as e:
        print(f"Error saving checkpoint: {e}", file=sys.stderr)
    finally:
        # Remove the temp file if the write or rename did not complete.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def load_checkpoint():
    """Load the last run checkpoint from CHECKPOINT_FILE.

    Returns:
        The parsed checkpoint dict, or None when the file is absent,
        unreadable, not valid JSON, or not a JSON object. Every case except
        "absent" is reported on stderr.
    """
    # EAFP: opening directly avoids the exists()-then-open race.
    try:
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No checkpoint yet (e.g. first run) is normal, not an error.
        return None
    except (OSError, ValueError) as e:
        print(f"Error loading checkpoint: {e}", file=sys.stderr)
        return None
    if not isinstance(data, dict):
        # Callers use .get() on the result, so a list/scalar would crash them.
        print(
            f"Error loading checkpoint: expected a JSON object, got {type(data).__name__}",
            file=sys.stderr,
        )
        return None
    return data
def main():
    """Run a single supervisor pass in recommendation mode.

    Steps:
    1. Report the previous checkpoint's run time.
    2. Load desired and observed state.
    3. Detect drift and derive recommendations.
    4. Emit one ``summary_state`` event, followed by one event per
       recommendation.
    5. Save a new checkpoint.

    Only the event log and the checkpoint file are written.
    """
    print(f"--- Supervisor Run: {time.ctime()} ---")
    checkpoint = load_checkpoint()
    # Guard against a checkpoint that parsed but does not have the shape
    # save_checkpoint() writes (hand-edited, foreign tool, older format).
    if checkpoint and isinstance(checkpoint, dict):
        last_run = checkpoint.get("last_run", 0)
        if isinstance(last_run, (int, float)):
            print(f"Last run: {time.ctime(last_run)}")

    # 1-2. Desired (inventory) and observed (world) state.
    desired = load_desired_state()
    world = load_world_state()
    # 3. Drift between them.
    drifts = detect_drift(desired, world)
    # 4. Proposed actions only; nothing is applied.
    recommendations = recommendation_engine(drifts, world)

    # 5. Summary state: any blocked recommendation -> unstable; any other
    # recommendation -> degraded; none -> nominal. This function never
    # selects STATE_RECONCILING.
    if any(r["type"] == RECONCILE_BLOCKED for r in recommendations):
        overall_state = STATE_UNSTABLE
    elif recommendations:
        overall_state = STATE_DEGRADED
    else:
        overall_state = STATE_NOMINAL
    emit_event("summary_state", f"System state: {overall_state}", {"state": overall_state})

    # One reconciliation event per recommendation, carrying its drift.
    for rec in recommendations:
        emit_event(rec["type"], rec["message"], rec["drift"])

    # 6. Save checkpoint.
    save_checkpoint({
        "drift_count": len(drifts),
        "recommendation_count": len(recommendations),
        "timestamp": time.time(),
    })
    print("Run complete.")


if __name__ == "__main__":
    main()