homelab-codex-ws/services/planner-agent/src/planner.py

710 lines
25 KiB
Python
Raw Normal View History

feat(planner-agent): main loop with LLM routing and HITL action proposals services/planner-agent/src/planner.py: - PlannerAgent: async Redis pub/sub on health_events + world_updates - Pipeline: receive event → cooldown gate → LLMRouter → write pending action → emit remediation_started filesystem event - CooldownTracker: 5-min suppression per svc_key (configurable via env) - parse_event(): accepts node-agent shape A and world_updates shape B - PROPOSAL_SCHEMA: jsonschema enforced by LLMRouter before accepting response - SYSTEM_PROMPT: homelab topology + action rules (chelsty always requires_human, disk_pressure always notify, confidence<0.7 → requires_human) - write_pending_action(): atomic tmp→rename write, executor-compatible format - emit_event(): async wrapper around filesystem event write (no control-plane import) - _emit_event_sync() reads NODE_NAME at call time (not import) for testability - Benign events (service_healthy, node_online, ...) silently skipped - LLM chain failure: no cooldown recorded so next event can retry services/planner-agent/tests/test_planner.py (49 tests, 0 network): - TestCooldownTracker: 7 tests (ready/not-ready/elapsed/reset/independence) - TestHealthEvent, TestActionProposal, TestMapActionToExecutorType - TestParseEvent: both event shapes, missing fields, timestamp formats - TestBuildMessages: system prompt rules, payload inclusion - TestPlannerHandleEvent: benign skip, cooldown block, ignore/restart/redeploy/ notify proposals, remediation event emission, LLM failure isolation, requires_human propagation, cooldown recording, model name in proposal - TestPlannerDispatch: valid JSON, invalid JSON, non-string data, missing node - TestWritePendingAction, TestEmitEvent: filesystem integration with tmp_path services/planner-agent/service.yaml: owner_node: solaria, dependencies: [redis, ollama] services/planner-agent/docker-compose.yml: env + healthcheck services/planner-agent/Dockerfile: python:3.11-slim services/planner-agent/healthcheck.sh: heartbeat file age check (300s) services/planner-agent/requirements.txt: litellm, redis, jsonschema, structlog Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 19:11:39 +02:00
"""
planner.py planner-agent main loop.
Listens to Redis pub/sub channels:
- health_events: node-agent / stability-agent health notifications
- world_updates: observer world-state change notifications
For each event that clears the cooldown gate:
1. Ask LLMRouter to diagnose and produce a structured action proposal.
2. Write proposal to /opt/homelab/actions/pending/<id>.json.
3. Emit a remediation_started filesystem event.
Human-in-the-loop invariant
----------------------------
The planner ONLY writes to actions/pending/. Execution requires an
operator-approved action file in actions/approved/ the planner
never touches that directory.
"""
import asyncio
import json
import os
import signal
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import redis.asyncio as aioredis
import structlog
# Allow running from src/ directory without installation
sys.path.insert(0, str(Path(__file__).parent))
from llm_router import LLMRouter, RouteResult # noqa: E402
# ---------------------------------------------------------------------------
# Structured logging — JSON to stdout
# ---------------------------------------------------------------------------
structlog.configure(
processors=[
structlog.stdlib.add_log_level, # adds "level" key
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(20), # INFO+
logger_factory=structlog.PrintLoggerFactory(),
# add_logger_name is intentionally excluded: it requires a stdlib
# logger with a .name attribute; PrintLogger does not have one.
)
log = structlog.get_logger("planner")
# ---------------------------------------------------------------------------
# Runtime paths
# ---------------------------------------------------------------------------
RUNTIME_PATH = Path(os.getenv("RUNTIME_PATH", "/opt/homelab"))
ACTIONS_DIR = RUNTIME_PATH / "actions"
EVENTS_DIR = RUNTIME_PATH / "events"
STATE_DIR = RUNTIME_PATH / "state"
HEARTBEAT = STATE_DIR / "planner-agent.heartbeat"
# ---------------------------------------------------------------------------
# Configuration (from env)
# ---------------------------------------------------------------------------
REDIS_URL = os.getenv("REDIS_URL", "redis://100.108.208.3:6379")
OLLAMA_API_BASE = os.getenv("OLLAMA_API_BASE", "http://100.108.208.3:11434")
feat(planner-agent): main loop with LLM routing and HITL action proposals services/planner-agent/src/planner.py: - PlannerAgent: async Redis pub/sub on health_events + world_updates - Pipeline: receive event → cooldown gate → LLMRouter → write pending action → emit remediation_started filesystem event - CooldownTracker: 5-min suppression per svc_key (configurable via env) - parse_event(): accepts node-agent shape A and world_updates shape B - PROPOSAL_SCHEMA: jsonschema enforced by LLMRouter before accepting response - SYSTEM_PROMPT: homelab topology + action rules (chelsty always requires_human, disk_pressure always notify, confidence<0.7 → requires_human) - write_pending_action(): atomic tmp→rename write, executor-compatible format - emit_event(): async wrapper around filesystem event write (no control-plane import) - _emit_event_sync() reads NODE_NAME at call time (not import) for testability - Benign events (service_healthy, node_online, ...) silently skipped - LLM chain failure: no cooldown recorded so next event can retry services/planner-agent/tests/test_planner.py (49 tests, 0 network): - TestCooldownTracker: 7 tests (ready/not-ready/elapsed/reset/independence) - TestHealthEvent, TestActionProposal, TestMapActionToExecutorType - TestParseEvent: both event shapes, missing fields, timestamp formats - TestBuildMessages: system prompt rules, payload inclusion - TestPlannerHandleEvent: benign skip, cooldown block, ignore/restart/redeploy/ notify proposals, remediation event emission, LLM failure isolation, requires_human propagation, cooldown recording, model name in proposal - TestPlannerDispatch: valid JSON, invalid JSON, non-string data, missing node - TestWritePendingAction, TestEmitEvent: filesystem integration with tmp_path services/planner-agent/service.yaml: owner_node: solaria, dependencies: [redis, ollama] services/planner-agent/docker-compose.yml: env + healthcheck services/planner-agent/Dockerfile: python:3.11-slim services/planner-agent/healthcheck.sh: heartbeat file age check (300s) services/planner-agent/requirements.txt: litellm, redis, jsonschema, structlog Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 19:11:39 +02:00
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
NODE_NAME = os.getenv("NODE_NAME", "solaria")
COOLDOWN_SECONDS = int(os.getenv("COOLDOWN_SECONDS", "300")) # 5 min
SUBSCRIBE_CHANNELS = ["health_events", "world_updates"]
# ---------------------------------------------------------------------------
# JSON Schema — validated by LLMRouter (jsonschema) before accepting response
# ---------------------------------------------------------------------------
PROPOSAL_SCHEMA: dict = {
"type": "object",
"required": ["action", "service", "node", "reason", "confidence", "requires_human"],
"additionalProperties": False,
"properties": {
"action": {
"type": "string",
"enum": ["restart", "redeploy", "notify", "ignore"],
},
"service": {"type": "string"},
"node": {"type": "string"},
"reason": {"type": "string", "minLength": 10},
"confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"requires_human": {"type": "boolean"},
},
}
# ---------------------------------------------------------------------------
# LLM system prompt
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """You are the planner agent for a distributed homelab orchestration system.
Your job is to diagnose infrastructure health events and propose a remediation action.
Homelab topology:
vps Hetzner VPS; public ingress, control plane
piha Raspberry Pi 5; infra, monitoring, Redis, Ollama
solaria GPU workstation; AI / compute workloads
chelsty-infra LTE edge; Zigbee2MQTT, Mosquitto offline-first
chelsty-ha LTE edge; Home Assistant offline-first
Action selection rules:
restart container exists but is stopped/unhealthy; docker restart suffices (low risk)
redeploy container is broken beyond a simple restart; full docker compose up (guarded)
notify human decision required; do not attempt automated fix
ignore transient / one-off glitch; monitoring will catch a repeat
Risk rules (enforce strictly):
- For any chelsty-* node: always set requires_human: true
- For disk_pressure events: always use "notify"
- If confidence < 0.7: set requires_human: true
- Unknown/novel failure patterns: prefer "notify" over guessing
Respond with ONLY a single JSON object, no markdown, no commentary:
{
"action": "restart|redeploy|notify|ignore",
"service": "<service name>",
"node": "<node name>",
"reason": "<concise explanation, minimum 10 characters>",
"confidence": <0.01.0>,
"requires_human": <true|false>
}"""
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class HealthEvent:
"""Normalized health event ingested from a Redis channel message."""
node: str
service: str
event_type: str # e.g. "service_unhealthy", "disk_pressure_high"
severity: str # "info" | "warning" | "error" | "critical"
payload: dict = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
raw_channel: str = ""
@property
def svc_key(self) -> str:
return f"{self.node}/{self.service}"
def __str__(self) -> str:
return f"[{self.event_type}] {self.svc_key} ({self.severity})"
@dataclass
class ActionProposal:
"""Planner's structured output, written to actions/pending/<id>.json."""
action_id: str
type: str # executor type: "container_restart"|"redeploy"|"notify"|"ignore"
action: str # LLM's action: "restart"|"redeploy"|"notify"|"ignore"
service: str
node: str
reason: str
confidence: float
requires_human: bool
risk_level: str
status: str = "pending"
timestamp: float = field(default_factory=time.time)
source_event: str = ""
description: str = ""
llm_model: str = ""
llm_attempts: int = 0
def to_action_file(self) -> dict:
"""Return a dict compatible with the executor's action file format."""
return {
"action_id": self.action_id,
"type": self.type,
"node": self.node,
"service": self.service,
"risk_level": self.risk_level,
"confidence": self.confidence,
"requires_human": self.requires_human,
"description": self.description or self.reason,
"status": self.status,
"timestamp": self.timestamp,
"source_event": self.source_event,
"llm_model": self.llm_model,
"llm_attempts": self.llm_attempts,
"payload": {
"action": self.action,
"reason": self.reason,
},
}
# ---------------------------------------------------------------------------
# Cooldown tracker
# ---------------------------------------------------------------------------
class CooldownTracker:
"""Gate: suppress duplicate proposals for the same service/node pair.
A proposal is suppressed if a previous proposal for the same svc_key
was emitted within the last ``cooldown_seconds`` seconds.
"""
def __init__(self, cooldown_seconds: float = COOLDOWN_SECONDS) -> None:
self._cooldown = cooldown_seconds
self._last: dict[str, float] = {}
def is_ready(self, svc_key: str) -> bool:
"""True when enough time has elapsed since the last proposal."""
return (time.time() - self._last.get(svc_key, 0.0)) >= self._cooldown
def record(self, svc_key: str) -> None:
"""Mark a proposal as just emitted for svc_key."""
self._last[svc_key] = time.time()
def remaining_seconds(self, svc_key: str) -> float:
return max(0.0, self._cooldown - (time.time() - self._last.get(svc_key, 0.0)))
def reset(self, svc_key: str) -> None:
"""Force-reset cooldown (e.g. for testing or manual override)."""
self._last.pop(svc_key, None)
# ---------------------------------------------------------------------------
# Event emission (filesystem — no control-plane import)
# ---------------------------------------------------------------------------
def _emit_event_sync(
event_type: str,
severity: str,
service: str,
correlation_id: str,
payload: Optional[dict] = None,
node: Optional[str] = None,
) -> None:
# Read NODE_NAME at call time (not import time) so monkeypatching works in tests.
if node is None:
node = NODE_NAME
"""Write a normalized JSON event file to the filesystem event store.
Mirrors scripts/lib/events.py behaviour keeping planner fully
independent of the control-plane package.
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")
date_dir = now.strftime("%Y-%m-%d")
svc_slug = (service or "planner").replace("/", "-").replace(" ", "-")
fname = f"evt-{node}-{int(time.time())}-{event_type}-{svc_slug}.json"
event_dir = EVENTS_DIR / date_dir / node
try:
event_dir.mkdir(parents=True, exist_ok=True)
(event_dir / fname).write_text(json.dumps({
"timestamp": timestamp,
"node": node,
"type": event_type,
"severity": severity,
"source": "planner-agent",
"service": service,
"correlation_id": correlation_id,
"payload": payload or {},
}, indent=2))
except Exception as exc:
log.warning("event_write_failed", path=str(event_dir / fname), error=str(exc))
async def emit_event(
event_type: str,
severity: str,
service: str,
correlation_id: str,
payload: Optional[dict] = None,
) -> None:
"""Async wrapper around _emit_event_sync (runs in thread pool)."""
await asyncio.to_thread(
_emit_event_sync, event_type, severity, service, correlation_id, payload
)
# ---------------------------------------------------------------------------
# Action file I/O
# ---------------------------------------------------------------------------
async def write_pending_action(proposal: ActionProposal) -> Path:
"""Atomically write proposal JSON to actions/pending/<action_id>.json."""
pending_dir = ACTIONS_DIR / "pending"
pending_dir.mkdir(parents=True, exist_ok=True)
path = pending_dir / f"{proposal.action_id}.json"
def _write() -> None:
# Write to tmp then rename so readers never see a partial file
tmp = path.with_suffix(".tmp")
tmp.write_text(json.dumps(proposal.to_action_file(), indent=2))
tmp.replace(path)
await asyncio.to_thread(_write)
return path
# ---------------------------------------------------------------------------
# LLM prompt helpers
# ---------------------------------------------------------------------------
def build_messages(event: HealthEvent) -> list[dict]:
"""Construct the OpenAI-style message list for one health event."""
user_content = (
f"Health event received:\n"
f" node: {event.node}\n"
f" service: {event.service}\n"
f" type: {event.event_type}\n"
f" severity: {event.severity}\n"
f" timestamp: {datetime.fromtimestamp(event.timestamp, tz=timezone.utc).isoformat()}\n"
)
if event.payload:
payload_str = json.dumps(event.payload, indent=4)
user_content += f" payload:\n{payload_str}\n"
user_content += (
"\nRespond with ONLY the JSON object as specified."
)
return [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
]
def map_action_to_executor_type(action: str) -> tuple[str, str]:
"""Map LLM action name → (executor type, risk_level)."""
return {
"restart": ("container_restart", "low"),
"redeploy": ("redeploy", "guarded"),
"notify": ("notify", "low"),
"ignore": ("ignore", "none"),
}.get(action, ("notify", "low"))
# ---------------------------------------------------------------------------
# Event parsing
# ---------------------------------------------------------------------------
def parse_event(raw: dict, channel: str) -> Optional[HealthEvent]:
"""Normalise a raw Redis pub/sub payload into a HealthEvent.
Accepts two common shapes:
Shape A node-agent / stability-agent filesystem event format:
{"type": "service_unhealthy", "node": "piha", "service": "mosquitto",
"severity": "error", "payload": {...}}
Shape B control-plane world_updates format:
{"event_type": "...", "node": "...", "service": "...", ...}
"""
event_type = raw.get("type") or raw.get("event_type", "")
node = (raw.get("node") or "").strip()
service = (raw.get("service") or "").strip()
severity = (raw.get("severity") or "info").strip()
if not event_type or not node:
return None
# For node-level events (e.g. node_offline) without a service field
if not service:
details = raw.get("details") or raw.get("payload") or {}
service = details.get("service", "") if isinstance(details, dict) else ""
if not service:
service = node # fallback: use node name as service key
# Parse timestamp
ts_raw = raw.get("timestamp", time.time())
if isinstance(ts_raw, str):
try:
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp()
except (ValueError, AttributeError):
ts = time.time()
else:
try:
ts = float(ts_raw)
except (TypeError, ValueError):
ts = time.time()
payload = raw.get("payload") or raw.get("details") or {}
if not isinstance(payload, dict):
payload = {}
return HealthEvent(
node = node,
service = service,
event_type = event_type,
severity = severity,
payload = payload,
timestamp = ts,
raw_channel = channel,
)
# ---------------------------------------------------------------------------
# Planner agent
# ---------------------------------------------------------------------------
# Event types that require no action (healthy signals, completions)
_BENIGN_EVENTS = frozenset({
"service_healthy",
"service_recovered",
"node_online",
"deployment_completed",
"deployment_started",
"remediation_started",
"remediation_completed",
})
class PlannerAgent:
"""Async agent: subscribe → receive → diagnose → propose action.
Designed for testability: all I/O (Redis, filesystem, LLM) is
injected or mockable. The ``router`` parameter accepts a pre-built
LLMRouter so tests can substitute it without network calls.
"""
def __init__(
self,
redis_url: str = REDIS_URL,
ollama_api_base: str = OLLAMA_API_BASE,
ollama_model: str = OLLAMA_MODEL,
router: Optional[LLMRouter] = None,
cooldown: Optional[CooldownTracker] = None,
feat(planner-agent): main loop with LLM routing and HITL action proposals services/planner-agent/src/planner.py: - PlannerAgent: async Redis pub/sub on health_events + world_updates - Pipeline: receive event → cooldown gate → LLMRouter → write pending action → emit remediation_started filesystem event - CooldownTracker: 5-min suppression per svc_key (configurable via env) - parse_event(): accepts node-agent shape A and world_updates shape B - PROPOSAL_SCHEMA: jsonschema enforced by LLMRouter before accepting response - SYSTEM_PROMPT: homelab topology + action rules (chelsty always requires_human, disk_pressure always notify, confidence<0.7 → requires_human) - write_pending_action(): atomic tmp→rename write, executor-compatible format - emit_event(): async wrapper around filesystem event write (no control-plane import) - _emit_event_sync() reads NODE_NAME at call time (not import) for testability - Benign events (service_healthy, node_online, ...) silently skipped - LLM chain failure: no cooldown recorded so next event can retry services/planner-agent/tests/test_planner.py (49 tests, 0 network): - TestCooldownTracker: 7 tests (ready/not-ready/elapsed/reset/independence) - TestHealthEvent, TestActionProposal, TestMapActionToExecutorType - TestParseEvent: both event shapes, missing fields, timestamp formats - TestBuildMessages: system prompt rules, payload inclusion - TestPlannerHandleEvent: benign skip, cooldown block, ignore/restart/redeploy/ notify proposals, remediation event emission, LLM failure isolation, requires_human propagation, cooldown recording, model name in proposal - TestPlannerDispatch: valid JSON, invalid JSON, non-string data, missing node - TestWritePendingAction, TestEmitEvent: filesystem integration with tmp_path services/planner-agent/service.yaml: owner_node: solaria, dependencies: [redis, ollama] services/planner-agent/docker-compose.yml: env + healthcheck services/planner-agent/Dockerfile: python:3.11-slim services/planner-agent/healthcheck.sh: heartbeat file age check (300s) services/planner-agent/requirements.txt: litellm, redis, jsonschema, structlog Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 19:11:39 +02:00
) -> None:
self._redis_url = redis_url
self._redis: Optional[aioredis.Redis] = None
self._pubsub: Optional[aioredis.client.PubSub] = None
self._running = False
self.router = router or LLMRouter(
redis_url = redis_url,
ollama_api_base = ollama_api_base,
ollama_model = ollama_model,
feat(planner-agent): main loop with LLM routing and HITL action proposals services/planner-agent/src/planner.py: - PlannerAgent: async Redis pub/sub on health_events + world_updates - Pipeline: receive event → cooldown gate → LLMRouter → write pending action → emit remediation_started filesystem event - CooldownTracker: 5-min suppression per svc_key (configurable via env) - parse_event(): accepts node-agent shape A and world_updates shape B - PROPOSAL_SCHEMA: jsonschema enforced by LLMRouter before accepting response - SYSTEM_PROMPT: homelab topology + action rules (chelsty always requires_human, disk_pressure always notify, confidence<0.7 → requires_human) - write_pending_action(): atomic tmp→rename write, executor-compatible format - emit_event(): async wrapper around filesystem event write (no control-plane import) - _emit_event_sync() reads NODE_NAME at call time (not import) for testability - Benign events (service_healthy, node_online, ...) silently skipped - LLM chain failure: no cooldown recorded so next event can retry services/planner-agent/tests/test_planner.py (49 tests, 0 network): - TestCooldownTracker: 7 tests (ready/not-ready/elapsed/reset/independence) - TestHealthEvent, TestActionProposal, TestMapActionToExecutorType - TestParseEvent: both event shapes, missing fields, timestamp formats - TestBuildMessages: system prompt rules, payload inclusion - TestPlannerHandleEvent: benign skip, cooldown block, ignore/restart/redeploy/ notify proposals, remediation event emission, LLM failure isolation, requires_human propagation, cooldown recording, model name in proposal - TestPlannerDispatch: valid JSON, invalid JSON, non-string data, missing node - TestWritePendingAction, TestEmitEvent: filesystem integration with tmp_path services/planner-agent/service.yaml: owner_node: solaria, dependencies: [redis, ollama] services/planner-agent/docker-compose.yml: env + healthcheck services/planner-agent/Dockerfile: python:3.11-slim services/planner-agent/healthcheck.sh: heartbeat file age check (300s) services/planner-agent/requirements.txt: litellm, redis, jsonschema, structlog Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 19:11:39 +02:00
)
self.cooldown = cooldown or CooldownTracker()
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None:
self._redis = aioredis.from_url(
self._redis_url,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=10,
)
self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True)
await self._pubsub.subscribe(*SUBSCRIBE_CHANNELS)
log.info("planner_started", channels=SUBSCRIBE_CHANNELS, node=NODE_NAME)
async def stop(self) -> None:
self._running = False
if self._pubsub:
try:
await self._pubsub.unsubscribe()
await self._pubsub.aclose()
except Exception:
pass
if self._redis:
try:
await self._redis.aclose()
except Exception:
pass
try:
await self.router.close()
except Exception:
pass
log.info("planner_stopped")
async def run(self) -> None:
"""Main event loop. Runs until cancelled or SIGINT/SIGTERM."""
await self.start()
self._running = True
_ensure_dirs()
try:
while self._running:
self._touch_heartbeat()
try:
msg = await asyncio.wait_for(
self._pubsub.get_message(ignore_subscribe_messages=True),
timeout=5.0,
)
except asyncio.TimeoutError:
continue
if msg is None:
await asyncio.sleep(0.05)
continue
await self._dispatch(msg)
except asyncio.CancelledError:
log.info("planner_cancelled")
except Exception as exc:
log.exception("planner_fatal_error", error=str(exc))
raise
finally:
await self.stop()
# ------------------------------------------------------------------
# Message dispatch
# ------------------------------------------------------------------
async def _dispatch(self, msg: dict) -> None:
"""Deserialise one Redis pub/sub message and hand off to _handle_event."""
channel = msg.get("channel", "")
data = msg.get("data", "")
if not isinstance(data, str):
return
try:
raw = json.loads(data)
except json.JSONDecodeError:
log.warning("malformed_message", channel=channel, preview=data[:120])
return
if not isinstance(raw, dict):
return
event = parse_event(raw, channel)
if event is None:
log.debug("unparseable_event", channel=channel, keys=list(raw.keys()))
return
log.info(
"event_received",
channel = channel,
svc_key = event.svc_key,
type = event.event_type,
severity = event.severity,
)
await self._handle_event(event)
# ------------------------------------------------------------------
# Core pipeline
# ------------------------------------------------------------------
async def _handle_event(self, event: HealthEvent) -> None:
"""Cooldown → LLM proposal → write pending action → emit event."""
# Benign events need no remediation
if event.event_type in _BENIGN_EVENTS:
log.debug("benign_event_skipped", type=event.event_type, svc_key=event.svc_key)
return
svc_key = event.svc_key
if not self.cooldown.is_ready(svc_key):
log.info(
"cooldown_active",
svc_key = svc_key,
remaining_seconds = round(self.cooldown.remaining_seconds(svc_key)),
)
return
proposal = await self._propose_action(event)
if proposal is None:
# LLM fully failed — do not record cooldown so next event can retry
return
self.cooldown.record(svc_key)
if proposal.action == "ignore":
log.info(
"proposal_ignored",
svc_key = svc_key,
reason = proposal.reason,
confidence = proposal.confidence,
llm_model = proposal.llm_model,
)
return
# Write to pending (human must approve before executor runs it)
try:
path = await write_pending_action(proposal)
except Exception as exc:
log.error("action_write_failed", svc_key=svc_key, error=str(exc))
return
log.info(
"action_proposed",
action_id = proposal.action_id,
action = proposal.action,
executor_type = proposal.type,
svc_key = svc_key,
requires_human = proposal.requires_human,
confidence = proposal.confidence,
risk_level = proposal.risk_level,
llm_model = proposal.llm_model,
path = str(path),
)
await emit_event(
event_type = "remediation_started",
severity = "info",
service = event.service,
correlation_id = proposal.action_id,
payload = {
"action": proposal.action,
"executor_type": proposal.type,
"node": event.node,
"action_id": proposal.action_id,
"requires_human": proposal.requires_human,
"confidence": proposal.confidence,
"llm_model": proposal.llm_model,
},
)
# ------------------------------------------------------------------
# LLM call
# ------------------------------------------------------------------
async def _propose_action(self, event: HealthEvent) -> Optional[ActionProposal]:
"""Invoke LLMRouter and map the validated response to an ActionProposal."""
messages = build_messages(event)
action_id = (
f"plan-{event.node}-{event.service.replace('/', '-')}"
f"-{int(event.timestamp)}"
)
try:
result: RouteResult = await self.router.route(
messages = messages,
schema = PROPOSAL_SCHEMA,
context = f"planner.{event.svc_key}",
)
except RuntimeError as exc:
log.error(
"llm_chain_exhausted",
svc_key = event.svc_key,
error = str(exc)[:400],
)
return None
raw = result.content # already parsed + schema-validated by LLMRouter
action = raw["action"]
ex_type, risk = map_action_to_executor_type(action)
return ActionProposal(
action_id = action_id,
type = ex_type,
action = action,
service = raw.get("service") or event.service,
node = raw.get("node") or event.node,
reason = raw["reason"],
confidence = float(raw["confidence"]),
requires_human = bool(raw["requires_human"]),
risk_level = risk,
timestamp = event.timestamp,
source_event = event.event_type,
description = (
f"{action.upper()} {raw.get('service', event.service)} "
f"on {raw.get('node', event.node)}: {raw['reason']}"
),
llm_model = result.model_used,
llm_attempts = len(result.attempts),
)
# ------------------------------------------------------------------
# Utilities
# ------------------------------------------------------------------
def _touch_heartbeat(self) -> None:
try:
STATE_DIR.mkdir(parents=True, exist_ok=True)
HEARTBEAT.touch()
except Exception as exc:
log.warning("heartbeat_failed", error=str(exc))
# ---------------------------------------------------------------------------
# Module helpers
# ---------------------------------------------------------------------------
def _ensure_dirs() -> None:
for sub in ("pending", "approved", "running", "completed",
"failed", "rejected", "cancelled"):
(ACTIONS_DIR / sub).mkdir(parents=True, exist_ok=True)
STATE_DIR.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
async def _main() -> None:
agent = PlannerAgent()
loop = asyncio.get_running_loop()
def _shutdown(sig_name: str) -> None:
log.info("shutdown_signal", signal=sig_name)
agent._running = False
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _shutdown, sig.name)
await agent.run()
if __name__ == "__main__":
asyncio.run(_main())