"""
planner.py — planner-agent main loop.

Listens to Redis pub/sub channels:
  - health_events:  node-agent / stability-agent health notifications
  - world_updates:  observer world-state change notifications

For each event that clears the cooldown gate:
  1. Ask LLMRouter to diagnose and produce a structured action proposal.
  2. Write proposal to /opt/homelab/actions/pending/<action_id>.json.
  3. Emit a remediation_started filesystem event.

Human-in-the-loop invariant
----------------------------
The planner ONLY writes action files to actions/pending/. Execution requires
an operator-approved action file in actions/approved/ — the planner never
writes action files there (it only makes sure the directory exists at
startup).
"""

import asyncio
import contextlib
import json
import math
import os
import re
import signal
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import redis.asyncio as aioredis
import structlog

# Allow running from src/ directory without installation
sys.path.insert(0, str(Path(__file__).parent))
from llm_router import LLMRouter, RouteResult  # noqa: E402

# ---------------------------------------------------------------------------
# Structured logging — JSON to stdout
# ---------------------------------------------------------------------------

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,  # adds "level" key
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(20),  # INFO+
    logger_factory=structlog.PrintLoggerFactory(),
    # add_logger_name is intentionally excluded: it requires a stdlib
    # logger with a .name attribute; PrintLogger does not have one.
)
log = structlog.get_logger("planner")

# ---------------------------------------------------------------------------
# Runtime paths
# ---------------------------------------------------------------------------

RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab"))
ACTIONS_DIR = RUNTIME_PATH / "actions"
EVENTS_DIR = RUNTIME_PATH / "events"
STATE_DIR = RUNTIME_PATH / "state"
HEARTBEAT = STATE_DIR / "planner-agent.heartbeat"

# ---------------------------------------------------------------------------
# Configuration (from env)
# ---------------------------------------------------------------------------

REDIS_URL = os.getenv("REDIS_URL", "redis://100.108.208.3:6379")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://100.108.208.3:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
NODE_NAME = os.getenv("NODE_NAME", "solaria")
COOLDOWN_SECONDS = int(os.getenv("COOLDOWN_SECONDS", "300"))  # 5 min

SUBSCRIBE_CHANNELS = ["health_events", "world_updates"]

# How long one pub/sub read may block. Bounds both the heartbeat interval and
# the latency with which the main loop notices a shutdown request.
POLL_TIMEOUT_SECONDS = 1.0

# Below this confidence a proposal must always be flagged for human review.
HUMAN_REVIEW_CONFIDENCE = 0.7

# ---------------------------------------------------------------------------
# JSON Schema — validated by LLMRouter (jsonschema) before accepting response
# ---------------------------------------------------------------------------

PROPOSAL_SCHEMA: dict = {
    "type": "object",
    "required": ["action", "service", "node", "reason", "confidence", "requires_human"],
    "additionalProperties": False,
    "properties": {
        "action": {
            "type": "string",
            "enum": ["restart", "redeploy", "notify", "ignore"],
        },
        "service": {"type": "string"},
        "node": {"type": "string"},
        "reason": {"type": "string", "minLength": 10},
        "confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0},
        "requires_human": {"type": "boolean"},
    },
}

# ---------------------------------------------------------------------------
# LLM system prompt
# ---------------------------------------------------------------------------

