--
-+ Dashboard
--
-- Dashboard
-
-
-
-@@ -287,10 +269,6 @@
- System Overview
-
-
--
--
- Pending Actions
--
--
-
-
-
--
-- Active Incidents
-
-@@ -298,20 +276,6 @@
-
--
--
-
-
--
--
--
-- Pending Approval
-- --
--
-- Active / History
-- --
-
-@@ -327,24 +291,11 @@
-
-
-
--
--
--
--
-
-
--
-- Runtime Topology
--
--
-
-
-
--
--
--
--
--
-
-
-
-@@ -384,34 +335,6 @@
- }
- }
-
-- async function postData(endpoint, data) {
-- try {
-- const res = await fetch(endpoint, {
-- method: 'POST',
-- headers: {'Content-Type': 'application/json'},
-- body: JSON.stringify(data)
-- });
-- return await res.json();
-- } catch (e) {
-- console.error('Post error:', endpoint, e);
-- return null;
-- }
-- }
--
-- async function mutateAction(id, status) {
-- const res = await postData('/action/mutate', {id, status});
-- if (res && res.status === 'ok') {
-- refreshData();
-- } else {
-- alert('Mutation failed');
-- }
-- }
--
-- function setOperatorMode(mode) {
-- console.log('Operator mode set to:', mode);
-- // In real system, this would call backend
-- }
--
- function formatTime(ts) {
- if (!ts) return 'N/A';
- return new Date(ts * 1000).toLocaleString();
-@@ -445,53 +368,6 @@
- }
- }
-
-- if (currentView === 'dashboard' || currentView === 'actions') {
-- const actions = await fetchData('/actions');
-- if (actions) {
-- if (currentView === 'dashboard') {
-- const dashActions = document.getElementById('dashboard-actions-summary');
-- const pendingCount = actions.pending.length;
-- dashActions.innerHTML = `
-- "$INVENTORY_DIR/node1/services.yaml"
-services:
- - name: homeassistant
-EOF
-
-cat < "$WORLD_DIR/services/homeassistant.json"
-{"name": "homeassistant", "status": "healthy", "node": "node1"}
-EOF
-
-cat < "$WORLD_DIR/nodes/node1.json"
-{"name": "node1", "status": "online"}
-EOF
-
-# --- Scenario 2: Unhealthy Service ---
-cat < "$WORLD_DIR/services/homeassistant.json"
-{"name": "homeassistant", "status": "unhealthy", "node": "node1"}
-EOF
-
-# --- Scenario 3: Missing Service ---
-cat < "$INVENTORY_DIR/node2/services.yaml"
-services:
- - name: webapp
-EOF
-# webapp is missing from world/services
-
-# --- Scenario 4: Dependency Unavailable (for Missing Service) ---
-cat < "$WORLD_DIR/services/database.json"
-{"name": "database", "status": "error", "node": "node2"}
-EOF
-# webapp depends on database in supervisor logic
-
-# --- Scenario 5: Offline Node ---
-cat < "$WORLD_DIR/nodes/node2.json"
-{"name": "node2", "status": "offline"}
-EOF
-
-# --- Scenario 6: Repeated Deployment Failures ---
-cat < "$WORLD_DIR/deployments/dep-001.json"
-{"id": "dep-001", "service": "webapp", "status": "failed", "timestamp": $(date +%s)}
-EOF
-cat < "$WORLD_DIR/deployments/dep-002.json"
-{"id": "dep-002", "service": "webapp", "status": "failed", "timestamp": $(( $(date +%s) - 300 ))}
-EOF
-cat < "$WORLD_DIR/deployments/dep-003.json"
-{"id": "dep-003", "service": "webapp", "status": "failed", "timestamp": $(( $(date +%s) - 600 ))}
-EOF
-
-# --- Scenario 7: Unresolved Incident ---
-cat < "$WORLD_DIR/incidents/inc-99.json"
-{"id": "inc-99", "description": "High memory usage on node1", "status": "investigating"}
-EOF
-
-echo "Scenarios generated successfully."
-echo "You can now run: HOMELAB_WORLD_ROOT=$WORLD_DIR python3 scripts/supervisor/supervisor.py"
diff --git a/webui/index.html b/webui/index.html
index 5c049c1..d20843a 100644
--- a/webui/index.html
+++ b/webui/index.html
@@ -263,6 +263,9 @@
+
`).join('') || 'No history.';
}
diff --git a/webui/web.py b/webui/web.py
index 4727274..4332474 100644
--- a/webui/web.py
+++ b/webui/web.py
@@ -1,19 +1,20 @@
import json
import os
-import socket
+import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
-STATE_DIR = Path("/opt/homelab/state")
-EVENTS_DIR = Path("/opt/homelab/events")
-WORLD_DIR = Path("/opt/homelab/world")
-ACTIONS_DIR = Path("/opt/homelab/actions")
-EVENT_LOG = Path("/tmp/agent-events.log")
+STATE_DIR = Path(os.getenv("HOMELAB_STATE_ROOT", "/opt/homelab/state"))
+EVENTS_DIR = Path(os.getenv("HOMELAB_EVENTS_ROOT", "/opt/homelab/events"))
+WORLD_DIR = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
+ACTIONS_DIR = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
+CONFIG_DIR = Path(os.getenv("HOMELAB_CONFIG_ROOT", "/opt/homelab/config"))
+
STATIC_DIR = Path(__file__).parent
-REDIS_HOST = os.getenv("REDIS_HOST", "redis")
-REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
+
DEFAULT_CONFIG = {
+ "operator_mode": "approval",
"auto_mode": True,
"action_thresholds": {
"restart_ha": 0.8,
@@ -24,87 +25,6 @@ DEFAULT_CONFIG = {
}
-def tail_lines(path, limit):
- if not path.exists():
- path.touch()
-
- with path.open("r", encoding="utf-8", errors="replace") as handle:
- lines = handle.readlines()
- print(f"Read {len(lines)} lines", flush=True)
- return [line.rstrip("\n") for line in lines[-limit:]]
-
-
-def redis_command(*parts):
- payload = f"*{len(parts)}\r\n".encode("utf-8")
- for part in parts:
- data = str(part).encode("utf-8")
- payload += f"${len(data)}\r\n".encode("utf-8") + data + b"\r\n"
-
- with socket.create_connection((REDIS_HOST, REDIS_PORT), timeout=3) as client:
- client.sendall(payload)
- return client.recv(4096)
-
-
-def send_command(command):
- task = {
- "target": "orchestrator",
- "action": "input",
- "params": {
- "command": command,
- },
- }
- redis_command("LPUSH", "tasks", json.dumps(task))
-
-
-def send_stop_run(run_id):
- task = {
- "target": "orchestrator",
- "action": "stop_run",
- "params": {
- "run_id": run_id,
- },
- }
- redis_command("LPUSH", "tasks", json.dumps(task))
-
-
-def send_run_action(run_id, command):
- task = {
- "target": "orchestrator",
- "action": "run_action",
- "params": {
- "run_id": run_id,
- "command": command,
- },
- }
- redis_command("LPUSH", "tasks", json.dumps(task))
-
-
-def send_auto_mode(auto_mode):
- task = {
- "target": "orchestrator",
- "action": "set_auto_mode",
- "params": {
- "auto_mode": auto_mode,
- },
- }
- redis_command("LPUSH", "tasks", json.dumps(task))
-
-
-def send_auto_config(config):
- task = {
- "target": "orchestrator",
- "action": "set_auto_config",
- "params": {
- "config": config,
- },
- }
- redis_command("LPUSH", "tasks", json.dumps(task))
-
-
-def send_event(event):
- redis_command("LPUSH", "events", json.dumps(event))
-
-
def read_json_file(path, default=None):
if not path.exists():
return default if default is not None else []
@@ -114,21 +34,16 @@ def read_json_file(path, default=None):
return default if default is not None else []
-def current_config():
- config = json.loads(json.dumps(DEFAULT_CONFIG))
- # We still keep reading from EVENT_LOG for auto_config if it's there
- for line in tail_lines(EVENT_LOG, 500):
- try:
- event = json.loads(line)
- except json.JSONDecodeError:
- continue
- if event.get("type") != "auto_config":
- continue
+def get_config():
+    # Work on a deep copy of the defaults so callers that mutate the result (POST /config, POST /mode) never alter DEFAULT_CONFIG.
+    defaults = json.loads(json.dumps(DEFAULT_CONFIG))
+    config_path = STATE_DIR / "operator-config.json"
+    return read_json_file(config_path, defaults) if config_path.exists() else defaults
- for key in config:
- if key in event:
- config[key] = event[key]
- return config
+
+def save_config(config):
+ STATE_DIR.mkdir(parents=True, exist_ok=True)
+ (STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2))
def current_nodes():
@@ -152,7 +67,13 @@ def current_recommendations():
def current_summary():
- return read_json_file(STATE_DIR / "runtime-summary.json", default={})
+ summary = read_json_file(STATE_DIR / "runtime-summary.json", default={})
+ if summary:
+ # Check for staleness
+ mtime = os.path.getmtime(STATE_DIR / "runtime-summary.json")
+ summary["last_update"] = mtime
+ summary["stale"] = (time.time() - mtime) > 60 # Stale if older than 60s
+ return summary
def current_events():
@@ -161,6 +82,8 @@ def current_events():
for f in EVENTS_DIR.glob("*.json"):
data = read_json_file(f)
if data:
+ # Add source file for traceability
+ data["_source"] = f.name
events.append(data)
return sorted(events, key=lambda x: x.get("timestamp", 0), reverse=True)
@@ -175,6 +98,9 @@ def current_actions():
for f in status_dir.glob("*.json"):
data = read_json_file(f)
if data:
+ # Injects some metadata for UI
+ data["id"] = data.get("action_id") or f.stem
+ data["status"] = status
actions[status].append(data)
return actions
@@ -186,10 +112,12 @@ def mutate_action(action_id, target_status):
# Find where the action is
source_path = None
+ current_status = None
for status in statuses:
p = ACTIONS_DIR / status / f"{action_id}.json"
if p.exists():
source_path = p
+ current_status = status
break
if not source_path:
@@ -202,10 +130,17 @@ def mutate_action(action_id, target_status):
try:
data = json.loads(source_path.read_text())
data["status"] = target_status
- data["last_mutation"] = os.path.getmtime(source_path) # or current time
- import time
- data["last_mutation"] = time.time()
+ data["updated_at"] = time.time()
+ # Keep history of transitions
+ history = data.get("transition_history", [])
+ history.append({
+ "from": current_status,
+ "to": target_status,
+ "timestamp": time.time()
+ })
+ data["transition_history"] = history
+
target_path.write_text(json.dumps(data, indent=2))
if source_path != target_path:
source_path.unlink()
@@ -226,7 +161,7 @@ def send_json(status, payload, handler):
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/config":
- send_json(200, current_config(), self)
+ send_json(200, get_config(), self)
return
if self.path == "/nodes":
@@ -261,16 +196,6 @@ class Handler(BaseHTTPRequestHandler):
send_json(200, current_actions(), self)
return
- if self.path == "/logs":
- print("LOGS endpoint called", flush=True)
- body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8")
- self.send_response(200)
- self.send_header("Content-Type", "text/plain; charset=utf-8")
- self.send_header("Content-Length", str(len(body)))
- self.end_headers()
- self.wfile.write(body)
- return
-
if self.path in ("/", "/index.html"):
body = (STATIC_DIR / "index.html").read_bytes()
self.send_response(200)
@@ -284,13 +209,9 @@ class Handler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path not in (
- "/command",
- "/stop",
- "/action",
- "/auto-mode",
"/config",
- "/events",
"/action/mutate",
+ "/mode",
):
self.send_error(404)
return
@@ -299,51 +220,26 @@ class Handler(BaseHTTPRequestHandler):
raw_body = self.rfile.read(length).decode("utf-8")
try:
payload = json.loads(raw_body)
- command = str(payload.get("command", "")).strip()
except json.JSONDecodeError:
- payload = {}
- command = raw_body.strip()
-
- if self.path == "/events":
- if not isinstance(payload, dict):
- self.send_error(400, "event object is required")
- return
- send_event(payload)
- send_json(200, {"status": "sent"}, self)
- return
-
- if self.path == "/stop":
- run_id = str(payload.get("run_id", "")).strip()
- if not run_id:
- self.send_error(400, "run_id is required")
- return
-
- send_stop_run(run_id)
- send_json(200, {"status": "sent"}, self)
- return
-
- if self.path == "/action":
- run_id = str(payload.get("run_id", "")).strip()
- action_command = str(payload.get("command", "")).strip()
- if not run_id:
- self.send_error(400, "run_id is required")
- return
- if not action_command:
- self.send_error(400, "command is required")
- return
-
- send_run_action(run_id, action_command)
- send_json(200, {"status": "sent"}, self)
- return
-
- if self.path == "/auto-mode":
- send_auto_mode(bool(payload.get("auto_mode")))
- send_json(200, {"status": "sent"}, self)
+ self.send_error(400, "Invalid JSON")
return
if self.path == "/config":
- send_auto_config(payload)
- send_json(200, {"status": "sent"}, self)
+ config = get_config()
+ config.update(payload)
+ save_config(config)
+ send_json(200, {"status": "ok"}, self)
+ return
+
+ if self.path == "/mode":
+ mode = payload.get("mode")
+ if not mode:
+ self.send_error(400, "mode is required")
+ return
+ config = get_config()
+ config["operator_mode"] = mode
+ save_config(config)
+ send_json(200, {"status": "ok"}, self)
return
if self.path == "/action/mutate":
@@ -359,17 +255,18 @@ class Handler(BaseHTTPRequestHandler):
self.send_error(500, msg)
return
- if not command:
- self.send_error(400, "command is required")
- return
-
- send_command(command)
- send_json(200, {"status": "sent"}, self)
-
def log_message(self, format, *args):
return
if __name__ == "__main__":
- server = ThreadingHTTPServer(("0.0.0.0", 8080), Handler)
+ # Ensure directories exist
+ for d in [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]:
+ d.mkdir(parents=True, exist_ok=True)
+ for s in ["pending", "approved", "running", "completed", "failed", "rejected"]:
+ (ACTIONS_DIR / s).mkdir(parents=True, exist_ok=True)
+
+ port = int(os.getenv("PORT", "8080"))
+ print(f"Operator Control Plane starting on 0.0.0.0:{port}")
+ server = ThreadingHTTPServer(("0.0.0.0", port), Handler)
server.serve_forever()
Pending
${pendingCount}
-- Running
${actions.running.length}
-- `;
-- }
-- if (currentView === 'actions') {
-- const pendingEl = document.getElementById('actions-pending');
-- const historyEl = document.getElementById('actions-history');
--
-- pendingEl.innerHTML = actions.pending.map(a => `
--
--
-- `).join('') || 'No pending actions.';
--
-- const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed];
-- historyEl.innerHTML = history.sort((a,b) => b.timestamp - a.timestamp).map(a => `
--
--
-- ${a.type.toUpperCase()}
-- ${a.risk_level}
-- ${a.description}
--Target
${a.target.node} ${a.target.service || ''}
-- Confidence
${Math.round(a.confidence*100)}%
--
--
--
--
--
--
-- `).join('') || 'No history.';
-- }
-- }
-- }
--
- if (currentView === 'dashboard' || currentView === 'events') {
- const incidents = await fetchData('/incidents');
- if (currentView === 'dashboard') {
-@@ -598,64 +474,6 @@
- `).join('');
- }
-
-- if (currentView === 'topology') {
-- const nodes = await fetchData('/nodes');
-- const services = await fetchData('/services');
-- const topMap = document.getElementById('topology-map');
-- if (nodes && services) {
-- topMap.innerHTML = nodes.map(node => {
-- const nodeServices = services.filter(s => s.node === node.hostname || s.node === node.id);
-- return `
--
-- ${a.type.toUpperCase()}
-- ${a.status}
--
-- ${a.description}
-- ${formatTime(a.timestamp)} | Target: ${a.target.node}
-- ${a.status === 'approved' ? `` : ''}
--
--
-- `;
-- }).join('');
-- }
-- }
--
-- if (currentView === 'correlation') {
-- const incidents = await fetchData('/incidents');
-- const actions = await fetchData('/actions');
-- const list = document.getElementById('correlation-chains');
-- if (incidents && actions) {
-- const allActions = Object.values(actions).flat();
-- list.innerHTML = incidents.map(inc => {
-- const related = allActions.filter(a => a.correlation_chain && a.correlation_chain.includes(inc.id));
-- return `
--
--
-- ${node.hostname}
-- ${node.health}
-- Capabilities
-- ${node.capabilities.join(', ')}
-- Services
--
-- ${nodeServices.length > 0 ? nodeServices.map(s => `
--
--
-- ${s.name}
-- ${s.health}
--
-- ${s.dependencies.length > 0 ? `dep: ${s.dependencies.join(', ')}
` : ''}
-- `).join('') : 'None
'}
--
--
-- `;
-- }).join('');
-- }
-- }
- if (currentView === 'settings') {
- const config = await fetchData('/config');
- const content = document.getElementById('settings-content');
-@@ -664,8 +482,6 @@
-
--
-- Incident: ${inc.id || 'INC-001'}
-- Active
-- ${inc.message}
--Related Actions
-- ${related.map(a => `
--
-- ${a.type} (${a.status})
-- ${a.description} --
-- `).join('') || '-- ${a.description} --
No actions yet
'}
-- ${config.auto_mode ? 'Enabled' : 'Disabled'}
- Action Thresholds
- ${JSON.stringify(config.action_thresholds, null, 2)}
-- Telegram Integration
-- Ready for mobile approval flows. Hook: /api/v1/telegram/webhook
-
- `;
- }
-diff --git a/webui/web.py b/webui/web.py
-index 4727274..053ac1a 100644
---- a/webui/web.py
-+++ b/webui/web.py
-@@ -8,7 +8,6 @@ from pathlib import Path
- STATE_DIR = Path("/opt/homelab/state")
- EVENTS_DIR = Path("/opt/homelab/events")
- WORLD_DIR = Path("/opt/homelab/world")
--ACTIONS_DIR = Path("/opt/homelab/actions")
- EVENT_LOG = Path("/tmp/agent-events.log")
- STATIC_DIR = Path(__file__).parent
- REDIS_HOST = os.getenv("REDIS_HOST", "redis")
-@@ -165,55 +164,6 @@ def current_events():
- return sorted(events, key=lambda x: x.get("timestamp", 0), reverse=True)
-
-
--def current_actions():
-- actions = {}
-- statuses = ["pending", "approved", "running", "completed", "failed", "rejected"]
-- for status in statuses:
-- actions[status] = []
-- status_dir = ACTIONS_DIR / status
-- if status_dir.exists():
-- for f in status_dir.glob("*.json"):
-- data = read_json_file(f)
-- if data:
-- actions[status].append(data)
-- return actions
--
--
--def mutate_action(action_id, target_status):
-- statuses = ["pending", "approved", "running", "completed", "failed", "rejected"]
-- if target_status not in statuses:
-- return False, f"Invalid target status: {target_status}"
--
-- # Find where the action is
-- source_path = None
-- for status in statuses:
-- p = ACTIONS_DIR / status / f"{action_id}.json"
-- if p.exists():
-- source_path = p
-- break
--
-- if not source_path:
-- return False, f"Action {action_id} not found"
--
-- target_dir = ACTIONS_DIR / target_status
-- target_dir.mkdir(parents=True, exist_ok=True)
-- target_path = target_dir / f"{action_id}.json"
--
-- try:
-- data = json.loads(source_path.read_text())
-- data["status"] = target_status
-- data["last_mutation"] = os.path.getmtime(source_path) # or current time
-- import time
-- data["last_mutation"] = time.time()
--
-- target_path.write_text(json.dumps(data, indent=2))
-- if source_path != target_path:
-- source_path.unlink()
-- return True, "Success"
-- except Exception as e:
-- return False, str(e)
--
--
- def send_json(status, payload, handler):
- body = (json.dumps(payload) + "\n").encode("utf-8")
- handler.send_response(status)
-@@ -257,10 +207,6 @@ class Handler(BaseHTTPRequestHandler):
- send_json(200, current_events(), self)
- return
-
-- if self.path == "/actions":
-- send_json(200, current_actions(), self)
-- return
--
- if self.path == "/logs":
- print("LOGS endpoint called", flush=True)
- body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8")
-@@ -290,7 +236,6 @@ class Handler(BaseHTTPRequestHandler):
- "/auto-mode",
- "/config",
- "/events",
-- "/action/mutate",
- ):
- self.send_error(404)
- return
-@@ -346,19 +291,6 @@ class Handler(BaseHTTPRequestHandler):
- send_json(200, {"status": "sent"}, self)
- return
-
-- if self.path == "/action/mutate":
-- action_id = payload.get("id")
-- target = payload.get("status")
-- if not action_id or not target:
-- self.send_error(400, "id and status are required")
-- return
-- success, msg = mutate_action(action_id, target)
-- if success:
-- send_json(200, {"status": "ok"}, self)
-- else:
-- self.send_error(500, msg)
-- return
--
- if not command:
- self.send_error(400, "command is required")
- return
\ No newline at end of file
diff --git a/compose/monitor-agent.yml b/compose/monitor-agent.yml
deleted file mode 100644
index c448fd4..0000000
--- a/compose/monitor-agent.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-services:
- monitor-agent:
- build: ./monitor-agent
- environment:
- NODE_NAME: ${NODE_NAME:-}
- ORCHESTRATOR_URL: ${ORCHESTRATOR_URL:?set ORCHESTRATOR_URL}
- SERVICES_TO_CHECK: ${SERVICES_TO_CHECK:-homeassistant,lms,forgejo}
- INTERVAL_SECONDS: ${INTERVAL_SECONDS:-30}
- restart: unless-stopped
diff --git a/docker-compose.yml b/docker-compose.yml
index 882f287..6ac78dd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,41 +1,9 @@
services:
- redis:
- image: redis:7
- ports:
- - "6379:6379"
- restart: unless-stopped
-
- orchestrator:
- build: ./orchestrator
- depends_on:
- - redis
- environment:
- REDIS_HOST: redis
- HA_PROXY_URL: https://ha.okit.pl
- volumes:
- - /tmp/agent-events.log:/tmp/agent-events.log
- stdin_open: true
- tty: true
-
webui:
build: ./webui
container_name: agent-system-webui
- environment:
- REDIS_HOST: redis
ports:
- "8080:8080"
volumes:
- - /tmp/agent-events.log:/tmp/agent-events.log
- depends_on:
- - orchestrator
- - redis
-
- monitor-agent:
- build: ./monitor-agent
- environment:
- NODE_NAME: ubuntu-4gb-hel1-1
- ORCHESTRATOR_URL: http://webui:8080/events
- SERVICES_TO_CHECK: homeassistant,lms,forgejo
- depends_on:
- - webui
+ - /opt/homelab:/opt/homelab
restart: unless-stopped
diff --git a/docs/operator/approval-workflow.md b/docs/operator/approval-workflow.md
index 9312ae1..da31035 100644
--- a/docs/operator/approval-workflow.md
+++ b/docs/operator/approval-workflow.md
@@ -1,27 +1,27 @@
# Operator Approval Workflow
-This document describes the process of reviewing and approving actions generated by the reconciliation supervisor.
+This document describes the process of reviewing and approving actions generated by the reconciliation supervisor. The Control Plane is entirely filesystem-first, meaning all state is derived from and written to specific directories.
## Workflow Stages
### 1. Action Identification
-When the supervisor identifies a delta between desired and actual state, it generates a pending action in `/opt/homelab/actions/pending/`.
+When the supervisor (running in `homelab-codex-ws`) identifies a delta between desired and actual state, it generates a pending action JSON file in `/opt/homelab/actions/pending/`.
### 2. Risk Assessment
Actions are categorized by risk level:
- **Safe**: Low impact, high confidence. Can be auto-approved in autonomous mode.
- **Guarded**: Moderate impact. Requires explicit operator approval.
-- **Dangerous**: High impact (e.g., node redeploy). Requires multi-step approval or senior operator override.
+- **Dangerous**: High impact (e.g., node redeploy). Requires multi-step approval. These are highlighted in red in the UI.
### 3. Review Process
1. Navigate to the **Action Queue** view.
-2. Review the **Confidence Score** and **Correlation Chain** to understand why the action was proposed.
-3. Check the **Rollback Availability**.
+2. Review the **Confidence Score** and **Correlation Chain** (if available) to understand why the action was proposed.
+3. Check the **Trace** to see the lifecycle of the action.
### 4. Decision
-- **Approve**: Moves action to `approved` state.
-- **Reject**: Moves action to `rejected` state and suppresses similar recommendations for a cooldown period.
-- **Execute**: Transitions an approved action to `running` status.
+- **Approve**: Moves the action JSON file from `pending/` to `approved/`.
+- **Reject**: Moves the action JSON file from `pending/` to `rejected/`.
+- **Execute**: Moves an approved action from `approved/` to `running/`. The live executor in the runtime will then pick it up.
-## Mobile Approvals
-Approval requests can be acknowledged via the Telegram bot integration, allowing for remote operational control.
+## Filesystem Semantics
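+The operator console performs "mutations" by moving action files between the status subdirectories of `/opt/homelab/actions/`: the JSON is written to the target directory with its `status`, `updated_at`, and `transition_history` fields updated, and the source file is then removed. The directory a file lives in is therefore always the action's current state, which keeps a local-first operational trail.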
diff --git a/docs/operator/stale-state-semantics.md b/docs/operator/stale-state-semantics.md
new file mode 100644
index 0000000..6da3ed8
--- /dev/null
+++ b/docs/operator/stale-state-semantics.md
@@ -0,0 +1,25 @@
+# Stale State Semantics
+
+In a local-first, filesystem-backed control plane, visibility depends on the freshness of the runtime state files.
+
+## Detection
+The Operator Control Plane monitors the modification time (mtime) of the `runtime-summary.json` file in `/opt/homelab/state/`.
+
+- **Live**: If the file was updated within the last 60 seconds.
+- **Stale**: If the file is older than 60 seconds.
+
+## UI Representation
+When the state is detected as stale:
+1. A **Critical Warning Banner** appears at the top of the console.
+2. The exact time of the last successful update is displayed.
+3. Health badges and metrics should be treated with caution: they reflect the last recorded state, which may no longer match the live system.
+
+## Causes of Staleness
+- **Runtime Observer Failure**: The process responsible for writing state to the filesystem has crashed.
+- **Node Isolation**: The node where the observer is running is offline or disconnected.
+- **Filesystem Latency**: Issues with the underlying storage layer (e.g., SD card degradation).
+
+## Operator Response
+1. Check the **Nodes** view to identify if a specific observer node is offline.
+2. Investigate the `homelab-codex-ws` runtime logs.
+3. Manually verify critical services if the control plane remains stale for an extended period.
diff --git a/monitor-agent/Dockerfile b/monitor-agent/Dockerfile
deleted file mode 100644
index c44b974..0000000
--- a/monitor-agent/Dockerfile
+++ /dev/null
@@ -1,6 +0,0 @@
-FROM python:3.11-slim
-WORKDIR /app
-COPY requirements.txt .
-RUN pip install --no-cache-dir -r requirements.txt
-COPY main.py .
-CMD ["python", "main.py"]
diff --git a/monitor-agent/main.py b/monitor-agent/main.py
deleted file mode 100644
index 03bbaf0..0000000
--- a/monitor-agent/main.py
+++ /dev/null
@@ -1,117 +0,0 @@
-import json
-import os
-import socket
-import time
-from urllib.error import HTTPError, URLError
-from urllib.request import Request, urlopen
-
-
-NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname()
-ORCHESTRATOR_URL = os.getenv("ORCHESTRATOR_URL")
-SERVICES_TO_CHECK = {
- name.strip()
- for name in os.getenv("SERVICES_TO_CHECK", "").split(",")
- if name.strip()
-}
-INTERVAL_SECONDS = int(os.getenv("INTERVAL_SECONDS", "30"))
-SERVICE_CATALOG = [
- {"name": "homeassistant", "type": "http", "url": "http://homeassistant:8123"},
- {"name": "lms", "type": "tcp", "host": "192.168.31.6", "port": 9000},
- {"name": "forgejo", "type": "http", "url": "http://forgejo:3000"},
- {"name": "nginx", "type": "http", "url": "http://nginx"},
- {"name": "mosquitto", "type": "tcp", "host": "mosquitto", "port": 1883},
-]
-
-
-def services_to_check():
- if not SERVICES_TO_CHECK:
- return SERVICE_CATALOG
- return [
- service for service in SERVICE_CATALOG
- if service["name"] in SERVICES_TO_CHECK
- ]
-
-
-def check_http(url):
- request = Request(url, headers={"User-Agent": "monitor-agent/1.0"})
- try:
- with urlopen(request, timeout=5) as response:
- return "ok" if response.status == 200 else "error"
- except (HTTPError, URLError, TimeoutError, OSError):
- return "error"
-
-
-def check_tcp(host, port):
- try:
- with socket.create_connection((host, int(port)), timeout=5):
- return "ok"
- except OSError:
- return "error"
-
-
-def check_service(service):
- service_type = service.get("type")
- if service_type == "http":
- return check_http(service["url"])
- if service_type == "tcp":
- return check_tcp(service["host"], service["port"])
- return "error"
-
-
-def build_event(service, status):
- return {
- "type": "health",
- "service": service["name"],
- "status": status,
- "timestamp": time.time(),
- "run_id": None,
- "node": NODE_NAME,
- }
-
-
-def send_event(event):
- body = json.dumps(event).encode("utf-8")
- request = Request(
- ORCHESTRATOR_URL,
- data=body,
- headers={"Content-Type": "application/json"},
- method="POST",
- )
- with urlopen(request, timeout=5) as response:
- if response.status >= 300:
- raise RuntimeError(f"event endpoint returned {response.status}")
-
-
-def main():
- if not ORCHESTRATOR_URL:
- raise SystemExit("ORCHESTRATOR_URL is required")
-
- selected_services = services_to_check()
- print(
- (
- f"[monitor-agent] ready node={NODE_NAME} "
- f"url={ORCHESTRATOR_URL} "
- f"services={[service['name'] for service in selected_services]}"
- ),
- flush=True,
- )
-
- while True:
- started = time.time()
- for service in selected_services:
- status = check_service(service)
- event = build_event(service, status)
- try:
- send_event(event)
- except Exception as exc:
- print(f"[monitor-agent] send failed: {exc}", flush=True)
- print(json.dumps(event), flush=True)
-
- elapsed = time.time() - started
- time.sleep(max(0, INTERVAL_SECONDS - elapsed))
-
-
-if __name__ == "__main__":
- main()
-
-print("MONITOR_AGENT_VERSION=1")
diff --git a/monitor-agent/requirements.txt b/monitor-agent/requirements.txt
deleted file mode 100644
index e69de29..0000000
diff --git a/node-agent/Dockerfile b/node-agent/Dockerfile
deleted file mode 100644
index f69bbf8..0000000
--- a/node-agent/Dockerfile
+++ /dev/null
@@ -1,14 +0,0 @@
-FROM python:3.11-slim
-
-WORKDIR /app
-
-RUN apt-get update \
- && apt-get install -y --no-install-recommends curl \
- && rm -rf /var/lib/apt/lists/*
-
-COPY requirements.txt .
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY . .
-
-CMD ["python", "main.py"]
diff --git a/node-agent/main.py b/node-agent/main.py
deleted file mode 100644
index 1034b41..0000000
--- a/node-agent/main.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import json
-import os
-import subprocess
-import time
-
-import redis
-
-
-NODE_NAME = os.getenv("NODE_NAME", "node")
-REDIS_HOST = os.getenv("REDIS_HOST", "redis")
-
-
-def redis_client():
- return redis.Redis(host=REDIS_HOST, port=6379, decode_responses=True)
-
-
-def build_result(task_id, status, result="", error=None, run_id=None):
- return {
- "task_id": task_id,
- "run_id": run_id,
- "node": NODE_NAME,
- "status": status,
- "result": result,
- "error": error,
- }
-
-
-def run_command(cmd):
- completed = subprocess.run(
- cmd,
- shell=True,
- text=True,
- capture_output=True,
- timeout=30,
- )
- output = completed.stdout.strip()
- error = completed.stderr.strip()
-
- if completed.returncode != 0:
- return "error", output, error or f"exit code {completed.returncode}"
-
- return "ok", output, None
-
-
-def docker_ps(params):
- container = params.get("container")
- cmd = "docker ps --format '{{.Names}}|{{.Status}}'"
- status, output, error = run_command(cmd)
- if status != "ok":
- return status, output, error
-
- containers = []
- for line in output.splitlines():
- name, _, state = line.partition("|")
- if container and container not in name:
- continue
- containers.append({"name": name, "status": state})
-
- return "ok", containers, None
-
-
-def docker_logs(params):
- container = params.get("container")
- if not container:
- return "error", "", "container parameter is required"
-
- return run_command(f"docker logs --tail 120 {container}")
-
-
-def handle_task(task):
- action = task.get("action")
- params = task.get("params") or {}
-
- if action == "exec":
- return run_command(params.get("cmd", ""))
- if action == "docker_ps":
- return docker_ps(params)
- if action == "docker_logs":
- return docker_logs(params)
-
- return "error", "", f"unknown action: {action}"
-
-
-def main():
- client = redis_client()
- print(f"[node-agent] ready node={NODE_NAME} redis={REDIS_HOST}")
-
- while True:
- try:
- _, raw_task = client.brpop("tasks")
- task = json.loads(raw_task)
- if task.get("target") != NODE_NAME:
- client.lpush("tasks", raw_task)
- time.sleep(1)
- continue
-
- status, result, error = handle_task(task)
- client.lpush(
- "results",
- json.dumps(
- build_result(
- task.get("task_id"),
- status,
- result,
- error,
- task.get("run_id"),
- )
- ),
- )
- except Exception as exc:
- print(f"[node-agent] error: {exc}")
- time.sleep(2)
-
-
-if __name__ == "__main__":
- main()
diff --git a/node-agent/requirements.txt b/node-agent/requirements.txt
deleted file mode 100644
index 7800f0f..0000000
--- a/node-agent/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-redis
diff --git a/orchestrator/Dockerfile b/orchestrator/Dockerfile
deleted file mode 100644
index 7b3f32c..0000000
--- a/orchestrator/Dockerfile
+++ /dev/null
@@ -1,10 +0,0 @@
-FROM python:3.11-slim
-
-WORKDIR /app
-
-COPY requirements.txt .
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY . .
-
-CMD ["python", "main.py"]
diff --git a/orchestrator/diagnosis.py b/orchestrator/diagnosis.py
deleted file mode 100644
index 53091e9..0000000
--- a/orchestrator/diagnosis.py
+++ /dev/null
@@ -1,80 +0,0 @@
-class DiagnosisEngine:
- def __init__(self):
- self.signals = {}
-
- def add(self, key, value):
- self.signals[key] = value
-
- def evaluate(self):
- results = []
-
- lms = self.signals.get("lms")
- ha_logs_client_error = self.signals.get("ha_logs_client_connector_error")
- ha_container = self.signals.get("ha_container")
- ha_dependency_failure = (
- lms == "000"
- and ha_logs_client_error
- and ha_container == "running"
- )
-
- # LMS
- if lms == "000" and not ha_dependency_failure:
- results.append("[info] LMS is not reachable")
- elif lms == "200":
- results.append("[info] LMS: OK")
-
- # Dependency failures
- if ha_dependency_failure:
- results.append("[warning] HA dependency failure: LMS unreachable")
-
- # HA local
- ha_local = self.signals.get("ha_local")
- if ha_local == "000":
- options = [
- {
- "label": "Restart HA",
- "command": "restart_ha",
- "confidence": 0.85,
- },
- {
- "label": "Check network",
- "command": "check_network",
- "confidence": 0.6,
- }
- ]
- ignore_option = {
- "label": "Ignore",
- "command": "ignore",
- "is_ignore": True,
- }
- results.append("[error] HA is not working locally")
- results.append(
- {
- "type": "proposal",
- "message": "Home Assistant is not working",
- "confidence": 0.85,
- "options": sorted(
- options,
- key=lambda option: option["confidence"],
- reverse=True,
- ) + [ignore_option],
- }
- )
- elif ha_local == "200":
- results.append("[info] HA local: OK")
-
- # HA proxy
- ha_proxy = self.signals.get("ha_proxy")
- if ha_proxy == "dns_error":
- results.append("[error] HA proxy: DNS failure")
- results.append("[error] external access issue: DNS or routing failure")
- elif ha_proxy == "000":
- results.append("[error] HA proxy: not reachable")
- elif ha_proxy == "200":
- results.append("[info] HA proxy: OK")
-
- diagnosis_count = sum(1 for result in results if isinstance(result, str))
- if diagnosis_count > 1:
- results.append("[info] multiple issues detected")
-
- return results
diff --git a/orchestrator/events.py b/orchestrator/events.py
deleted file mode 100644
index 604ba5e..0000000
--- a/orchestrator/events.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import json
-import time
-
-EVENT_LOG = "/tmp/agent-events.log"
-
-
-def emit_event(event):
- event.setdefault("run_id", None)
- event.setdefault("node", None)
- event.setdefault("timestamp", time.time())
- line = json.dumps(event)
- print(line)
- print("EVENT WRITTEN")
- try:
- with open(EVENT_LOG, "a", encoding="utf-8") as event_log:
- event_log.write(line + "\n")
- event_log.flush()
- except OSError as exc:
- print(f"[event:error] failed to write event log: {exc}")
-
-
-def emit_run_progress(run_id, run):
- emit_event(
- {
- "type": "run_progress",
- "run_id": run_id,
- "node": None,
- "received": run.get("received", 0),
- "expected": run.get("expected", 0),
- "message": (
- f"run progress: {run.get('received', 0)}/"
- f"{run.get('expected', 0)}"
- ),
- }
- )
diff --git a/orchestrator/main.py b/orchestrator/main.py
deleted file mode 100644
index 9be20b2..0000000
--- a/orchestrator/main.py
+++ /dev/null
@@ -1,408 +0,0 @@
-import json
-import os
-import sys
-import threading
-import time
-from uuid import uuid4
-
-from diagnosis import DiagnosisEngine
-from events import emit_event, emit_run_progress
-from redis_client import get_redis_client
-from result_listener import listen_for_results
-from task_builder import build_task
-
-
-def send_task(redis_client, task, runs=None, run_id=None):
- if run_id:
- task["run_id"] = run_id
- if runs is not None and run_id in runs:
- runs[run_id]["expected"] += 1
- emit_run_progress(run_id, runs[run_id])
-
- redis_client.lpush("tasks", json.dumps(task))
- print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
- emit_event(
- {
- "type": "task",
- "message": f"sent {task['action']} to {task['target']}",
- "run_id": task.get("run_id"),
- "node": task.get("target"),
- }
- )
-
-
-def stop_run(runs, run_id):
- run = runs.get(run_id)
- if not run or not run.get("active"):
- return False
-
- run["active"] = False
- run["status"] = "stopped"
- emit_event(
- {
- "type": "run_status",
- "message": "run status: stopped",
- "run_id": run_id,
- "node": None,
- "status": "stopped",
- }
- )
- runs.pop(run_id, None)
- return True
-
-
-def apply_run_action(redis_client, run_actions, run_id, command):
- context = run_actions.get(run_id, {})
- if command == "ignore":
- emit_event(
- {
- "type": "action",
- "message": "proposal ignored",
- "run_id": run_id,
- "node": None,
- }
- )
- return True
-
- if command == "check_network":
- emit_event(
- {
- "type": "action",
- "message": "confirmed action: check network connectivity",
- "run_id": run_id,
- "node": "vps",
- }
- )
- send_task(
- redis_client,
- build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"}),
- run_id=run_id,
- )
- return True
-
- if command == "restart_ha":
- container_name = context.get("ha_container_name") or "homeassistant"
- emit_event(
- {
- "type": "action",
- "message": f"confirmed action: restart {container_name}",
- "run_id": run_id,
- "node": "piha",
- }
- )
- send_task(
- redis_client,
- build_task("piha", "exec", {"cmd": f"docker restart {container_name}"}),
- run_id=run_id,
- )
- return True
-
- emit_event(
- {
- "type": "action",
- "message": f"unknown proposal command: {command}",
- "run_id": run_id,
- "node": None,
- }
- )
- return False
-
-
-def emit_auto_config(auto_config, message=None):
- emit_event(
- {
- "type": "auto_config",
- "message": message or "auto config updated",
- "run_id": None,
- "node": None,
- "auto_mode": auto_config.get("auto_mode"),
- "action_thresholds": auto_config.get("action_thresholds", {}),
- "default_threshold": auto_config.get("default_threshold"),
- "allowed_auto_actions": auto_config.get("allowed_auto_actions", []),
- "max_retries_per_action": auto_config.get("max_retries_per_action", {}),
- "retry_window_seconds": auto_config.get("retry_window_seconds"),
- }
- )
-
-
-def set_auto_config(auto_config, config):
- if "auto_mode" in config:
- auto_config["auto_mode"] = bool(config["auto_mode"])
- if isinstance(config.get("action_thresholds"), dict):
- auto_config["action_thresholds"] = {
- str(command): float(threshold)
- for command, threshold in config["action_thresholds"].items()
- }
- if "default_threshold" in config:
- auto_config["default_threshold"] = float(config["default_threshold"])
- if isinstance(config.get("allowed_auto_actions"), list):
- auto_config["allowed_auto_actions"] = [
- str(command)
- for command in config["allowed_auto_actions"]
- ]
-
- emit_auto_config(auto_config)
-
-
-def set_auto_mode(auto_config, enabled):
- set_auto_config(auto_config, {"auto_mode": enabled})
-
-
-def dispatch(redis_client, command, ha_task_ids, runs):
- if command == "ha":
- run_id = str(uuid4())
- runs[run_id] = {
- "expected": 0,
- "received": 0,
- "engine": DiagnosisEngine(),
- "active": True,
- "status": "running",
- }
- emit_event(
- {
- "type": "run_status",
- "message": "run status: running",
- "run_id": run_id,
- "node": None,
- "status": "running",
- }
- )
- emit_run_progress(run_id, runs[run_id])
-
- task = build_task("piha", "docker_ps", {})
- ha_task_ids[task["task_id"]] = run_id
- send_task(redis_client, task, runs, run_id)
- elif command == "stop":
- for run_id, run in list(runs.items()):
- if not run.get("active"):
- continue
-
- stop_run(runs, run_id)
- elif command == "all":
- send_task(redis_client, build_task("piha", "docker_ps", {}))
- send_task(redis_client, build_task("vps", "docker_ps", {}))
- else:
- send_task(redis_client, build_task("piha", "docker_ps", {}))
-
-
-def process_input(command, redis_client, ha_task_ids, runs):
- if not command:
- return
-
- print(f"[input] {command}")
- emit_event(
- {
- "type": "log",
- "message": f"user input: {command}",
- "run_id": None,
- "node": None,
- }
- )
- dispatch(redis_client, command, ha_task_ids, runs)
-
-
-def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config):
- while True:
- _, raw_task = redis_client.brpop("tasks")
- try:
- task = json.loads(raw_task)
- except json.JSONDecodeError:
- continue
-
- if task.get("target") != "orchestrator":
- redis_client.lpush("tasks", raw_task)
- time.sleep(1)
- continue
-
- action = task.get("action")
- if action == "stop_run":
- params = task.get("params") or {}
- stop_run(runs, str(params.get("run_id", "")).strip())
- continue
-
- if action == "run_action":
- params = task.get("params") or {}
- apply_run_action(
- redis_client,
- run_actions,
- str(params.get("run_id", "")).strip(),
- str(params.get("command", "")).strip(),
- )
- continue
-
- if action == "set_auto_mode":
- params = task.get("params") or {}
- set_auto_mode(auto_config, bool(params.get("auto_mode")))
- continue
-
- if action == "set_auto_config":
- params = task.get("params") or {}
- set_auto_config(auto_config, params.get("config") or {})
- continue
-
- if action != "input":
- continue
-
- params = task.get("params") or {}
- process_input(str(params.get("command", "")).strip(), redis_client, ha_task_ids, runs)
-
-
-def services_list(services):
- return [
- {
- "name": name,
- "status": state["status"],
- "last_check": state["last_check"],
- "node": state.get("node"),
- "history": state["history"],
- }
- for name, state in sorted(services.items())
- ]
-
-
-def update_service_health(services, event):
- service_name = event.get("service")
- status = event.get("status")
- node = event.get("node")
- timestamp = event.get("timestamp", time.time())
- if not service_name or status not in ("ok", "error"):
- return
-
- state = services.setdefault(
- service_name,
- {
- "status": status,
- "last_check": timestamp,
- "node": node,
- "history": [],
- },
- )
- state["status"] = status
- state["last_check"] = timestamp
- state["node"] = node
- state["history"].append({"status": status, "timestamp": timestamp, "node": node})
- state["history"] = state["history"][-20:]
-
- emit_event(
- {
- "type": "services_state",
- "message": "services health updated",
- "run_id": None,
- "node": None,
- "services": services_list(services),
- }
- )
-
-
-def listen_for_external_events(redis_client, services):
- while True:
- try:
- _, raw_event = redis_client.brpop("events")
- event = json.loads(raw_event)
- except json.JSONDecodeError:
- emit_event(
- {
- "type": "log",
- "message": f"invalid external event: {raw_event}",
- "run_id": None,
- "node": None,
- }
- )
- continue
-
- emit_event(event)
- if event.get("type") == "health":
- update_service_health(services, event)
-
-
-def main():
- if not os.path.exists("/tmp/agent-events.log"):
- open("/tmp/agent-events.log", "w").close()
-
- redis_client = get_redis_client()
- ha_task_ids = {}
- runs = {}
- last_actions = {}
- run_actions = {}
- services = {}
- auto_config = {
- "auto_mode": True,
- "action_thresholds": {
- "restart_ha": 0.8,
- "check_network": 0.9,
- },
- "default_threshold": 0.9,
- "allowed_auto_actions": ["restart_ha"],
- "max_retries_per_action": {
- "restart_ha": 3,
- },
- "retry_window_seconds": 300,
- "action_history": {},
- }
-
- listener = threading.Thread(
- target=listen_for_results,
- args=(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config),
- daemon=True,
- )
- listener.start()
-
- input_listener = threading.Thread(
- target=listen_for_input_tasks,
- args=(redis_client, ha_task_ids, runs, run_actions, auto_config),
- daemon=True,
- )
- input_listener.start()
-
- event_listener = threading.Thread(
- target=listen_for_external_events,
- args=(redis_client, services),
- daemon=True,
- )
- event_listener.start()
-
- print("[orchestrator] ready")
- emit_event(
- {
- "type": "log",
- "message": "orchestrator ready",
- "run_id": None,
- "node": None,
- }
- )
- emit_auto_config(auto_config, "auto mode: on")
- while True:
- try:
- command = input("> ").strip()
- except EOFError:
- if not sys.stdin.isatty():
- print("[orchestrator] stdin closed, exiting")
- emit_event(
- {
- "type": "log",
- "message": "stdin closed, exiting",
- "run_id": None,
- "node": None,
- }
- )
- return
- print("[orchestrator] stdin closed, waiting...")
- emit_event(
- {
- "type": "log",
- "message": "stdin closed, waiting",
- "run_id": None,
- "node": None,
- }
- )
- time.sleep(2)
- continue
- except KeyboardInterrupt:
- print()
- continue
-
- process_input(command, redis_client, ha_task_ids, runs)
-
-
-if __name__ == "__main__":
- main()
diff --git a/orchestrator/redis_client.py b/orchestrator/redis_client.py
deleted file mode 100644
index 007760b..0000000
--- a/orchestrator/redis_client.py
+++ /dev/null
@@ -1,8 +0,0 @@
-import os
-
-import redis
-
-
-def get_redis_client():
- host = os.getenv("REDIS_HOST", "redis")
- return redis.Redis(host=host, port=6379, decode_responses=True)
diff --git a/orchestrator/requirements.txt b/orchestrator/requirements.txt
deleted file mode 100644
index 7800f0f..0000000
--- a/orchestrator/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-redis
diff --git a/orchestrator/result_listener.py b/orchestrator/result_listener.py
deleted file mode 100644
index ed2188d..0000000
--- a/orchestrator/result_listener.py
+++ /dev/null
@@ -1,567 +0,0 @@
-import json
-import re
-import threading
-import time
-from uuid import uuid4
-
-from diagnosis import DiagnosisEngine
-from events import emit_event, emit_run_progress
-from task_builder import build_task
-
-
-def send_task(redis_client, target, action, params, runs=None, run_id=None):
- task = build_task(target, action, params)
- if run_id:
- task["run_id"] = run_id
- if runs is not None and run_id in runs:
- runs[run_id]["expected"] += 1
- emit_run_progress(run_id, runs[run_id])
-
- redis_client.lpush("tasks", json.dumps(task))
- print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
- emit_event(
- {
- "type": "task",
- "message": f"sent {task['action']} to {task['target']}",
- "run_id": task.get("run_id"),
- "node": task.get("target"),
- }
- )
- return task
-
-
-def detect_homeassistant(result):
- if isinstance(result, list):
- for item in result:
- name = item.get("name", "")
- if "homeassistant" in name:
- return name
-
- if isinstance(result, str):
- for token in result.split():
- if "homeassistant" in token:
- return token.lstrip("/")
-
- return None
-
-
-def extract_first_http_url(result):
- if not isinstance(result, str):
- return None
-
- if "ClientConnectorError" not in result or "http://" not in result:
- return None
-
- match = re.search(r"http://[^\s'\"),]+", result)
- if not match:
- return None
-
- return match.group(0)
-
-
-def parse_host_port(url):
- address = url.split("http://", 1)[1].split("/", 1)[0]
- if ":" not in address:
- return None, None
-
- host, port = address.rsplit(":", 1)
- return host, port
-
-
-def normalize_http_code(result):
- code = str(result).strip()
- if code.isdigit():
- return code
- return None
-
-
-def add_http_signal(engine, key, result):
- code = normalize_http_code(result)
- if code is not None:
- engine.add(key, code)
-
-
-def add_proxy_signal(engine, result):
- error = str(result.get("error") or "")
- if "exit code 6" in error:
- engine.add("ha_proxy", "dns_error")
- return
-
- add_http_signal(engine, "ha_proxy", result.get("result"))
-
-
-def action_in_cooldown(last_actions, node, action_type, run_id=None):
- key = f"{node}:{action_type}"
- now = time.time()
- if key in last_actions and now - last_actions[key] < 60:
- print("[action] skipped (cooldown)")
- emit_event(
- {
- "type": "action",
- "message": "[action] skipped (cooldown)",
- "run_id": run_id,
- "node": node,
- }
- )
- return True
-
- last_actions[key] = now
- return False
-
-
-def sorted_proposal_options(result):
- options = result.get("options", [])
- action_options = [
- option for option in options
- if not option.get("is_ignore")
- ]
- ignore_options = [
- option for option in options
- if option.get("is_ignore")
- ]
- return sorted(
- action_options,
- key=lambda option: option.get("confidence", 0),
- reverse=True,
- ) + ignore_options
-
-
-def emit_proposal_event(run_id, result):
- emit_event(
- {
- "type": "proposal",
- "run_id": run_id,
- "node": None,
- "message": result.get("message"),
- "confidence": result.get("confidence"),
- "options": sorted_proposal_options(result),
- }
- )
-
-
-def retry_limit_reached(command, run_id, auto_config):
- action_history = auto_config.setdefault("action_history", {})
- history = action_history.setdefault(command, [])
- now = time.time()
- retry_window = auto_config.get("retry_window_seconds", 300)
- cutoff = now - retry_window
- history[:] = [timestamp for timestamp in history if timestamp >= cutoff]
-
- retry_limit = auto_config.get("max_retries_per_action", {}).get(command)
- if retry_limit is None:
- return False
-
- if len(history) >= retry_limit:
- emit_event(
- {
- "type": "auto_action",
- "run_id": run_id,
- "node": None,
- "message": f"[auto] blocked: {command} retry limit reached",
- }
- )
- return True
-
- history.append(now)
- return False
-
-
-def emit_learning(message, run_id=None):
- emit_event(
- {
- "type": "learning",
- "message": message,
- "run_id": run_id,
- "node": None,
- }
- )
-
-
-def start_ha_check_run(redis_client, runs, ha_task_ids, learning_action=None):
- run_id = str(uuid4())
- runs[run_id] = {
- "expected": 0,
- "received": 0,
- "engine": DiagnosisEngine(),
- "active": True,
- "status": "running",
- "learning_action": learning_action,
- }
- emit_event(
- {
- "type": "run_status",
- "message": "run status: running",
- "run_id": run_id,
- "node": None,
- "status": "running",
- }
- )
- emit_run_progress(run_id, runs[run_id])
-
- task = send_task(redis_client, "piha", "docker_ps", {}, runs, run_id)
- ha_task_ids[task["task_id"]] = run_id
- return run_id
-
-
-def schedule_action_outcome_check(redis_client, runs, ha_task_ids, action):
- emit_learning(f"[learning] checking outcome for {action}")
-
- def delayed_check():
- time.sleep(30)
- start_ha_check_run(
- redis_client,
- runs,
- ha_task_ids,
- learning_action=action,
- )
-
- thread = threading.Thread(target=delayed_check, daemon=True)
- thread.start()
-
-
-def record_action_outcome(run, run_id, action_stats):
- action = run.get("learning_action")
- if not action:
- return
-
- stats = action_stats.setdefault(action, {"success": 0, "failure": 0})
- if run["engine"].signals.get("ha_logs_client_connector_error"):
- stats["failure"] += 1
- emit_learning(f"[learning] {action} failed", run_id)
- else:
- stats["success"] += 1
- emit_learning(f"[learning] {action} success", run_id)
-
- total = stats["success"] + stats["failure"]
- success_rate = stats["success"] / total if total else 0
- emit_learning(f"[learning] {action} success_rate: {success_rate:.0%}", run_id)
-
-
-def auto_execute_action(
- redis_client,
- engine,
- option,
- run_id,
- last_actions,
- auto_config,
- runs,
- ha_task_ids,
-):
- command = option.get("command")
- confidence = option.get("confidence")
-
- if command == "restart_ha":
- if action_in_cooldown(last_actions, "piha", "restart_ha", run_id):
- return False
- if retry_limit_reached(command, run_id, auto_config):
- return False
-
- container_name = engine.signals.get("ha_container_name") or "homeassistant"
- emit_event(
- {
- "type": "auto_action",
- "run_id": run_id,
- "node": "piha",
- "message": f"Auto-executed: {command}",
- "confidence": confidence,
- }
- )
- send_task(
- redis_client,
- "piha",
- "exec",
- {"cmd": f"docker restart {container_name}"},
- run_id=run_id,
- )
- schedule_action_outcome_check(redis_client, runs, ha_task_ids, command)
- return True
-
- return False
-
-
-def maybe_auto_execute_proposal(
- redis_client,
- run,
- run_id,
- result,
- last_actions,
- auto_config,
- runs,
- ha_task_ids,
-):
- if run.get("learning_action"):
- return
- if not auto_config.get("auto_mode"):
- return
-
- options = [
- option for option in sorted_proposal_options(result)
- if not option.get("is_ignore") and isinstance(option.get("confidence"), (int, float))
- ]
- if not options:
- return
-
- best = options[0]
- allowed_actions = set(auto_config.get("allowed_auto_actions", []))
- command = best.get("command")
- if command not in allowed_actions:
- return
-
- threshold = auto_config.get("action_thresholds", {}).get(
- command,
- auto_config.get("default_threshold", 0.9),
- )
- if best.get("confidence", 0) < threshold:
- return
-
- auto_execute_action(
- redis_client,
- run["engine"],
- best,
- run_id,
- last_actions,
- auto_config,
- runs,
- ha_task_ids,
- )
-
-
-def emit_evaluation_event(run_id, result):
- if isinstance(result, dict) and result.get("type") == "proposal":
- emit_proposal_event(run_id, result)
- return
-
- print(result)
- emit_event(
- {
- "type": "diagnosis",
- "message": result,
- "run_id": run_id,
- "node": None,
- }
- )
-
-
-def has_error(results):
- return any(
- isinstance(result, str) and result.startswith("[error]")
- for result in results
- )
-
-
-def store_action_context(run_actions, run_id, engine):
- run_actions[run_id] = {
- "ha_container_name": engine.signals.get("ha_container_name") or "homeassistant",
- }
-
-
-def evaluate_run_if_complete(
- redis_client,
- runs,
- run_id,
- last_actions,
- run_actions,
- auto_config,
- ha_task_ids,
- action_stats,
-):
- run = runs.get(run_id)
- if not run or run["received"] != run["expected"]:
- return
-
- results = run["engine"].evaluate()
- for result in results:
- emit_evaluation_event(run_id, result)
- if isinstance(result, dict) and result.get("type") == "proposal":
- maybe_auto_execute_proposal(
- redis_client,
- run,
- run_id,
- result,
- last_actions,
- auto_config,
- runs,
- ha_task_ids,
- )
- status = "error" if has_error(results) else "done"
- run["active"] = False
- run["status"] = status
- store_action_context(run_actions, run_id, run["engine"])
- emit_event(
- {
- "type": "run_status",
- "message": f"run status: {status}",
- "run_id": run_id,
- "node": None,
- "status": status,
- }
- )
- record_action_outcome(run, run_id, action_stats)
- runs.pop(run_id, None)
-
-
-def tracked_run_id_for_task(task_id, ha_task_ids, ha_log_task_ids, ha_check_tasks):
- if task_id in ha_task_ids:
- return ha_task_ids[task_id]
- if task_id in ha_log_task_ids:
- return ha_log_task_ids[task_id]
- if task_id in ha_check_tasks:
- return ha_check_tasks[task_id][0]
- return None
-
-
-def listen_for_results(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config):
- ha_log_task_ids = {}
- ha_check_tasks = {}
- ha_check_results = {}
- action_stats = {
- "restart_ha": {
- "success": 0,
- "failure": 0,
- }
- }
-
- while True:
- _, raw_result = redis_client.brpop("results")
- try:
- result = json.loads(raw_result)
- except json.JSONDecodeError:
- print(f"\n[result:error] invalid json: {raw_result}")
- emit_event(
- {
- "type": "result",
- "message": f"invalid json: {raw_result}",
- "run_id": None,
- "node": None,
- }
- )
- continue
-
- print("\n--- result ---")
- print(f"task_id: {result.get('task_id')}")
- print(f"node: {result.get('node')}")
- print(f"status: {result.get('status')}")
- print(f"result: {result.get('result')}")
- print(f"error: {result.get('error')}")
- print("--------------")
-
- task_id = result.get("task_id")
- run_id = result.get("run_id") or tracked_run_id_for_task(
- task_id,
- ha_task_ids,
- ha_log_task_ids,
- ha_check_tasks,
- )
- emit_event(
- {
- "type": "result",
- "message": f"result received: {task_id} status={result.get('status')}",
- "run_id": run_id,
- "node": result.get("node"),
- }
- )
- run = runs.get(run_id) if run_id else None
- if not run:
- continue
-
- run["received"] += 1
- emit_run_progress(run_id, run)
-
- if task_id in ha_check_tasks:
- tracked_run_id, label = ha_check_tasks.pop(task_id)
- if tracked_run_id != run_id:
- continue
- checks = ha_check_results.setdefault(run_id, {})
- checks[label] = normalize_http_code(result.get("result"))
- if label == "ha_local_check":
- add_http_signal(run["engine"], "ha_local", result.get("result"))
- add_http_signal(run["engine"], "lms", result.get("result"))
- elif label == "ha_proxy_check":
- add_proxy_signal(run["engine"], result)
-
- if "ha_local_check" in checks and "ha_proxy_check" in checks:
- ha_check_results.pop(run_id, None)
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- if task_id in ha_log_task_ids:
- tracked_run_id = ha_log_task_ids.pop(task_id)
- if tracked_run_id != run_id:
- continue
- if result.get("status") != "ok":
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- if "ClientConnectorError" in str(result.get("result")):
- run["engine"].add("ha_logs_client_connector_error", True)
-
- url = extract_first_http_url(result.get("result"))
- if not url:
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- host, port = parse_host_port(url)
- if not host or not port:
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- check_task = send_task(
- redis_client,
- "piha",
- "exec",
- {"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"},
- runs,
- run_id,
- )
- ha_check_tasks[check_task["task_id"]] = (run_id, "ha_local_check")
-
- proxy_task = send_task(
- redis_client,
- "vps",
- "exec",
- {"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"},
- runs,
- run_id,
- )
- ha_check_tasks[proxy_task["task_id"]] = (run_id, "ha_proxy_check")
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- if task_id in ha_task_ids:
- tracked_run_id = ha_task_ids.pop(task_id)
- if tracked_run_id != run_id:
- continue
- if result.get("status") != "ok":
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- container_name = detect_homeassistant(result.get("result"))
- if not container_name:
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- print(f"[orchestrator] detected HA container: {container_name}")
- emit_event(
- {
- "type": "log",
- "message": f"detected HA container: {container_name}",
- "run_id": run_id,
- "node": result.get("node"),
- }
- )
- run["engine"].add("ha_container", "running")
- run["engine"].add("ha_container_name", container_name)
-
- logs_task = send_task(
- redis_client,
- "piha",
- "docker_logs",
- {"container": container_name},
- runs,
- run_id,
- )
- ha_log_task_ids[logs_task["task_id"]] = run_id
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
- continue
-
- evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
diff --git a/orchestrator/task_builder.py b/orchestrator/task_builder.py
deleted file mode 100644
index 879e552..0000000
--- a/orchestrator/task_builder.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from uuid import uuid4
-
-
-def build_task(target, action, params=None):
- return {
- "task_id": str(uuid4()),
- "target": target,
- "type": "infra",
- "action": action,
- "params": params or {},
- }
diff --git a/scripts/executor/executor.py b/scripts/executor/executor.py
deleted file mode 100644
index 12bb98f..0000000
--- a/scripts/executor/executor.py
+++ /dev/null
@@ -1,225 +0,0 @@
-#!/usr/bin/env python3
-import os
-import json
-import time
-import sys
-import shutil
-import uuid
-from pathlib import Path
-
-# Configuration
-ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
-EVENT_LOG = Path("/tmp/agent-events.log")
-HISTORY_LOG = ACTIONS_ROOT / "history.log"
-
-def emit_event(event_type, message, details=None):
- """Emit action lifecycle events."""
- event = {
- "type": event_type,
- "message": message,
- "timestamp": time.time(),
- "details": details or {}
- }
- line = json.dumps(event)
- print(line)
- try:
- with open(EVENT_LOG, "a") as f:
- f.write(line + "\n")
- f.flush()
- except Exception as e:
- print(f"Error writing to event log: {e}", file=sys.stderr)
-
-def log_history(action_id, status, message):
- """Append-only execution history."""
- entry = {
- "timestamp": time.time(),
- "action_id": action_id,
- "status": status,
- "message": message
- }
- try:
- with open(HISTORY_LOG, "a") as f:
- f.write(json.dumps(entry) + "\n")
- f.flush()
- except Exception as e:
- print(f"Error writing history: {e}", file=sys.stderr)
-
-def ensure_dirs():
- for d in ["pending", "approved", "running", "completed", "failed", "rejected"]:
- (ACTIONS_ROOT / d).mkdir(parents=True, exist_ok=True)
-
-def approve_action(action_id):
- ensure_dirs()
- if not action_id.endswith(".json"):
- filename = f"{action_id}.json"
- else:
- filename = action_id
-
- pending_path = ACTIONS_ROOT / "pending" / filename
- if not pending_path.exists():
- print(f"Action {filename} not found in pending.")
- return False
-
- approved_path = ACTIONS_ROOT / "approved" / filename
-
- try:
- with open(pending_path, "r") as f:
- action = json.load(f)
-
- action["status"] = "approved"
- action["approved_at"] = time.time()
-
- with open(pending_path, "w") as f:
- json.dump(action, f, indent=2)
-
- shutil.move(pending_path, approved_path)
-
- emit_event("action_approved", f"Action approved: {action['action_id']}", {"action_id": action['action_id']})
- log_history(action['action_id'], "approved", "Manual approval received")
- print(f"Action {action['action_id']} approved.")
- return True
- except Exception as e:
- print(f"Error approving action: {e}")
- return False
-
-def reject_action(action_id):
- ensure_dirs()
- if not action_id.endswith(".json"):
- filename = f"{action_id}.json"
- else:
- filename = action_id
-
- pending_path = ACTIONS_ROOT / "pending" / filename
- if not pending_path.exists():
- print(f"Action {filename} not found in pending.")
- return False
-
- rejected_path = ACTIONS_ROOT / "rejected" / filename
-
- try:
- with open(pending_path, "r") as f:
- action = json.load(f)
-
- action["status"] = "rejected"
- action["rejected_at"] = time.time()
-
- with open(pending_path, "w") as f:
- json.dump(action, f, indent=2)
-
- shutil.move(pending_path, rejected_path)
-
- emit_event("action_rejected", f"Action rejected: {action['action_id']}", {"action_id": action['action_id']})
- log_history(action['action_id'], "rejected", "Manual rejection received")
- print(f"Action {action['action_id']} rejected.")
- return True
- except Exception as e:
- print(f"Error rejecting action: {e}")
- return False
-
-def process_action(action_path, dry_run=False):
- """Process a single approved action."""
- try:
- with open(action_path, "r") as f:
- action = json.load(f)
- except Exception as e:
- print(f"Error reading action {action_path}: {e}")
- return
-
- action_id = action["action_id"]
- action_type = action["action_type"]
-
- # Move to running (Resumable execution state)
- running_path = ACTIONS_ROOT / "running" / action_path.name
- shutil.move(action_path, running_path)
-
- action["status"] = "running"
- action["started_at"] = time.time()
- with open(running_path, "w") as f:
- json.dump(action, f, indent=2)
-
- emit_event("action_started", f"Started action {action_id} ({action_type})", {"action_id": action_id})
- log_history(action_id, "running", f"Execution started (dry_run={dry_run})")
-
- # Simulation logic (Recommendation-safe execution model)
- print(f"Executing {action_type} for {action.get('service') or action.get('node')}...")
-
- # Idempotent simulation: in a real world, we'd check if it's already done
- time.sleep(0.5)
-
- success = True
- if dry_run:
- print(f"[DRY-RUN] Would execute {action_type} logic here.")
- else:
- # Initial action types implementation (Simulation)
- if action_type == "redeploy_service":
- print(f"DEBUG: Triggering container restart/redeploy for {action.get('service')}")
- elif action_type == "rerun_healthcheck":
- print(f"DEBUG: Running healthcheck for {action.get('service')}")
- elif action_type == "rerun_deployment_stage":
- print(f"DEBUG: Retrying deployment stage for {action.get('service')}")
- elif action_type == "collect_diagnostics":
- print(f"DEBUG: Collecting logs and metrics for {action.get('service') or action.get('node')}")
- else:
- print(f"DEBUG: Executing unknown action type: {action_type}")
-
- # Finalize
- if success:
- final_status = "completed"
- target_dir = ACTIONS_ROOT / "completed"
- else:
- final_status = "failed"
- target_dir = ACTIONS_ROOT / "failed"
-
- final_path = target_dir / action_path.name
- action["status"] = final_status
- action["finished_at"] = time.time()
-
- with open(running_path, "w") as f:
- json.dump(action, f, indent=2)
-
- shutil.move(running_path, final_path)
-
- emit_event(f"action_{final_status}", f"Action {action_id} {final_status}", {"action_id": action_id})
- log_history(action_id, final_status, "Execution finished")
-
-def run_executor(dry_run=False):
- ensure_dirs()
- print(f"--- Executor Run: {time.ctime()} (dry_run={dry_run}) ---")
-
- # 1. Resume running actions
- running_actions = list((ACTIONS_ROOT / "running").glob("*.json"))
- for action_file in running_actions:
- print(f"Resuming action: {action_file.name}")
- process_action(action_file, dry_run=dry_run)
-
- # 2. Process approved actions
- approved_actions = list((ACTIONS_ROOT / "approved").glob("*.json"))
- if not approved_actions:
- print("No approved actions found.")
- else:
- for action_file in approved_actions:
- process_action(action_file, dry_run=dry_run)
-
- print("Run complete.")
-
-if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="Homelab Action Executor")
- parser.add_argument("command", choices=["run", "approve", "reject"], nargs="?", default="run")
- parser.add_argument("action_id", nargs="?")
- parser.add_argument("--dry-run", action="store_true")
-
- args = parser.parse_args()
-
- if args.command == "run":
- run_executor(dry_run=args.dry_run)
- elif args.command == "approve":
- if not args.action_id:
- print("Error: action_id required for approve")
- sys.exit(1)
- approve_action(args.action_id)
- elif args.command == "reject":
- if not args.action_id:
- print("Error: action_id required for reject")
- sys.exit(1)
- reject_action(args.action_id)
diff --git a/scripts/executor/test_actions.sh b/scripts/executor/test_actions.sh
deleted file mode 100644
index 6cf7271..0000000
--- a/scripts/executor/test_actions.sh
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/bin/bash
-# Validation script for Homelab Action Queue System
-
-set -e
-
-BASE_DIR=$(pwd)
-export HOMELAB_WORLD_ROOT="$BASE_DIR/tmp/homelab/world"
-export HOMELAB_ACTIONS_ROOT="$BASE_DIR/tmp/homelab/actions"
-EVENT_LOG="/tmp/agent-events.log"
-
-echo "=== Starting Action Queue Validation ==="
-
-# 1. Setup drift scenarios
-echo "Setting up drift scenarios..."
-bash scripts/supervisor/test_scenarios.sh
-
-# 2. Run supervisor to generate action proposals
-echo "Running supervisor..."
-python3 scripts/supervisor/supervisor.py
-
-# 3. Check for pending actions
-echo "Checking pending actions..."
-ls -l "$HOMELAB_ACTIONS_ROOT/pending/"
-
-# Get an action ID from pending
-ACTION_FILE=$(ls "$HOMELAB_ACTIONS_ROOT/pending/" | head -n 1)
-if [ -z "$ACTION_FILE" ]; then
- echo "Error: No pending actions found!"
- exit 1
-fi
-ACTION_ID="${ACTION_FILE%.json}"
-echo "Found action: $ACTION_ID"
-
-# 4. Approve the action
-echo "Approving action $ACTION_ID..."
-python3 scripts/executor/executor.py approve "$ACTION_ID"
-
-# 5. Run executor
-echo "Running executor..."
-python3 scripts/executor/executor.py run
-
-# 6. Verify completion
-if [ -f "$HOMELAB_ACTIONS_ROOT/completed/$ACTION_FILE" ]; then
- echo "SUCCESS: Action $ACTION_ID moved to completed."
-else
- echo "FAILURE: Action $ACTION_ID NOT found in completed."
- exit 1
-fi
-
-# 7. Test rejection
-echo "Testing rejection..."
-NEXT_ACTION_FILE=$(ls "$HOMELAB_ACTIONS_ROOT/pending/" | head -n 1)
-if [ -n "$NEXT_ACTION_FILE" ]; then
- NEXT_ACTION_ID="${NEXT_ACTION_FILE%.json}"
- echo "Rejecting action $NEXT_ACTION_ID..."
- python3 scripts/executor/executor.py reject "$NEXT_ACTION_ID"
-
- if [ -f "$HOMELAB_ACTIONS_ROOT/rejected/$NEXT_ACTION_FILE" ]; then
- echo "SUCCESS: Action $NEXT_ACTION_ID moved to rejected."
- else
- echo "FAILURE: Action $NEXT_ACTION_ID NOT found in rejected."
- exit 1
- fi
-fi
-
-# 8. Verify events
-echo "Verifying events in $EVENT_LOG..."
-grep "action_created" "$EVENT_LOG" | tail -n 1
-grep "action_approved" "$EVENT_LOG" | tail -n 1
-grep "action_started" "$EVENT_LOG" | tail -n 1
-grep "action_completed" "$EVENT_LOG" | tail -n 1
-grep "action_rejected" "$EVENT_LOG" | tail -n 1
-
-echo "=== Validation Complete ==="
diff --git a/scripts/supervisor/supervisor.py b/scripts/supervisor/supervisor.py
deleted file mode 100644
index ce5d162..0000000
--- a/scripts/supervisor/supervisor.py
+++ /dev/null
@@ -1,363 +0,0 @@
-#!/usr/bin/env python3
-import os
-import sys
-import yaml
-import json
-import time
-import glob
-import uuid
-from pathlib import Path
-
-# Configuration
-WORLD_STATE_PATH = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
-ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
-INVENTORY_PATH = Path("hosts")
-EVENT_LOG = Path("/tmp/agent-events.log")
-CHECKPOINT_FILE = Path("/tmp/supervisor-checkpoint.json")
-
-# Action Queue Layout
-ACTION_DIRS = ["pending", "approved", "running", "completed", "failed", "rejected"]
-
-# Reconcile event types
-RECONCILE_REQUIRED = "reconcile_required"
-RECONCILE_RECOMMENDED = "reconcile_recommended"
-RECONCILE_BLOCKED = "reconcile_blocked"
-
-# Runtime summary states
-STATE_NOMINAL = "nominal"
-STATE_DEGRADED = "degraded"
-STATE_UNSTABLE = "unstable"
-STATE_RECONCILING = "reconciling"
-
-def ensure_action_dirs():
- """Ensure action queue directories exist."""
- for d in ACTION_DIRS:
- (ACTIONS_ROOT / d).mkdir(parents=True, exist_ok=True)
-
-def emit_action_proposal(recommendation):
- """Convert recommendation to action proposal and save to pending/."""
- ensure_action_dirs()
-
- action_type_map = {
- "redeploy": "redeploy_service",
- "deploy": "redeploy_service",
- "diagnostics": "collect_diagnostics",
- "failover_review": "collect_diagnostics",
- "review": "collect_diagnostics",
- "delayed_deployment": "rerun_deployment_stage"
- }
-
- action_type = action_type_map.get(recommendation["action"], "collect_diagnostics")
-
- risk_level_map = {
- "redeploy_service": "guarded",
- "rerun_healthcheck": "safe",
- "rerun_deployment_stage": "guarded",
- "collect_diagnostics": "safe"
- }
- risk_level = risk_level_map.get(action_type, "dangerous")
-
- # Dangerous always requires approval
- # Guarded defaults to approval
- approval_required = risk_level in ["dangerous", "guarded"]
-
- action_id = str(uuid.uuid4())
- action = {
- "action_id": action_id,
- "created_at": time.time(),
- "proposed_by": "supervisor",
- "correlation_id": str(uuid.uuid4()), # In a real system, link to drift ID
- "node": recommendation["drift"].get("node"),
- "service": recommendation["drift"].get("service"),
- "action_type": action_type,
- "risk_level": risk_level,
- "confidence": 0.9, # Default confidence
- "approval_required": approval_required,
- "autonomous_eligible": False, # No autonomy yet
- "status": "pending",
- "payload": recommendation["drift"],
- "rollback_reference": None
- }
-
- file_path = ACTIONS_ROOT / "pending" / f"{action_id}.json"
- try:
- with open(file_path, "w") as f:
- json.dump(action, f, indent=2)
-
- emit_event("action_created", f"Action proposed: {action_type} for {action.get('service') or action.get('node')}", {
- "action_id": action_id,
- "action_type": action_type,
- "node": action.get("node"),
- "service": action.get("service")
- })
- except Exception as e:
- print(f"Error emitting action proposal: {e}", file=sys.stderr)
-
-def emit_event(event_type, message, details=None):
- """Emit reconciliation events using existing event system (append-only file)."""
- event = {
- "type": event_type,
- "message": message,
- "timestamp": time.time(),
- "details": details or {}
- }
- line = json.dumps(event)
- print(line)
- try:
- # Append-only semantics
- with open(EVENT_LOG, "a", encoding="utf-8") as f:
- f.write(line + "\n")
- f.flush()
- except Exception as e:
- print(f"Error writing to event log: {e}", file=sys.stderr)
-
-def load_desired_state():
- """Load desired state from hosts/*/services.yaml."""
- desired = {"services": [], "nodes": []}
- if not INVENTORY_PATH.exists():
- return desired
-
- # Inventory model: hosts/{node_name}/services.yaml
- for yaml_file in glob.glob(str(INVENTORY_PATH / "*" / "services.yaml")):
- try:
- with open(yaml_file, "r") as f:
- data = yaml.safe_load(f)
- if data and "services" in data:
- node_name = Path(yaml_file).parent.name
- for svc in data["services"]:
- if isinstance(svc, str):
- svc = {"name": svc}
- svc["node"] = node_name
- desired["services"].append(svc)
- if node_name not in desired["nodes"]:
- desired["nodes"].append(node_name)
- except Exception as e:
- print(f"Error loading {yaml_file}: {e}", file=sys.stderr)
- return desired
-
-def load_world_state():
- """Load current world state from /opt/homelab/world/ (filesystem-first)."""
- world = {
- "services": {},
- "nodes": {},
- "deployments": {},
- "incidents": {}
- }
-
- if not WORLD_STATE_PATH.exists():
- return world
-
- # Filesystem-first design: each category is a directory, each item is a JSON file
- for category in ["services", "nodes", "deployments", "incidents"]:
- cat_path = WORLD_STATE_PATH / category
- if cat_path.exists() and cat_path.is_dir():
- for json_file in cat_path.glob("*.json"):
- try:
- with open(json_file, "r") as f:
- world[category][json_file.stem] = json.load(f)
- except Exception as e:
- print(f"Error loading {json_file}: {e}", file=sys.stderr)
-
- return world
-
-def detect_drift(desired, world):
- """Compare desired infrastructure state with observed runtime state."""
- drifts = []
-
- # 1. Missing service & 2. Unhealthy service
- desired_service_names = set()
- for d_svc in desired["services"]:
- name = d_svc["name"]
- desired_service_names.add(name)
- a_svc = world["services"].get(name)
-
- if not a_svc:
- drifts.append({
- "type": "missing_service",
- "service": name,
- "node": d_svc["node"]
- })
- elif a_svc.get("status") not in ("ok", "healthy", "up"):
- drifts.append({
- "type": "unhealthy_service",
- "service": name,
- "status": a_svc.get("status"),
- "node": a_svc.get("node") or d_svc["node"]
- })
-
- # 4. Offline node
- for node_name in desired["nodes"]:
- a_node = world["nodes"].get(node_name)
- if not a_node or a_node.get("status") not in ("online", "ok", "up"):
- drifts.append({
- "type": "offline_node",
- "node": node_name,
- "status": a_node.get("status") if a_node else "missing"
- })
-
- # 3. Failed deployment
- # Check for recent failures in world/deployments
- for d_id, d_data in world["deployments"].items():
- if d_data.get("status") == "failed":
- drifts.append({
- "type": "failed_deployment",
- "deployment_id": d_id,
- "service": d_data.get("service")
- })
-
- # 5. Unresolved incidents
- for i_id, i_data in world["incidents"].items():
- if i_data.get("status") not in ("resolved", "closed"):
- drifts.append({
- "type": "unresolved_incident",
- "incident_id": i_id,
- "description": i_data.get("description"),
- "status": i_data.get("status")
- })
-
- return drifts
-
-def recommendation_engine(drifts, world):
- """Recommendation mode only: emit proposed actions without mutation."""
- recommendations = []
- for drift in drifts:
- if drift["type"] == "unhealthy_service":
- recommendations.append({
- "drift": drift,
- "action": "redeploy",
- "message": f"Service {drift['service']} is unhealthy. Recommend redeploy.",
- "type": RECONCILE_REQUIRED
- })
- elif drift["type"] == "failed_deployment":
- service = drift["service"]
- # Recommendation: repeated deployment failures -> recommend diagnostics
- failures = [d for d in world["deployments"].values() if d.get("service") == service and d.get("status") == "failed"]
- if len(failures) > 2:
- recommendations.append({
- "drift": drift,
- "action": "diagnostics",
- "message": f"Repeated deployment failures for {service}. Recommend diagnostics.",
- "type": RECONCILE_BLOCKED
- })
- else:
- recommendations.append({
- "drift": drift,
- "action": "redeploy",
- "message": f"Deployment failed for {service}. Recommend retry.",
- "type": RECONCILE_REQUIRED
- })
- elif drift["type"] == "offline_node":
- # Recommendation: node offline -> recommend failover review
- recommendations.append({
- "drift": drift,
- "action": "failover_review",
- "message": f"Node {drift['node']} is offline. Recommend failover review.",
- "type": RECONCILE_REQUIRED
- })
- elif drift["type"] == "missing_service":
- # Recommendation: dependency unavailable -> recommend delayed deployment
- # Mock dependency check: if a service is 'webapp' and 'database' is not healthy
- dependencies_met = True
- if drift["service"] == "webapp":
- db_svc = world["services"].get("database")
- if not db_svc or db_svc.get("status") not in ("ok", "healthy", "up"):
- dependencies_met = False
-
- if not dependencies_met:
- recommendations.append({
- "drift": drift,
- "action": "delayed_deployment",
- "message": f"Dependency unavailable for {drift['service']}. Recommend delayed deployment.",
- "type": RECONCILE_RECOMMENDED
- })
- else:
- recommendations.append({
- "drift": drift,
- "action": "deploy",
- "message": f"Service {drift['service']} is missing. Recommend deployment.",
- "type": RECONCILE_REQUIRED
- })
- elif drift["type"] == "unresolved_incident":
- recommendations.append({
- "drift": drift,
- "action": "review",
- "message": f"Unresolved incident: {drift['description']}. Recommend review.",
- "type": RECONCILE_RECOMMENDED
- })
- return recommendations
-
-def save_checkpoint(state):
- """Checkpoint support for idempotent operation."""
- try:
- with open(CHECKPOINT_FILE, "w") as f:
- json.dump({
- "last_run": time.time(),
- "state": state
- }, f)
- except Exception as e:
- print(f"Error saving checkpoint: {e}", file=sys.stderr)
-
-def load_checkpoint():
- """Load last run checkpoint."""
- if CHECKPOINT_FILE.exists():
- try:
- with open(CHECKPOINT_FILE, "r") as f:
- return json.load(f)
- except Exception as e:
- print(f"Error loading checkpoint: {e}", file=sys.stderr)
- return None
-
-def main():
- print(f"--- Supervisor Run: {time.ctime()} ---")
-
- checkpoint = load_checkpoint()
- if checkpoint:
- print(f"Last run: {time.ctime(checkpoint.get('last_run', 0))}")
-
- # 1. Load desired state
- desired = load_desired_state()
-
- # 2. Load world state
- world = load_world_state()
-
- # 3. Detect drift
- drifts = detect_drift(desired, world)
-
- # 4. Generate recommendations (Recommendation mode only)
- recommendations = recommendation_engine(drifts, world)
-
- # 5. Emit events & Update summary state
- if not recommendations and not drifts:
- emit_event("summary_state", f"System state: {STATE_NOMINAL}", {"state": STATE_NOMINAL})
- else:
- # Extend runtime summary states: nominal, degraded, unstable, reconciling
- has_blocked = any(r["type"] == RECONCILE_BLOCKED for r in recommendations)
- has_required = any(r["type"] == RECONCILE_REQUIRED for r in recommendations)
-
- overall_state = STATE_NOMINAL
- if has_blocked:
- overall_state = STATE_UNSTABLE
- elif has_required:
- overall_state = STATE_DEGRADED
- elif recommendations:
- overall_state = STATE_DEGRADED
-
- emit_event("summary_state", f"System state: {overall_state}", {"state": overall_state})
-
- # Emit reconciliation events
- for rec in recommendations:
- emit_event(rec["type"], rec["message"], rec["drift"])
- # Proposed: Emit action proposals to action queue
- emit_action_proposal(rec)
-
- # 6. Save checkpoint
- save_checkpoint({
- "drift_count": len(drifts),
- "recommendation_count": len(recommendations),
- "timestamp": time.time()
- })
-
- print("Run complete.")
-
-if __name__ == "__main__":
- main()
diff --git a/scripts/supervisor/test_scenarios.sh b/scripts/supervisor/test_scenarios.sh
deleted file mode 100644
index b03aed0..0000000
--- a/scripts/supervisor/test_scenarios.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-# Generate realistic reconciliation drift scenarios for the homelab supervisor.
-
-set -e
-
-# Configuration
-BASE_DIR=$(pwd)
-# Use local directory for testing if /opt is not writable
-WORLD_DIR="${HOMELAB_WORLD_ROOT:-$BASE_DIR/tmp/homelab/world}"
-INVENTORY_DIR="$BASE_DIR/hosts"
-
-echo "Setting up homelab reconciliation scenarios..."
-echo "World state: $WORLD_DIR"
-echo "Inventory: $INVENTORY_DIR"
-
-# Cleanup
-rm -rf "$WORLD_DIR"
-rm -rf "$INVENTORY_DIR"
-
-# Create directories
-mkdir -p "$INVENTORY_DIR/node1"
-mkdir -p "$INVENTORY_DIR/node2"
-mkdir -p "$WORLD_DIR/services" "$WORLD_DIR/nodes" "$WORLD_DIR/deployments" "$WORLD_DIR/incidents"
-
-# --- Scenario 1: Nominal ---
-cat <
+ RUNTIME STATE IS STALE
+
Dashboard
@@ -407,9 +410,12 @@
}
}
- function setOperatorMode(mode) {
+ async function setOperatorMode(mode) {
console.log('Operator mode set to:', mode);
- // In real system, this would call backend
+ const res = await postData('/mode', {mode});
+ if (res && res.status === 'ok') {
+ console.log('Mode updated successfully');
+ }
}
function formatTime(ts) {
@@ -435,6 +441,15 @@
statusEl.textContent = `System Status: ${summary.status.toUpperCase()}`;
statusEl.className = 'sidebar-footer ' + getStatusClass(summary.status);
+ // Handle stale state
+ const staleBanner = document.getElementById('stale-banner');
+ if (summary.stale) {
+ staleBanner.classList.remove('hidden');
+ staleBanner.textContent = `CRITICAL: Runtime state is STALE (Last update: ${formatTime(summary.last_update)})`;
+ } else {
+ staleBanner.classList.add('hidden');
+ }
+
if (currentView === 'dashboard') {
const dashSummary = document.getElementById('dashboard-summary');
dashSummary.innerHTML = `
@@ -463,12 +478,12 @@
pendingEl.innerHTML = actions.pending.map(a => `
-
- ${a.type.toUpperCase()}
+ ${(a.action_type || a.type || 'unknown').toUpperCase()}
${a.risk_level}
${a.description}
-Target
${a.target.node} ${a.target.service || ''}
- Confidence
${Math.round(a.confidence*100)}%
+ ${a.description || a.action_type || 'No description'}
+Target
${a.node || (a.target && a.target.node) || 'unknown'} ${(a.service || (a.target && a.target.service)) || ''}
+ Confidence
${Math.round((a.confidence || 0)*100)}%
@@ -476,16 +491,21 @@
`).join('') || 'No pending actions.';
- const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed];
- historyEl.innerHTML = history.sort((a,b) => b.timestamp - a.timestamp).map(a => `
+ const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed, ...actions.rejected];
+ historyEl.innerHTML = history.sort((a,b) => (b.timestamp || b.updated_at || 0) - (a.timestamp || a.updated_at || 0)).map(a => `
- ${a.type.toUpperCase()}
+ ${(a.action_type || a.type || 'unknown').toUpperCase()}
${a.status}
- ${a.description}
- ${formatTime(a.timestamp)} | Target: ${a.target.node}
+ ${a.description || a.action_type || 'No description'}
+ ${formatTime(a.timestamp || a.updated_at)} | Target: ${a.node || (a.target && a.target.node)}
${a.status === 'approved' ? `` : ''}
+ ${a.transition_history ? `
+
+ Trace: ${a.transition_history.map(h => `${h.from}->${h.to}`).join(' → ')}
+
+ ` : ''}