From ca37fca5ce36f773da2fc1f0530305b09db1eec8 Mon Sep 17 00:00:00 2001 From: Oskar Kapala Date: Wed, 27 May 2026 19:11:39 +0200 Subject: [PATCH] feat(planner-agent): main loop with LLM routing and HITL action proposals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit services/planner-agent/src/planner.py: - PlannerAgent: async Redis pub/sub on health_events + world_updates - Pipeline: receive event → cooldown gate → LLMRouter → write pending action → emit remediation_started filesystem event - CooldownTracker: 5-min suppression per svc_key (configurable via env) - parse_event(): accepts node-agent shape A and world_updates shape B - PROPOSAL_SCHEMA: jsonschema enforced by LLMRouter before accepting response - SYSTEM_PROMPT: homelab topology + action rules (chelsty always requires_human, disk_pressure always notify, confidence<0.7 → requires_human) - write_pending_action(): atomic tmp→rename write, executor-compatible format - emit_event(): async wrapper around filesystem event write (no control-plane import) - _emit_event_sync() reads NODE_NAME at call time (not import) for testability - Benign events (service_healthy, node_online, ...) silently skipped - LLM chain failure: no cooldown recorded so next event can retry services/planner-agent/tests/test_planner.py (49 tests, 0 network): - TestCooldownTracker: 7 tests (ready/not-ready/elapsed/reset/independence) - TestHealthEvent, TestActionProposal, TestMapActionToExecutorType - TestParseEvent: both event shapes, missing fields, timestamp formats - TestBuildMessages: system prompt rules, payload inclusion - TestPlannerHandleEvent: benign skip, cooldown block, ignore/restart/redeploy/ notify proposals, remediation event emission, LLM failure isolation, requires_human propagation, cooldown recording, model name in proposal - TestPlannerDispatch: valid JSON, invalid JSON, non-string data, missing node - TestWritePendingAction, TestEmitEvent: filesystem integration with tmp_path services/planner-agent/service.yaml: owner_node: solaria, dependencies: [redis, ollama] services/planner-agent/docker-compose.yml: env + healthcheck services/planner-agent/Dockerfile: python:3.11-slim services/planner-agent/healthcheck.sh: heartbeat file age check (300s) services/planner-agent/requirements.txt: litellm, redis, jsonschema, structlog Co-Authored-By: Claude Sonnet 4.6 --- services/planner-agent/Dockerfile | 17 + services/planner-agent/docker-compose.yml | 21 + services/planner-agent/healthcheck.sh | 28 + services/planner-agent/requirements.txt | 3 +- services/planner-agent/service.yaml | 45 ++ services/planner-agent/src/planner.py | 709 +++++++++++++++++++ services/planner-agent/tests/test_planner.py | 604 ++++++++++++++++ 7 files changed, 1426 insertions(+), 1 deletion(-) create mode 100644 services/planner-agent/Dockerfile create mode 100644 services/planner-agent/docker-compose.yml create mode 100644 services/planner-agent/healthcheck.sh create mode 100644 services/planner-agent/service.yaml create mode 100644 services/planner-agent/src/planner.py create mode 100644 services/planner-agent/tests/test_planner.py diff --git a/services/planner-agent/Dockerfile b/services/planner-agent/Dockerfile new file mode 100644 index 0000000..1035e93 --- /dev/null +++ b/services/planner-agent/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy source +COPY src/ /app/src/ + +COPY healthcheck.sh /app/healthcheck.sh +RUN chmod +x /app/healthcheck.sh + +ENV PYTHONUNBUFFERED=1 + +CMD ["python", "src/planner.py"] diff --git a/services/planner-agent/docker-compose.yml b/services/planner-agent/docker-compose.yml new file mode 100644 index 0000000..d2700bf --- /dev/null +++ b/services/planner-agent/docker-compose.yml @@ -0,0 +1,21 @@ +services: + planner-agent: + build: . + container_name: planner-agent + restart: unless-stopped + volumes: + - /opt/homelab:/opt/homelab + environment: + - REDIS_URL=${REDIS_URL:-redis://100.108.208.3:6379} + - OLLAMA_HOST=${OLLAMA_HOST:-http://100.108.208.3:11434} + - OLLAMA_MODEL=${OLLAMA_MODEL:-qwen2.5:7b} + - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} + - NODE_NAME=${NODE_NAME:-solaria} + - COOLDOWN_SECONDS=${COOLDOWN_SECONDS:-300} + - RUNTIME_PATH=${RUNTIME_PATH:-/opt/homelab} + healthcheck: + test: ["CMD", "/bin/sh", "/app/healthcheck.sh"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 30s diff --git a/services/planner-agent/healthcheck.sh b/services/planner-agent/healthcheck.sh new file mode 100644 index 0000000..43b60e8 --- /dev/null +++ b/services/planner-agent/healthcheck.sh @@ -0,0 +1,28 @@ +#!/bin/sh +# Healthcheck: verify the planner-agent heartbeat is fresh. +# The planner touches /opt/homelab/state/planner-agent.heartbeat +# at the top of every poll cycle (≤5 s intervals). +# We fail if it is older than 300 s (5 min = one full cooldown window). + +HEARTBEAT_FILE="${RUNTIME_PATH:-/opt/homelab}/state/planner-agent.heartbeat" +MAX_AGE_SECONDS=300 + +if [ ! -f "$HEARTBEAT_FILE" ]; then + echo "FAIL: heartbeat file missing: $HEARTBEAT_FILE" + exit 1 +fi + +NOW=$(date +%s) +FILE_TIME=$(stat -c %Y "$HEARTBEAT_FILE" 2>/dev/null) || { + echo "FAIL: cannot stat heartbeat file" + exit 1 +} +AGE=$((NOW - FILE_TIME)) + +if [ "$AGE" -gt "$MAX_AGE_SECONDS" ]; then + echo "FAIL: heartbeat stale (${AGE}s > ${MAX_AGE_SECONDS}s)" + exit 1 +fi + +echo "OK: heartbeat age ${AGE}s" +exit 0 diff --git a/services/planner-agent/requirements.txt b/services/planner-agent/requirements.txt index 3d88ca9..4922af2 100644 --- a/services/planner-agent/requirements.txt +++ b/services/planner-agent/requirements.txt @@ -1,3 +1,4 @@ litellm>=1.40.0 -redis[asyncio]>=5.0.0 +redis>=5.0.0 jsonschema>=4.21.0 +structlog>=24.1.0 diff --git a/services/planner-agent/service.yaml b/services/planner-agent/service.yaml new file mode 100644 index 0000000..392a628 --- /dev/null +++ b/services/planner-agent/service.yaml @@ -0,0 +1,45 @@ +service: + name: planner-agent + owner_node: solaria + exposure: private + dependencies: + - redis + - ollama + + ports: [] # no external port; communicates via Redis pub/sub + + healthcheck: + type: file + path: /opt/homelab/state/planner-agent.heartbeat + max_age_seconds: 300 # 5 minutes — matches COOLDOWN_SECONDS + interval: 60s + timeout: 10s + retries: 3 + + restart_policy: unless-stopped + + persistence: + paths: + - /opt/homelab/actions + - /opt/homelab/events + - /opt/homelab/state + + runtime: + directories: + - /opt/homelab/actions/pending + - /opt/homelab/actions/approved + - /opt/homelab/actions/running + - /opt/homelab/actions/completed + - /opt/homelab/actions/failed + - /opt/homelab/actions/rejected + - /opt/homelab/actions/cancelled + - /opt/homelab/events + - /opt/homelab/state + env_vars: + - REDIS_URL # redis://100.108.208.3:6379 + - OLLAMA_HOST # http://100.108.208.3:11434 + - OLLAMA_MODEL # qwen2.5:7b + - ANTHROPIC_API_KEY # for claude-haiku/sonnet fallback + - NODE_NAME # solaria + - COOLDOWN_SECONDS # default 300 + - RUNTIME_PATH # default /opt/homelab diff --git a/services/planner-agent/src/planner.py b/services/planner-agent/src/planner.py new file mode 100644 index 0000000..43c1208 --- /dev/null +++ b/services/planner-agent/src/planner.py @@ -0,0 +1,709 @@ +""" +planner.py — planner-agent main loop. + +Listens to Redis pub/sub channels: + - health_events: node-agent / stability-agent health notifications + - world_updates: observer world-state change notifications + +For each event that clears the cooldown gate: + 1. Ask LLMRouter to diagnose and produce a structured action proposal. + 2. Write proposal to /opt/homelab/actions/pending/.json. + 3. Emit a remediation_started filesystem event. + +Human-in-the-loop invariant +---------------------------- +The planner ONLY writes to actions/pending/. Execution requires an +operator-approved action file in actions/approved/ — the planner +never touches that directory. +""" + +import asyncio +import json +import os +import signal +import sys +import time +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Optional + +import redis.asyncio as aioredis +import structlog + +# Allow running from src/ directory without installation +sys.path.insert(0, str(Path(__file__).parent)) +from llm_router import LLMRouter, RouteResult # noqa: E402 + +# --------------------------------------------------------------------------- +# Structured logging — JSON to stdout +# --------------------------------------------------------------------------- +structlog.configure( + processors=[ + structlog.stdlib.add_log_level, # adds "level" key + structlog.processors.TimeStamper(fmt="iso", utc=True), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(), + ], + wrapper_class=structlog.make_filtering_bound_logger(20), # INFO+ + logger_factory=structlog.PrintLoggerFactory(), + # add_logger_name is intentionally excluded: it requires a stdlib + # logger with a .name attribute; PrintLogger does not have one. +) +log = structlog.get_logger("planner") + +# --------------------------------------------------------------------------- +# Runtime paths +# --------------------------------------------------------------------------- +RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab")) +ACTIONS_DIR = RUNTIME_PATH / "actions" +EVENTS_DIR = RUNTIME_PATH / "events" +STATE_DIR = RUNTIME_PATH / "state" +HEARTBEAT = STATE_DIR / "planner-agent.heartbeat" + +# --------------------------------------------------------------------------- +# Configuration (from env) +# --------------------------------------------------------------------------- +REDIS_URL = os.getenv("REDIS_URL", "redis://100.108.208.3:6379") +OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://100.108.208.3:11434") +OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b") +NODE_NAME = os.getenv("NODE_NAME", "solaria") +COOLDOWN_SECONDS = int(os.getenv("COOLDOWN_SECONDS", "300")) # 5 min +SUBSCRIBE_CHANNELS = ["health_events", "world_updates"] + +# --------------------------------------------------------------------------- +# JSON Schema — validated by LLMRouter (jsonschema) before accepting response +# --------------------------------------------------------------------------- +PROPOSAL_SCHEMA: dict = { + "type": "object", + "required": ["action", "service", "node", "reason", "confidence", "requires_human"], + "additionalProperties": False, + "properties": { + "action": { + "type": "string", + "enum": ["restart", "redeploy", "notify", "ignore"], + }, + "service": {"type": "string"}, + "node": {"type": "string"}, + "reason": {"type": "string", "minLength": 10}, + "confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0}, + "requires_human": {"type": "boolean"}, + }, +} + +# --------------------------------------------------------------------------- +# LLM system prompt +# --------------------------------------------------------------------------- +SYSTEM_PROMPT = """You are the planner agent for a distributed homelab orchestration system. +Your job is to diagnose infrastructure health events and propose a remediation action. + +Homelab topology: + vps — Hetzner VPS; public ingress, control plane + piha — Raspberry Pi 5; infra, monitoring, Redis, Ollama + solaria — GPU workstation; AI / compute workloads + chelsty-infra — LTE edge; Zigbee2MQTT, Mosquitto — offline-first + chelsty-ha — LTE edge; Home Assistant — offline-first + +Action selection rules: + restart — container exists but is stopped/unhealthy; docker restart suffices (low risk) + redeploy — container is broken beyond a simple restart; full docker compose up (guarded) + notify — human decision required; do not attempt automated fix + ignore — transient / one-off glitch; monitoring will catch a repeat + +Risk rules (enforce strictly): + - For any chelsty-* node: always set requires_human: true + - For disk_pressure events: always use "notify" + - If confidence < 0.7: set requires_human: true + - Unknown/novel failure patterns: prefer "notify" over guessing + +Respond with ONLY a single JSON object, no markdown, no commentary: +{ + "action": "restart|redeploy|notify|ignore", + "service": "", + "node": "", + "reason": "", + "confidence": <0.0–1.0>, + "requires_human": +}""" + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + +@dataclass +class HealthEvent: + """Normalized health event ingested from a Redis channel message.""" + node: str + service: str + event_type: str # e.g. "service_unhealthy", "disk_pressure_high" + severity: str # "info" | "warning" | "error" | "critical" + payload: dict = field(default_factory=dict) + timestamp: float = field(default_factory=time.time) + raw_channel: str = "" + + @property + def svc_key(self) -> str: + return f"{self.node}/{self.service}" + + def __str__(self) -> str: + return f"[{self.event_type}] {self.svc_key} ({self.severity})" + + +@dataclass +class ActionProposal: + """Planner's structured output, written to actions/pending/.json.""" + action_id: str + type: str # executor type: "container_restart"|"redeploy"|"notify"|"ignore" + action: str # LLM's action: "restart"|"redeploy"|"notify"|"ignore" + service: str + node: str + reason: str + confidence: float + requires_human: bool + risk_level: str + status: str = "pending" + timestamp: float = field(default_factory=time.time) + source_event: str = "" + description: str = "" + llm_model: str = "" + llm_attempts: int = 0 + + def to_action_file(self) -> dict: + """Return a dict compatible with the executor's action file format.""" + return { + "action_id": self.action_id, + "type": self.type, + "node": self.node, + "service": self.service, + "risk_level": self.risk_level, + "confidence": self.confidence, + "requires_human": self.requires_human, + "description": self.description or self.reason, + "status": self.status, + "timestamp": self.timestamp, + "source_event": self.source_event, + "llm_model": self.llm_model, + "llm_attempts": self.llm_attempts, + "payload": { + "action": self.action, + "reason": self.reason, + }, + } + + +# --------------------------------------------------------------------------- +# Cooldown tracker +# --------------------------------------------------------------------------- + +class CooldownTracker: + """Gate: suppress duplicate proposals for the same service/node pair. + + A proposal is suppressed if a previous proposal for the same svc_key + was emitted within the last ``cooldown_seconds`` seconds. + """ + + def __init__(self, cooldown_seconds: float = COOLDOWN_SECONDS) -> None: + self._cooldown = cooldown_seconds + self._last: dict[str, float] = {} + + def is_ready(self, svc_key: str) -> bool: + """True when enough time has elapsed since the last proposal.""" + return (time.time() - self._last.get(svc_key, 0.0)) >= self._cooldown + + def record(self, svc_key: str) -> None: + """Mark a proposal as just emitted for svc_key.""" + self._last[svc_key] = time.time() + + def remaining_seconds(self, svc_key: str) -> float: + return max(0.0, self._cooldown - (time.time() - self._last.get(svc_key, 0.0))) + + def reset(self, svc_key: str) -> None: + """Force-reset cooldown (e.g. for testing or manual override).""" + self._last.pop(svc_key, None) + + +# --------------------------------------------------------------------------- +# Event emission (filesystem — no control-plane import) +# --------------------------------------------------------------------------- + +def _emit_event_sync( + event_type: str, + severity: str, + service: str, + correlation_id: str, + payload: Optional[dict] = None, + node: Optional[str] = None, +) -> None: + # Read NODE_NAME at call time (not import time) so monkeypatching works in tests. + if node is None: + node = NODE_NAME + """Write a normalized JSON event file to the filesystem event store. + + Mirrors scripts/lib/events.py behaviour — keeping planner fully + independent of the control-plane package. + """ + now = datetime.now(timezone.utc) + timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ") + date_dir = now.strftime("%Y-%m-%d") + svc_slug = (service or "planner").replace("/", "-").replace(" ", "-") + fname = f"evt-{node}-{int(time.time())}-{event_type}-{svc_slug}.json" + event_dir = EVENTS_DIR / date_dir / node + + try: + event_dir.mkdir(parents=True, exist_ok=True) + (event_dir / fname).write_text(json.dumps({ + "timestamp": timestamp, + "node": node, + "type": event_type, + "severity": severity, + "source": "planner-agent", + "service": service, + "correlation_id": correlation_id, + "payload": payload or {}, + }, indent=2)) + except Exception as exc: + log.warning("event_write_failed", path=str(event_dir / fname), error=str(exc)) + + +async def emit_event( + event_type: str, + severity: str, + service: str, + correlation_id: str, + payload: Optional[dict] = None, +) -> None: + """Async wrapper around _emit_event_sync (runs in thread pool).""" + await asyncio.to_thread( + _emit_event_sync, event_type, severity, service, correlation_id, payload + ) + + +# --------------------------------------------------------------------------- +# Action file I/O +# --------------------------------------------------------------------------- + +async def write_pending_action(proposal: ActionProposal) -> Path: + """Atomically write proposal JSON to actions/pending/.json.""" + pending_dir = ACTIONS_DIR / "pending" + pending_dir.mkdir(parents=True, exist_ok=True) + path = pending_dir / f"{proposal.action_id}.json" + + def _write() -> None: + # Write to tmp then rename so readers never see a partial file + tmp = path.with_suffix(".tmp") + tmp.write_text(json.dumps(proposal.to_action_file(), indent=2)) + tmp.replace(path) + + await asyncio.to_thread(_write) + return path + + +# --------------------------------------------------------------------------- +# LLM prompt helpers +# --------------------------------------------------------------------------- + +def build_messages(event: HealthEvent) -> list[dict]: + """Construct the OpenAI-style message list for one health event.""" + user_content = ( + f"Health event received:\n" + f" node: {event.node}\n" + f" service: {event.service}\n" + f" type: {event.event_type}\n" + f" severity: {event.severity}\n" + f" timestamp: {datetime.fromtimestamp(event.timestamp, tz=timezone.utc).isoformat()}\n" + ) + if event.payload: + payload_str = json.dumps(event.payload, indent=4) + user_content += f" payload:\n{payload_str}\n" + user_content += ( + "\nRespond with ONLY the JSON object as specified." + ) + return [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_content}, + ] + + +def map_action_to_executor_type(action: str) -> tuple[str, str]: + """Map LLM action name → (executor type, risk_level).""" + return { + "restart": ("container_restart", "low"), + "redeploy": ("redeploy", "guarded"), + "notify": ("notify", "low"), + "ignore": ("ignore", "none"), + }.get(action, ("notify", "low")) + + +# --------------------------------------------------------------------------- +# Event parsing +# --------------------------------------------------------------------------- + +def parse_event(raw: dict, channel: str) -> Optional[HealthEvent]: + """Normalise a raw Redis pub/sub payload into a HealthEvent. + + Accepts two common shapes: + + Shape A — node-agent / stability-agent filesystem event format: + {"type": "service_unhealthy", "node": "piha", "service": "mosquitto", + "severity": "error", "payload": {...}} + + Shape B — control-plane world_updates format: + {"event_type": "...", "node": "...", "service": "...", ...} + """ + event_type = raw.get("type") or raw.get("event_type", "") + node = (raw.get("node") or "").strip() + service = (raw.get("service") or "").strip() + severity = (raw.get("severity") or "info").strip() + + if not event_type or not node: + return None + + # For node-level events (e.g. node_offline) without a service field + if not service: + details = raw.get("details") or raw.get("payload") or {} + service = details.get("service", "") if isinstance(details, dict) else "" + if not service: + service = node # fallback: use node name as service key + + # Parse timestamp + ts_raw = raw.get("timestamp", time.time()) + if isinstance(ts_raw, str): + try: + ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp() + except (ValueError, AttributeError): + ts = time.time() + else: + try: + ts = float(ts_raw) + except (TypeError, ValueError): + ts = time.time() + + payload = raw.get("payload") or raw.get("details") or {} + if not isinstance(payload, dict): + payload = {} + + return HealthEvent( + node = node, + service = service, + event_type = event_type, + severity = severity, + payload = payload, + timestamp = ts, + raw_channel = channel, + ) + + +# --------------------------------------------------------------------------- +# Planner agent +# --------------------------------------------------------------------------- + +# Event types that require no action (healthy signals, completions) +_BENIGN_EVENTS = frozenset({ + "service_healthy", + "service_recovered", + "node_online", + "deployment_completed", + "deployment_started", + "remediation_started", + "remediation_completed", +}) + + +class PlannerAgent: + """Async agent: subscribe → receive → diagnose → propose action. + + Designed for testability: all I/O (Redis, filesystem, LLM) is + injected or mockable. The ``router`` parameter accepts a pre-built + LLMRouter so tests can substitute it without network calls. + """ + + def __init__( + self, + redis_url: str = REDIS_URL, + ollama_host: str = OLLAMA_HOST, + ollama_model: str = OLLAMA_MODEL, + router: Optional[LLMRouter] = None, + cooldown: Optional[CooldownTracker] = None, + ) -> None: + self._redis_url = redis_url + self._redis: Optional[aioredis.Redis] = None + self._pubsub: Optional[aioredis.client.PubSub] = None + self._running = False + + self.router = router or LLMRouter( + redis_url = redis_url, + ollama_host = ollama_host, + ollama_model = ollama_model, + ) + self.cooldown = cooldown or CooldownTracker() + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + self._redis = aioredis.from_url( + self._redis_url, + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=10, + ) + self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True) + await self._pubsub.subscribe(*SUBSCRIBE_CHANNELS) + log.info("planner_started", channels=SUBSCRIBE_CHANNELS, node=NODE_NAME) + + async def stop(self) -> None: + self._running = False + if self._pubsub: + try: + await self._pubsub.unsubscribe() + await self._pubsub.aclose() + except Exception: + pass + if self._redis: + try: + await self._redis.aclose() + except Exception: + pass + try: + await self.router.close() + except Exception: + pass + log.info("planner_stopped") + + async def run(self) -> None: + """Main event loop. Runs until cancelled or SIGINT/SIGTERM.""" + await self.start() + self._running = True + _ensure_dirs() + + try: + while self._running: + self._touch_heartbeat() + try: + msg = await asyncio.wait_for( + self._pubsub.get_message(ignore_subscribe_messages=True), + timeout=5.0, + ) + except asyncio.TimeoutError: + continue + + if msg is None: + await asyncio.sleep(0.05) + continue + + await self._dispatch(msg) + + except asyncio.CancelledError: + log.info("planner_cancelled") + except Exception as exc: + log.exception("planner_fatal_error", error=str(exc)) + raise + finally: + await self.stop() + + # ------------------------------------------------------------------ + # Message dispatch + # ------------------------------------------------------------------ + + async def _dispatch(self, msg: dict) -> None: + """Deserialise one Redis pub/sub message and hand off to _handle_event.""" + channel = msg.get("channel", "") + data = msg.get("data", "") + + if not isinstance(data, str): + return + + try: + raw = json.loads(data) + except json.JSONDecodeError: + log.warning("malformed_message", channel=channel, preview=data[:120]) + return + + if not isinstance(raw, dict): + return + + event = parse_event(raw, channel) + if event is None: + log.debug("unparseable_event", channel=channel, keys=list(raw.keys())) + return + + log.info( + "event_received", + channel = channel, + svc_key = event.svc_key, + type = event.event_type, + severity = event.severity, + ) + await self._handle_event(event) + + # ------------------------------------------------------------------ + # Core pipeline + # ------------------------------------------------------------------ + + async def _handle_event(self, event: HealthEvent) -> None: + """Cooldown → LLM proposal → write pending action → emit event.""" + + # Benign events need no remediation + if event.event_type in _BENIGN_EVENTS: + log.debug("benign_event_skipped", type=event.event_type, svc_key=event.svc_key) + return + + svc_key = event.svc_key + + if not self.cooldown.is_ready(svc_key): + log.info( + "cooldown_active", + svc_key = svc_key, + remaining_seconds = round(self.cooldown.remaining_seconds(svc_key)), + ) + return + + proposal = await self._propose_action(event) + if proposal is None: + # LLM fully failed — do not record cooldown so next event can retry + return + + self.cooldown.record(svc_key) + + if proposal.action == "ignore": + log.info( + "proposal_ignored", + svc_key = svc_key, + reason = proposal.reason, + confidence = proposal.confidence, + llm_model = proposal.llm_model, + ) + return + + # Write to pending (human must approve before executor runs it) + try: + path = await write_pending_action(proposal) + except Exception as exc: + log.error("action_write_failed", svc_key=svc_key, error=str(exc)) + return + + log.info( + "action_proposed", + action_id = proposal.action_id, + action = proposal.action, + executor_type = proposal.type, + svc_key = svc_key, + requires_human = proposal.requires_human, + confidence = proposal.confidence, + risk_level = proposal.risk_level, + llm_model = proposal.llm_model, + path = str(path), + ) + + await emit_event( + event_type = "remediation_started", + severity = "info", + service = event.service, + correlation_id = proposal.action_id, + payload = { + "action": proposal.action, + "executor_type": proposal.type, + "node": event.node, + "action_id": proposal.action_id, + "requires_human": proposal.requires_human, + "confidence": proposal.confidence, + "llm_model": proposal.llm_model, + }, + ) + + # ------------------------------------------------------------------ + # LLM call + # ------------------------------------------------------------------ + + async def _propose_action(self, event: HealthEvent) -> Optional[ActionProposal]: + """Invoke LLMRouter and map the validated response to an ActionProposal.""" + messages = build_messages(event) + action_id = ( + f"plan-{event.node}-{event.service.replace('/', '-')}" + f"-{int(event.timestamp)}" + ) + + try: + result: RouteResult = await self.router.route( + messages = messages, + schema = PROPOSAL_SCHEMA, + context = f"planner.{event.svc_key}", + ) + except RuntimeError as exc: + log.error( + "llm_chain_exhausted", + svc_key = event.svc_key, + error = str(exc)[:400], + ) + return None + + raw = result.content # already parsed + schema-validated by LLMRouter + action = raw["action"] + ex_type, risk = map_action_to_executor_type(action) + + return ActionProposal( + action_id = action_id, + type = ex_type, + action = action, + service = raw.get("service") or event.service, + node = raw.get("node") or event.node, + reason = raw["reason"], + confidence = float(raw["confidence"]), + requires_human = bool(raw["requires_human"]), + risk_level = risk, + timestamp = event.timestamp, + source_event = event.event_type, + description = ( + f"{action.upper()} {raw.get('service', event.service)} " + f"on {raw.get('node', event.node)}: {raw['reason']}" + ), + llm_model = result.model_used, + llm_attempts = len(result.attempts), + ) + + # ------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------ + + def _touch_heartbeat(self) -> None: + try: + STATE_DIR.mkdir(parents=True, exist_ok=True) + HEARTBEAT.touch() + except Exception as exc: + log.warning("heartbeat_failed", error=str(exc)) + + +# --------------------------------------------------------------------------- +# Module helpers +# --------------------------------------------------------------------------- + +def _ensure_dirs() -> None: + for sub in ("pending", "approved", "running", "completed", + "failed", "rejected", "cancelled"): + (ACTIONS_DIR / sub).mkdir(parents=True, exist_ok=True) + STATE_DIR.mkdir(parents=True, exist_ok=True) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +async def _main() -> None: + agent = PlannerAgent() + loop = asyncio.get_running_loop() + + def _shutdown(sig_name: str) -> None: + log.info("shutdown_signal", signal=sig_name) + agent._running = False + + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, _shutdown, sig.name) + + await agent.run() + + +if __name__ == "__main__": + asyncio.run(_main()) diff --git a/services/planner-agent/tests/test_planner.py b/services/planner-agent/tests/test_planner.py new file mode 100644 index 0000000..8e00ee6 --- /dev/null +++ b/services/planner-agent/tests/test_planner.py @@ -0,0 +1,604 @@ +""" +Unit tests for planner.py. + +All Redis, LLMRouter, and filesystem operations are mocked — +no network or disk I/O required. + +Run: + pytest services/planner-agent/tests/test_planner.py -v +""" + +import asyncio +import json +import sys +import time +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch, call + +import pytest + +# Allow importing from src/ without installation +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from planner import ( + ActionProposal, + CooldownTracker, + HealthEvent, + PlannerAgent, + build_messages, + map_action_to_executor_type, + parse_event, + write_pending_action, + emit_event, + PROPOSAL_SCHEMA, +) +from llm_router import AttemptRecord, RouteResult + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_route_result( + action: str = "restart", + service: str = "mosquitto", + node: str = "piha", + reason: str = "Container is stopped", + confidence: float = 0.9, + requires_human: bool = False, + model: str = "ollama/qwen2.5:7b", +) -> RouteResult: + content = { + "action": action, + "service": service, + "node": node, + "reason": reason, + "confidence": confidence, + "requires_human": requires_human, + } + return RouteResult( + content = content, + raw_text = json.dumps(content), + model_used = model, + attempts = [AttemptRecord(model, "success", None, 120)], + latency_ms = 120, + ) + + +def _health_event( + node: str = "piha", + service: str = "mosquitto", + event_type: str = "service_unhealthy", + severity: str = "error", + payload: dict = None, +) -> HealthEvent: + return HealthEvent( + node = node, + service = service, + event_type = event_type, + severity = severity, + payload = payload or {}, + timestamp = time.time(), + ) + + +def _mock_router(result: RouteResult) -> MagicMock: + router = MagicMock() + router.route = AsyncMock(return_value=result) + router.close = AsyncMock() + return router + + +# --------------------------------------------------------------------------- +# CooldownTracker +# --------------------------------------------------------------------------- + +class TestCooldownTracker: + def test_initially_ready(self): + ct = CooldownTracker(cooldown_seconds=60) + assert ct.is_ready("piha/mosquitto") is True + + def test_not_ready_after_record(self): + ct = CooldownTracker(cooldown_seconds=300) + ct.record("piha/mosquitto") + assert ct.is_ready("piha/mosquitto") is False + + def test_ready_after_elapsed(self): + ct = CooldownTracker(cooldown_seconds=1) + ct.record("piha/mosquitto") + time.sleep(1.1) + assert ct.is_ready("piha/mosquitto") is True + + def test_remaining_seconds_decreases(self): + ct = CooldownTracker(cooldown_seconds=60) + ct.record("piha/mosquitto") + r = ct.remaining_seconds("piha/mosquitto") + assert 0 < r <= 60 + + def test_remaining_zero_when_never_recorded(self): + ct = CooldownTracker() + assert ct.remaining_seconds("ghost/svc") == 0.0 + + def test_reset_clears_cooldown(self): + ct = CooldownTracker(cooldown_seconds=300) + ct.record("piha/mosquitto") + assert ct.is_ready("piha/mosquitto") is False + ct.reset("piha/mosquitto") + assert ct.is_ready("piha/mosquitto") is True + + def test_independent_keys(self): + ct = CooldownTracker(cooldown_seconds=300) + ct.record("piha/mosquitto") + assert ct.is_ready("piha/mosquitto") is False + assert ct.is_ready("solaria/ollama") is True + + +# --------------------------------------------------------------------------- +# HealthEvent +# --------------------------------------------------------------------------- + +class TestHealthEvent: + def test_svc_key(self): + e = _health_event("piha", "mosquitto") + assert e.svc_key == "piha/mosquitto" + + def test_str_repr(self): + e = _health_event("vps", "observer", "service_unhealthy", "error") + assert "service_unhealthy" in str(e) + assert "vps/observer" in str(e) + + +# --------------------------------------------------------------------------- +# ActionProposal.to_action_file +# --------------------------------------------------------------------------- + +class TestActionProposal: + def _sample(self, **kwargs) -> ActionProposal: + defaults = dict( + action_id = "plan-piha-mosquitto-123", + type = "container_restart", + action = "restart", + service = "mosquitto", + node = "piha", + reason = "Container stopped unexpectedly", + confidence = 0.9, + requires_human = False, + risk_level = "low", + ) + defaults.update(kwargs) + return ActionProposal(**defaults) + + def test_to_action_file_keys(self): + d = self._sample().to_action_file() + for key in ("action_id", "type", "node", "service", "risk_level", + "confidence", "requires_human", "status", "timestamp", + "source_event", "llm_model", "llm_attempts", "payload"): + assert key in d, f"missing key: {key}" + + def test_status_pending(self): + d = self._sample().to_action_file() + assert d["status"] == "pending" + + def test_payload_contains_action_and_reason(self): + d = self._sample().to_action_file() + assert d["payload"]["action"] == "restart" + assert "Container stopped" in d["payload"]["reason"] + + def test_description_fallback_to_reason(self): + p = self._sample(description="") + d = p.to_action_file() + assert d["description"] == p.reason + + +# --------------------------------------------------------------------------- +# map_action_to_executor_type +# --------------------------------------------------------------------------- + +class TestMapActionToExecutorType: + @pytest.mark.parametrize("action,expected_type,expected_risk", [ + ("restart", "container_restart", "low"), + ("redeploy", "redeploy", "guarded"), + ("notify", "notify", "low"), + ("ignore", "ignore", "none"), + ("unknown", "notify", "low"), # safe fallback + ]) + def test_mapping(self, action, expected_type, expected_risk): + t, r = map_action_to_executor_type(action) + assert t == expected_type + assert r == expected_risk + + +# --------------------------------------------------------------------------- +# parse_event +# --------------------------------------------------------------------------- + +class TestParseEvent: + def test_shape_a_node_agent(self): + raw = { + "type": "service_unhealthy", + "node": "piha", + "service": "mosquitto", + "severity": "error", + "payload": {"status": "exited"}, + } + ev = parse_event(raw, "health_events") + assert ev is not None + assert ev.node == "piha" + assert ev.service == "mosquitto" + assert ev.event_type == "service_unhealthy" + assert ev.severity == "error" + assert ev.payload == {"status": "exited"} + + def test_shape_b_world_updates(self): + raw = { + "event_type": "node_offline", + "node": "chelsty-infra", + "service": "mosquitto", + "severity": "critical", + } + ev = parse_event(raw, "world_updates") + assert ev is not None + assert ev.event_type == "node_offline" + assert ev.node == "chelsty-infra" + + def test_missing_node_returns_none(self): + raw = {"type": "service_unhealthy", "service": "mosquitto"} + assert parse_event(raw, "health_events") is None + + def test_missing_type_returns_none(self): + raw = {"node": "piha", "service": "mosquitto"} + assert parse_event(raw, "health_events") is None + + def test_service_falls_back_to_node(self): + raw = {"type": "node_offline", "node": "piha"} + ev = parse_event(raw, "health_events") + assert ev is not None + assert ev.service == "piha" + + def test_timestamp_iso_parsed(self): + raw = { + "type": "service_unhealthy", + "node": "piha", + "service": "mosquitto", + "timestamp": "2026-05-27T12:00:00Z", + } + ev = parse_event(raw, "health_events") + assert ev is not None + assert ev.timestamp > 1_700_000_000 # sanity: recent epoch + + def test_timestamp_numeric_accepted(self): + ts = time.time() + raw = {"type": "service_unhealthy", "node": "piha", "service": "mosquitto", + "timestamp": ts} + ev = parse_event(raw, "health_events") + assert abs(ev.timestamp - ts) < 1 + + def test_channel_stored(self): + raw = {"type": "service_unhealthy", "node": "piha", "service": "mosquitto"} + ev = parse_event(raw, "world_updates") + assert ev.raw_channel == "world_updates" + + +# --------------------------------------------------------------------------- +# build_messages +# --------------------------------------------------------------------------- + +class TestBuildMessages: + def test_returns_two_messages(self): + ev = _health_event() + msgs = build_messages(ev) + assert len(msgs) == 2 + assert msgs[0]["role"] == "system" + assert msgs[1]["role"] == "user" + + def test_user_message_contains_event_fields(self): + ev = _health_event("vps", "observer", "service_unhealthy", "error", + payload={"exit_code": 1}) + msgs = build_messages(ev) + user = msgs[1]["content"] + assert "vps" in user + assert "observer" in user + assert "service_unhealthy" in user + + def test_payload_included_when_present(self): + ev = _health_event(payload={"disk_pct": 95}) + msgs = build_messages(ev) + assert "disk_pct" in msgs[1]["content"] + + def test_system_prompt_contains_homelab_rules(self): + ev = _health_event() + msgs = build_messages(ev) + sys_content = msgs[0]["content"] + assert "chelsty" in sys_content + assert "requires_human" in sys_content + + +# --------------------------------------------------------------------------- +# PlannerAgent._handle_event +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +class TestPlannerHandleEvent: + def _agent(self, result: RouteResult) -> PlannerAgent: + router = _mock_router(result) + return PlannerAgent(redis_url=None, router=router) + + async def test_benign_event_no_proposal(self, tmp_path): + agent = self._agent(_make_route_result()) + ev = _health_event(event_type="service_healthy") + with patch("planner.write_pending_action", new=AsyncMock()) as mock_write: + await agent._handle_event(ev) + mock_write.assert_not_called() + + async def test_cooldown_blocks_duplicate(self): + agent = self._agent(_make_route_result()) + ev = _health_event() + agent.cooldown.record(ev.svc_key) # simulate recent proposal + with patch("planner.write_pending_action", new=AsyncMock()) as mock_write: + await agent._handle_event(ev) + mock_write.assert_not_called() + agent.router.route.assert_not_called() + + async def test_ignore_action_no_file_written(self): + agent = self._agent(_make_route_result(action="ignore", reason="Transient glitch")) + ev = _health_event() + with patch("planner.write_pending_action", new=AsyncMock()) as mock_write: + await agent._handle_event(ev) + mock_write.assert_not_called() + + async def test_ignore_records_cooldown(self): + agent = self._agent(_make_route_result(action="ignore", reason="Transient glitch")) + ev = _health_event() + with patch("planner.write_pending_action", new=AsyncMock()): + await agent._handle_event(ev) + assert not agent.cooldown.is_ready(ev.svc_key) + + async def test_restart_action_writes_pending_file(self, tmp_path): + agent = self._agent(_make_route_result(action="restart")) + ev = _health_event() + + captured: list[ActionProposal] = [] + + async def fake_write(p: ActionProposal) -> Path: + captured.append(p) + return tmp_path / f"{p.action_id}.json" + + with patch("planner.write_pending_action", new=fake_write), \ + patch("planner.emit_event", new=AsyncMock()): + await agent._handle_event(ev) + + assert len(captured) == 1 + assert captured[0].action == "restart" + assert captured[0].type == "container_restart" + + async def test_redeploy_action_risk_guarded(self, tmp_path): + agent = self._agent(_make_route_result(action="redeploy")) + ev = _health_event() + + captured: list[ActionProposal] = [] + + async def fake_write(p: ActionProposal) -> Path: + captured.append(p) + return tmp_path / f"{p.action_id}.json" + + with patch("planner.write_pending_action", new=fake_write), \ + patch("planner.emit_event", new=AsyncMock()): + await agent._handle_event(ev) + + assert captured[0].risk_level == "guarded" + assert captured[0].type == "redeploy" + + async def test_remediation_started_event_emitted(self, tmp_path): + agent = self._agent(_make_route_result(action="restart")) + ev = _health_event() + + emitted: list[tuple] = [] + + async def fake_emit(event_type, severity, service, correlation_id, payload=None): + emitted.append((event_type, service, correlation_id)) + + with patch("planner.write_pending_action", new=AsyncMock(return_value=tmp_path / "x.json")), \ + patch("planner.emit_event", new=fake_emit): + await agent._handle_event(ev) + + assert len(emitted) == 1 + assert emitted[0][0] == "remediation_started" + assert emitted[0][1] == ev.service + + async def test_llm_failure_no_file_no_cooldown(self): + router = MagicMock() + router.route = AsyncMock(side_effect=RuntimeError("all models failed")) + router.close = AsyncMock() + agent = PlannerAgent(redis_url=None, router=router) + ev = _health_event() + + with patch("planner.write_pending_action", new=AsyncMock()) as mock_write: + await agent._handle_event(ev) + + mock_write.assert_not_called() + # Cooldown NOT recorded — next event should be able to retry + assert agent.cooldown.is_ready(ev.svc_key) is True + + async def test_requires_human_preserved_in_proposal(self, tmp_path): + agent = self._agent( + _make_route_result(action="restart", requires_human=True, confidence=0.6) + ) + ev = _health_event() + + captured: list[ActionProposal] = [] + + async def fake_write(p: ActionProposal) -> Path: + captured.append(p) + return tmp_path / f"{p.action_id}.json" + + with patch("planner.write_pending_action", new=fake_write), \ + patch("planner.emit_event", new=AsyncMock()): + await agent._handle_event(ev) + + assert captured[0].requires_human is True + + async def test_cooldown_recorded_after_success(self, tmp_path): + agent = self._agent(_make_route_result(action="restart")) + ev = _health_event() + + with patch("planner.write_pending_action", + new=AsyncMock(return_value=tmp_path / "x.json")), \ + patch("planner.emit_event", new=AsyncMock()): + await agent._handle_event(ev) + + assert not agent.cooldown.is_ready(ev.svc_key) + + async def test_llm_model_recorded_in_proposal(self, tmp_path): + agent = self._agent( + _make_route_result(action="restart", model="claude-haiku-4-5-20251001") + ) + ev = _health_event() + + captured: list[ActionProposal] = [] + + async def fake_write(p: ActionProposal) -> Path: + captured.append(p) + return tmp_path / f"{p.action_id}.json" + + with patch("planner.write_pending_action", new=fake_write), \ + patch("planner.emit_event", new=AsyncMock()): + await agent._handle_event(ev) + + assert captured[0].llm_model == "claude-haiku-4-5-20251001" + + +# --------------------------------------------------------------------------- +# PlannerAgent._dispatch +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +class TestPlannerDispatch: + def _agent(self) -> PlannerAgent: + router = _mock_router(_make_route_result()) + return PlannerAgent(redis_url=None, router=router) + + async def test_valid_json_dispatched(self): + agent = self._agent() + msg = { + "channel": "health_events", + "data": json.dumps({ + "type": "service_unhealthy", + "node": "piha", + "service": "mosquitto", + "severity": "error", + }), + } + with patch.object(agent, "_handle_event", new=AsyncMock()) as mock_handle: + await agent._dispatch(msg) + mock_handle.assert_awaited_once() + + async def test_invalid_json_skipped(self): + agent = self._agent() + msg = {"channel": "health_events", "data": "{not valid json"} + with patch.object(agent, "_handle_event", new=AsyncMock()) as mock_handle: + await agent._dispatch(msg) + mock_handle.assert_not_called() + + async def test_non_string_data_skipped(self): + agent = self._agent() + msg = {"channel": "health_events", "data": 42} + with patch.object(agent, "_handle_event", new=AsyncMock()) as mock_handle: + await agent._dispatch(msg) + mock_handle.assert_not_called() + + async def test_missing_node_skipped(self): + agent = self._agent() + msg = { + "channel": "health_events", + "data": json.dumps({"type": "service_unhealthy", "service": "mosquitto"}), + } + with patch.object(agent, "_handle_event", new=AsyncMock()) as mock_handle: + await agent._dispatch(msg) + mock_handle.assert_not_called() + + +# --------------------------------------------------------------------------- +# write_pending_action (integration-style with tmp_path) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +class TestWritePendingAction: + async def test_file_created_with_correct_content(self, tmp_path): + proposal = ActionProposal( + action_id = "plan-piha-mosquitto-1000", + type = "container_restart", + action = "restart", + service = "mosquitto", + node = "piha", + reason = "Container stopped unexpectedly", + confidence = 0.95, + requires_human = False, + risk_level = "low", + ) + with patch("planner.ACTIONS_DIR", tmp_path): + path = await write_pending_action(proposal) + + assert path.exists() + data = json.loads(path.read_text()) + assert data["action_id"] == "plan-piha-mosquitto-1000" + assert data["status"] == "pending" + assert data["type"] == "container_restart" + assert data["confidence"] == 0.95 + assert data["requires_human"] is False + + async def test_file_is_valid_json(self, tmp_path): + proposal = ActionProposal( + action_id="x", type="redeploy", action="redeploy", + service="ollama", node="solaria", + reason="Service is broken beyond a simple restart", + confidence=0.8, requires_human=True, risk_level="guarded", + ) + with patch("planner.ACTIONS_DIR", tmp_path): + path = await write_pending_action(proposal) + # Should not raise + json.loads(path.read_text()) + + +# --------------------------------------------------------------------------- +# emit_event (filesystem write) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +class TestEmitEvent: + async def test_event_file_created(self, tmp_path): + with patch("planner.EVENTS_DIR", tmp_path), \ + patch("planner.NODE_NAME", "test-node"): + await emit_event( + event_type = "remediation_started", + severity = "info", + service = "mosquitto", + correlation_id = "plan-abc-123", + payload = {"action": "restart"}, + ) + + files = list(tmp_path.rglob("*.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["type"] == "remediation_started" + assert data["service"] == "mosquitto" + assert data["correlation_id"] == "plan-abc-123" + assert data["payload"]["action"] == "restart" + + async def test_event_dir_structure(self, tmp_path): + """Events must be stored under YYYY-MM-DD//.""" + import planner as planner_mod + orig = planner_mod.NODE_NAME + planner_mod.NODE_NAME = "piha" + try: + with patch("planner.EVENTS_DIR", tmp_path): + await emit_event("test_event", "info", "svc", "cid-1") + finally: + planner_mod.NODE_NAME = orig + + files = list(tmp_path.rglob("*.json")) + assert len(files) == 1 + # Path: // + parts = files[0].relative_to(tmp_path).parts + assert len(parts) == 3 # date / node / file.json + assert parts[1] == "piha"