SYSTEM_PROMPT = """You are the planner agent for a distributed homelab orchestration system.
Your job is to diagnose infrastructure health events and propose a remediation action.

Homelab topology:
  vps           — Hetzner VPS; public ingress, control plane
  piha          — Raspberry Pi 5; infra, monitoring, Redis, Ollama
  solaria       — GPU workstation; AI / compute workloads
  chelsty-infra — LTE edge; Zigbee2MQTT, Mosquitto — offline-first
  chelsty-ha    — LTE edge; Home Assistant — offline-first

Action selection rules:
  restart  — container exists but is stopped/unhealthy; docker restart suffices (low risk)
  redeploy — container is broken beyond a simple restart; full docker compose up (guarded)
  notify   — human decision required; do not attempt automated fix
  ignore   — transient / one-off glitch; monitoring will catch a repeat

Risk rules (enforce strictly):
  - For any chelsty-* node: always set requires_human: true
  - For disk_pressure events: always use "notify"
  - If confidence < 0.7: set requires_human: true
  - Unknown/novel failure patterns: prefer "notify" over guessing

Respond with ONLY a single JSON object, no markdown, no commentary:
{
  "action": "restart|redeploy|notify|ignore",
  "service": "<service name>",
  "node": "<node name>",
  "reason": "<short diagnosis, at least 10 characters>",
  "confidence": <0.0–1.0>,
  "requires_human": <true|false>
}"""


# ---------------------------------------------------------------------------
# Small pure helpers
# ---------------------------------------------------------------------------

_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")

# Largest epoch value datetime can represent (9999-12-31T23:59:59Z).
_MAX_EPOCH_SECONDS = 253_402_300_799.0


def _safe_slug(value: str, fallback: str = "unknown") -> str:
    """Reduce *value* to characters that are safe inside a file name.

    Runs of anything other than ASCII letters, digits, ``.``, ``_`` and ``-``
    collapse to a single ``-``; leading/trailing ``-`` and ``.`` are dropped.
    Returns *fallback* when nothing usable remains.
    """
    slug = _UNSAFE_FILENAME_CHARS.sub("-", str(value)).strip("-.")
    return slug or fallback


def _clean_str(value: Any, default: str = "") -> str:
    """Return *value* stripped when it is a string, otherwise *default*."""
    return value.strip() if isinstance(value, str) else default


def _coerce_timestamp(ts_raw: Any) -> float:
    """Convert a raw event timestamp to finite epoch seconds.

    Accepts ISO-8601 strings (a trailing ``Z`` is understood; naive values are
    taken as UTC) and plain numbers. Anything unparseable, non-finite or
    outside the range ``datetime`` can represent falls back to the current
    time, so later ``int()`` / ``datetime.fromtimestamp()`` calls cannot fail.
    """
    now = time.time()
    if isinstance(ts_raw, bool):
        # bool is an int subclass; True/False is never a meaningful timestamp.
        return now
    if isinstance(ts_raw, str):
        try:
            parsed = datetime.fromisoformat(ts_raw.strip().replace("Z", "+00:00"))
        except ValueError:
            return now
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        ts = parsed.timestamp()
    else:
        try:
            ts = float(ts_raw)
        except (TypeError, ValueError, OverflowError):
            return now
    if not math.isfinite(ts) or not 0.0 <= ts <= _MAX_EPOCH_SECONDS:
        return now
    return ts


def _enforce_risk_rules(
    action: str,
    nodes: tuple,
    event_type: str,
    confidence: float,
    requires_human: bool,
) -> tuple[str, bool]:
    """Apply the risk rules from SYSTEM_PROMPT deterministically.

    The LLM is asked to follow these rules, but its output is not trusted to
    do so. Returns the possibly adjusted ``(action, requires_human)``:

    - disk_pressure events always become ``notify``;
    - any ``chelsty-*`` node, or confidence below HUMAN_REVIEW_CONFIDENCE
      (including NaN), forces ``requires_human``.
    """
    if "disk_pressure" in event_type:
        action = "notify"
    on_edge_node = any(str(name).startswith("chelsty-") for name in nodes)
    # "not >=" rather than "<" so that a NaN confidence counts as low.
    low_confidence = not confidence >= HUMAN_REVIEW_CONFIDENCE
    if on_edge_node or low_confidence:
        requires_human = True
    return action, requires_human


# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

@dataclass
class HealthEvent:
    """Normalized health event ingested from a Redis channel message."""

    node: str
    service: str
    event_type: str  # e.g. "service_unhealthy", "disk_pressure_high"
    severity: str  # "info" | "warning" | "error" | "critical"
    payload: dict = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)
    raw_channel: str = ""

    @property
    def svc_key(self) -> str:
        """Cooldown key: ``<node>/<service>``."""
        return f"{self.node}/{self.service}"

    def __str__(self) -> str:
        return f"[{self.event_type}] {self.svc_key} ({self.severity})"


@dataclass
class ActionProposal:
    """Planner's structured output, written to actions/pending/<action_id>.json."""

    action_id: str
    type: str  # executor type: "container_restart"|"redeploy"|"notify"|"ignore"
    action: str  # LLM's action: "restart"|"redeploy"|"notify"|"ignore"
    service: str
    node: str
    reason: str
    confidence: float
    requires_human: bool
    risk_level: str
    status: str = "pending"
    timestamp: float = field(default_factory=time.time)
    source_event: str = ""
    description: str = ""
    llm_model: str = ""
    llm_attempts: int = 0

    def to_action_file(self) -> dict:
        """Return a dict compatible with the executor's action file format."""
        return {
            "action_id": self.action_id,
            "type": self.type,
            "node": self.node,
            "service": self.service,
            "risk_level": self.risk_level,
            "confidence": self.confidence,
            "requires_human": self.requires_human,
            # Fall back to the bare reason when no description was composed.
            "description": self.description or self.reason,
            "status": self.status,
            "timestamp": self.timestamp,
            "source_event": self.source_event,
            "llm_model": self.llm_model,
            "llm_attempts": self.llm_attempts,
            "payload": {
                "action": self.action,
                "reason": self.reason,
            },
        }


# ---------------------------------------------------------------------------
# Cooldown tracker
# ---------------------------------------------------------------------------

class CooldownTracker:
    """Gate: suppress duplicate proposals for the same service/node pair.

    A proposal is suppressed if a previous proposal for the same svc_key
    was emitted within the last ``cooldown_seconds`` seconds.
    """

    def __init__(self, cooldown_seconds: float = COOLDOWN_SECONDS) -> None:
        self._cooldown = cooldown_seconds
        self._last: dict[str, float] = {}

    def _elapsed(self, svc_key: str) -> float:
        """Seconds since the last recorded proposal (epoch 0 if never seen)."""
        return time.time() - self._last.get(svc_key, 0.0)

    def is_ready(self, svc_key: str) -> bool:
        """True when enough time has elapsed since the last proposal."""
        return self._elapsed(svc_key) >= self._cooldown

    def record(self, svc_key: str) -> None:
        """Mark a proposal as just emitted for svc_key."""
        self._last[svc_key] = time.time()

    def remaining_seconds(self, svc_key: str) -> float:
        """Seconds left before svc_key is ready again (never negative)."""
        return max(0.0, self._cooldown - self._elapsed(svc_key))

    def reset(self, svc_key: str) -> None:
        """Force-reset cooldown (e.g. for testing or manual override)."""
        self._last.pop(svc_key, None)


# ---------------------------------------------------------------------------
# Event emission (filesystem — no control-plane import)
# ---------------------------------------------------------------------------

def _emit_event_sync(
    event_type: str,
    severity: str,
    service: str,
    correlation_id: str,
    payload: Optional[dict] = None,
    node: Optional[str] = None,
) -> None:
    """Write a normalized JSON event file to the filesystem event store.

    Mirrors scripts/lib/events.py behaviour — keeping planner fully
    independent of the control-plane package.

    The file lands in ``EVENTS_DIR/<YYYY-MM-DD>/<node>/``. Failures are logged
    as warnings and never raised: event emission is best-effort.
    """
    # Read NODE_NAME at call time (not import time) so monkeypatching works in tests.
    if node is None:
        node = NODE_NAME

    # One clock read feeds both the file name and the recorded timestamp.
    now = datetime.now(timezone.utc)
    timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    date_dir = now.strftime("%Y-%m-%d")
    svc_slug = _safe_slug(service or "planner", fallback="planner")
    fname = f"evt-{node}-{int(now.timestamp())}-{event_type}-{svc_slug}.json"
    event_dir = EVENTS_DIR / date_dir / node

    try:
        event_dir.mkdir(parents=True, exist_ok=True)
        document = json.dumps({
            "timestamp": timestamp,
            "node": node,
            "type": event_type,
            "severity": severity,
            "source": "planner-agent",
            "service": service,
            "correlation_id": correlation_id,
            "payload": payload or {},
        }, indent=2)
        (event_dir / fname).write_text(document, encoding="utf-8")
    except Exception as exc:  # best-effort boundary: log, never propagate
        log.warning("event_write_failed", path=str(event_dir / fname), error=str(exc))


async def emit_event(
    event_type: str,
    severity: str,
    service: str,
    correlation_id: str,
    payload: Optional[dict] = None,
    node: Optional[str] = None,
) -> None:
    """Async wrapper around _emit_event_sync (runs in thread pool).

    ``node`` is optional and defaults to NODE_NAME inside _emit_event_sync.
    """
    await asyncio.to_thread(
        _emit_event_sync, event_type, severity, service, correlation_id, payload, node
    )


# ---------------------------------------------------------------------------
# Action file I/O
# ---------------------------------------------------------------------------

async def write_pending_action(proposal: ActionProposal) -> Path:
    """Atomically write proposal JSON to actions/pending/<action_id>.json.

    Returns the final path. Raises OSError when the file cannot be written;
    in that case no partial or temporary file is left behind.
    """
    pending_dir = ACTIONS_DIR / "pending"
    path = pending_dir / f"{proposal.action_id}.json"
    document = json.dumps(proposal.to_action_file(), indent=2)

    def _write() -> None:
        pending_dir.mkdir(parents=True, exist_ok=True)
        # Write to tmp then rename so readers never see a partial file
        tmp = path.with_suffix(".tmp")
        try:
            tmp.write_text(document, encoding="utf-8")
            tmp.replace(path)
        except OSError:
            with contextlib.suppress(OSError):
                tmp.unlink(missing_ok=True)
            raise

    await asyncio.to_thread(_write)
    return path


# ---------------------------------------------------------------------------
# LLM prompt helpers
# ---------------------------------------------------------------------------

def build_messages(event: HealthEvent) -> list[dict]:
    """Construct the OpenAI-style message list for one health event."""
    event_time = datetime.fromtimestamp(event.timestamp, tz=timezone.utc).isoformat()
    user_content = (
        f"Health event received:\n"
        f"  node:       {event.node}\n"
        f"  service:    {event.service}\n"
        f"  type:       {event.event_type}\n"
        f"  severity:   {event.severity}\n"
        f"  timestamp:  {event_time}\n"
    )
    if event.payload:
        payload_str = json.dumps(event.payload, indent=4)
        user_content += f"  payload:\n{payload_str}\n"

    user_content += "\nRespond with ONLY the JSON object as specified."
    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]


# LLM action name → (executor type, risk_level)
_ACTION_TO_EXECUTOR: dict[str, tuple[str, str]] = {
    "restart": ("container_restart", "low"),
    "redeploy": ("redeploy", "guarded"),
    "notify": ("notify", "low"),
    "ignore": ("ignore", "none"),
}


def map_action_to_executor_type(action: str) -> tuple[str, str]:
    """Map LLM action name → (executor type, risk_level).

    Unknown actions map to ``("notify", "low")``.
    """
    return _ACTION_TO_EXECUTOR.get(action, ("notify", "low"))


# ---------------------------------------------------------------------------
# Event parsing
# ---------------------------------------------------------------------------

def parse_event(raw: dict, channel: str) -> Optional[HealthEvent]:
    """Normalise a raw Redis pub/sub payload into a HealthEvent.

    Accepts two common shapes:

    Shape A — node-agent / stability-agent filesystem event format:
      {"type": "service_unhealthy", "node": "piha", "service": "mosquitto",
       "severity": "error", "payload": {...}}

    Shape B — control-plane world_updates format:
      {"event_type": "...", "node": "...", "service": "...", ...}

    Returns None when the event type or node is missing. Fields of the wrong
    JSON type are treated as missing, so malformed messages are dropped
    instead of raising into the main loop.
    """
    event_type = _clean_str(raw.get("type")) or _clean_str(raw.get("event_type"))
    node = _clean_str(raw.get("node"))
    service = _clean_str(raw.get("service"))
    severity = _clean_str(raw.get("severity")) or "info"

    if not event_type or not node:
        return None

    # For node-level events (e.g. node_offline) without a service field
    if not service:
        details = raw.get("details") or raw.get("payload") or {}
        if isinstance(details, dict):
            service = _clean_str(details.get("service"))
    if not service:
        service = node  # fallback: use node name as service key

    payload = raw.get("payload") or raw.get("details") or {}
    if not isinstance(payload, dict):
        payload = {}

    return HealthEvent(
        node=node,
        service=service,
        event_type=event_type,
        severity=severity,
        payload=payload,
        timestamp=_coerce_timestamp(raw.get("timestamp", time.time())),
        raw_channel=channel,
    )


# ---------------------------------------------------------------------------
# Planner agent
# ---------------------------------------------------------------------------

# Event types that require no action (healthy signals, completions)
_BENIGN_EVENTS = frozenset({
    "service_healthy",
    "service_recovered",
    "node_online",
    "deployment_completed",
    "deployment_started",
    "remediation_started",
    "remediation_completed",
})


class PlannerAgent:
    """Async agent: subscribe → receive → diagnose → propose action.

    Designed for testability: all I/O (Redis, filesystem, LLM) is injected
    or mockable. The ``router`` parameter accepts a pre-built LLMRouter so
    tests can substitute it without network calls.
    """

    def __init__(
        self,
        redis_url: str = REDIS_URL,
        ollama_host: str = OLLAMA_HOST,
        ollama_model: str = OLLAMA_MODEL,
        router: Optional[LLMRouter] = None,
        cooldown: Optional[CooldownTracker] = None,
    ) -> None:
        self._redis_url = redis_url
        self._redis: Optional[aioredis.Redis] = None
        self._pubsub: Optional[aioredis.client.PubSub] = None
        self._running = False

        self.router = router or LLMRouter(
            redis_url=redis_url,
            ollama_host=ollama_host,
            ollama_model=ollama_model,
        )
        self.cooldown = cooldown or CooldownTracker()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Connect to Redis and subscribe to SUBSCRIBE_CHANNELS."""
        self._redis = aioredis.from_url(
            self._redis_url,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=10,
        )
        self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True)
        await self._pubsub.subscribe(*SUBSCRIBE_CHANNELS)
        log.info("planner_started", channels=SUBSCRIBE_CHANNELS, node=NODE_NAME)

    async def stop(self) -> None:
        """Release pub/sub, Redis and router resources (best-effort).

        Each step is attempted independently; a failure is logged and does
        not prevent the remaining resources from being closed.
        """
        self._running = False
        if self._pubsub:
            try:
                await self._pubsub.unsubscribe()
                await self._pubsub.aclose()
            except Exception as exc:
                log.warning("pubsub_close_failed", error=str(exc))
        if self._redis:
            try:
                await self._redis.aclose()
            except Exception as exc:
                log.warning("redis_close_failed", error=str(exc))
        try:
            await self.router.close()
        except Exception as exc:
            log.warning("router_close_failed", error=str(exc))
        log.info("planner_stopped")

    async def run(self) -> None:
        """Main event loop. Runs until cancelled or SIGINT/SIGTERM.

        Cancellation is logged and absorbed; any other exception is logged
        and re-raised. ``stop()`` always runs, including when startup fails.
        """
        # Set before start() so a shutdown signal that arrives while
        # connecting is not overwritten afterwards.
        self._running = True
        try:
            await self.start()
            _ensure_dirs()
            while self._running:
                self._touch_heartbeat()
                try:
                    # Block inside Redis for up to POLL_TIMEOUT_SECONDS
                    # instead of spinning; the outer wait_for is a backstop
                    # against a read that never returns.
                    msg = await asyncio.wait_for(
                        self._pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=POLL_TIMEOUT_SECONDS,
                        ),
                        timeout=5.0,
                    )
                except asyncio.TimeoutError:
                    continue

                if msg is None:
                    # Keeps the loop cooperative if get_message returns
                    # immediately (e.g. a test double).
                    await asyncio.sleep(0.05)
                    continue

                await self._dispatch(msg)

        except asyncio.CancelledError:
            log.info("planner_cancelled")
        except Exception as exc:
            log.exception("planner_fatal_error", error=str(exc))
            raise
        finally:
            await self.stop()

    # ------------------------------------------------------------------
    # Message dispatch
    # ------------------------------------------------------------------

    async def _dispatch(self, msg: dict) -> None:
        """Deserialise one Redis pub/sub message and hand off to _handle_event.

        Non-string data, invalid JSON, non-object JSON and unparseable events
        are dropped.
        """
        channel = msg.get("channel", "")
        data = msg.get("data", "")

        if not isinstance(data, str):
            return

        try:
            raw = json.loads(data)
        except json.JSONDecodeError:
            log.warning("malformed_message", channel=channel, preview=data[:120])
            return

        if not isinstance(raw, dict):
            return

        event = parse_event(raw, channel)
        if event is None:
            log.debug("unparseable_event", channel=channel, keys=list(raw.keys()))
            return

        log.info(
            "event_received",
            channel=channel,
            svc_key=event.svc_key,
            type=event.event_type,
            severity=event.severity,
        )
        await self._handle_event(event)

    # ------------------------------------------------------------------
    # Core pipeline
    # ------------------------------------------------------------------

    async def _handle_event(self, event: HealthEvent) -> None:
        """Cooldown → LLM proposal → write pending action → emit event."""
        # Benign events need no remediation
        if event.event_type in _BENIGN_EVENTS:
            log.debug("benign_event_skipped", type=event.event_type, svc_key=event.svc_key)
            return

        svc_key = event.svc_key
        if not self.cooldown.is_ready(svc_key):
            log.info(
                "cooldown_active",
                svc_key=svc_key,
                remaining_seconds=round(self.cooldown.remaining_seconds(svc_key)),
            )
            return

        proposal = await self._propose_action(event)
        if proposal is None:
            # LLM fully failed — do not record cooldown so next event can retry
            return

        self.cooldown.record(svc_key)

        if proposal.action == "ignore":
            log.info(
                "proposal_ignored",
                svc_key=svc_key,
                reason=proposal.reason,
                confidence=proposal.confidence,
                llm_model=proposal.llm_model,
            )
            return

        # Write to pending (human must approve before executor runs it)
        try:
            path = await write_pending_action(proposal)
        except Exception as exc:
            log.error("action_write_failed", svc_key=svc_key, error=str(exc))
            return

        log.info(
            "action_proposed",
            action_id=proposal.action_id,
            action=proposal.action,
            executor_type=proposal.type,
            svc_key=svc_key,
            requires_human=proposal.requires_human,
            confidence=proposal.confidence,
            risk_level=proposal.risk_level,
            llm_model=proposal.llm_model,
            path=str(path),
        )

        await emit_event(
            event_type="remediation_started",
            severity="info",
            service=event.service,
            correlation_id=proposal.action_id,
            payload={
                "action": proposal.action,
                "executor_type": proposal.type,
                "node": event.node,
                "action_id": proposal.action_id,
                "requires_human": proposal.requires_human,
                "confidence": proposal.confidence,
                "llm_model": proposal.llm_model,
            },
        )

    # ------------------------------------------------------------------
    # LLM call
    # ------------------------------------------------------------------

    async def _propose_action(self, event: HealthEvent) -> Optional[ActionProposal]:
        """Invoke LLMRouter and map the validated response to an ActionProposal.

        Returns None when the router raises RuntimeError (logged as
        ``llm_chain_exhausted``). The SYSTEM_PROMPT risk rules are re-applied
        here so they hold even if the model ignored them.
        """
        messages = build_messages(event)
        # action_id becomes a file name, and node/service come straight from
        # a pub/sub message — keep only filename-safe characters.
        action_id = (
            f"plan-{_safe_slug(event.node)}-{_safe_slug(event.service)}"
            f"-{int(event.timestamp)}"
        )

        try:
            result: RouteResult = await self.router.route(
                messages=messages,
                schema=PROPOSAL_SCHEMA,
                context=f"planner.{event.svc_key}",
            )
        except RuntimeError as exc:
            log.error(
                "llm_chain_exhausted",
                svc_key=event.svc_key,
                error=str(exc)[:400],
            )
            return None

        raw = result.content  # already parsed + schema-validated by LLMRouter

        # Resolve once so the stored fields and the description always agree,
        # including when the model returned an empty string.
        service = raw.get("service") or event.service
        node = raw.get("node") or event.node
        confidence = float(raw["confidence"])
        llm_action = raw["action"]
        llm_requires_human = bool(raw["requires_human"])

        action, requires_human = _enforce_risk_rules(
            action=llm_action,
            nodes=(event.node, node),
            event_type=event.event_type,
            confidence=confidence,
            requires_human=llm_requires_human,
        )
        if action != llm_action or requires_human != llm_requires_human:
            log.info(
                "risk_rules_applied",
                svc_key=event.svc_key,
                llm_action=llm_action,
                action=action,
                llm_requires_human=llm_requires_human,
                requires_human=requires_human,
            )

        ex_type, risk = map_action_to_executor_type(action)

        return ActionProposal(
            action_id=action_id,
            type=ex_type,
            action=action,
            service=service,
            node=node,
            reason=raw["reason"],
            confidence=confidence,
            requires_human=requires_human,
            risk_level=risk,
            timestamp=event.timestamp,
            source_event=event.event_type,
            description=f"{action.upper()} {service} on {node}: {raw['reason']}",
            llm_model=result.model_used,
            llm_attempts=len(result.attempts),
        )

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def _touch_heartbeat(self) -> None:
        """Update the heartbeat file's mtime; failures are logged, not raised."""
        try:
            STATE_DIR.mkdir(parents=True, exist_ok=True)
            HEARTBEAT.touch()
        except Exception as exc:
            log.warning("heartbeat_failed", error=str(exc))


# ---------------------------------------------------------------------------
# Module helpers
# ---------------------------------------------------------------------------

def _ensure_dirs() -> None:
    """Create the action lifecycle directories and the state directory."""
    for sub in ("pending", "approved", "running", "completed", "failed",
                "rejected", "cancelled"):
        (ACTIONS_DIR / sub).mkdir(parents=True, exist_ok=True)
    STATE_DIR.mkdir(parents=True, exist_ok=True)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

async def _main() -> None:
    """Build the agent, install SIGINT/SIGTERM handlers and run it."""
    agent = PlannerAgent()
    loop = asyncio.get_running_loop()

    def _shutdown(sig_name: str) -> None:
        log.info("shutdown_signal", signal=sig_name)
        # The main loop checks this flag between reads and then exits.
        agent._running = False

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _shutdown, sig.name)

    await agent.run()


if __name__ == "__main__":
    asyncio.run(_main())