Compare commits

...

4 commits

50 changed files with 137 additions and 3208 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ __pycache__/
.idea/
.vscode/
tmp/

View file

@ -1,541 +0,0 @@
Command finished with exit code 0.
Command output:
diff --git a/docs/operator/reconcile-review.md b/docs/operator/reconcile-review.md
index 8b99c6f..e56c333 100644
--- a/docs/operator/reconcile-review.md
+++ b/docs/operator/reconcile-review.md
@@ -2,11 +2,13 @@
The system continuously monitors for drift between desired and actual state.
-1. If a service is in RECONCILING state, check the Services view.
-2. Review the Recommendations view for automated or guarded actions.
-3. For 'safe' actions with high confidence, the system may act autonomously if enabled.
-4. For 'guarded' or 'dangerous' actions, an operator must manually approve the action.
-5. Risk Levels:
- - **Safe**: Minimal impact, high success rate.
- - **Guarded**: Potential brief service interruption.
- - **Dangerous**: Significant impact, potential data loss, or hardware interaction required.
+1. **Drift Detection**: When drift is detected, the supervisor generates a recommendation and a corresponding pending action.
+2. **Review**: Navigate to the **Recommendations** view for a high-level summary, or the **Action Queue** for the specific execution plan.
+3. **Approval**: For 'guarded' or 'dangerous' actions, click **Approve** in the Action Queue.
+4. **Execution**: Once approved, the action can be triggered manually by clicking **Execute**, or it will be picked up by the autonomous executor if the system is in `AUTONOMOUS` mode.
+5. **Observation**: Monitor the **Deployments** and **Topology** views to watch the reconciliation in real time.
+
+Risk Levels:
+- **Safe**: Minimal impact, high success rate.
+- **Guarded**: Potential brief service interruption.
+- **Dangerous**: Significant impact, potential data loss, or node-level disruption.
diff --git a/scripts/supervisor/supervisor.py b/scripts/supervisor/supervisor.py
index e58027b..ce5d162 100644
--- a/scripts/supervisor/supervisor.py
+++ b/scripts/supervisor/supervisor.py
@@ -5,14 +5,19 @@ import yaml
import json
import time
import glob
+import uuid
from pathlib import Path
# Configuration
WORLD_STATE_PATH = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
+ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
INVENTORY_PATH = Path("hosts")
EVENT_LOG = Path("/tmp/agent-events.log")
CHECKPOINT_FILE = Path("/tmp/supervisor-checkpoint.json")
+# Action Queue Layout
+ACTION_DIRS = ["pending", "approved", "running", "completed", "failed", "rejected"]
+
# Reconcile event types
RECONCILE_REQUIRED = "reconcile_required"
RECONCILE_RECOMMENDED = "reconcile_recommended"
@@ -24,6 +29,70 @@ STATE_DEGRADED = "degraded"
STATE_UNSTABLE = "unstable"
STATE_RECONCILING = "reconciling"
+def ensure_action_dirs():
+ """Ensure action queue directories exist."""
+ for d in ACTION_DIRS:
+ (ACTIONS_ROOT / d).mkdir(parents=True, exist_ok=True)
+
+def emit_action_proposal(recommendation):
+ """Convert recommendation to action proposal and save to pending/."""
+ ensure_action_dirs()
+
+ action_type_map = {
+ "redeploy": "redeploy_service",
+ "deploy": "redeploy_service",
+ "diagnostics": "collect_diagnostics",
+ "failover_review": "collect_diagnostics",
+ "review": "collect_diagnostics",
+ "delayed_deployment": "rerun_deployment_stage"
+ }
+
+ action_type = action_type_map.get(recommendation["action"], "collect_diagnostics")
+
+ risk_level_map = {
+ "redeploy_service": "guarded",
+ "rerun_healthcheck": "safe",
+ "rerun_deployment_stage": "guarded",
+ "collect_diagnostics": "safe"
+ }
+ risk_level = risk_level_map.get(action_type, "dangerous")
+
+ # Dangerous always requires approval
+ # Guarded defaults to approval
+ approval_required = risk_level in ["dangerous", "guarded"]
+
+ action_id = str(uuid.uuid4())
+ action = {
+ "action_id": action_id,
+ "created_at": time.time(),
+ "proposed_by": "supervisor",
+ "correlation_id": str(uuid.uuid4()), # In a real system, link to drift ID
+ "node": recommendation["drift"].get("node"),
+ "service": recommendation["drift"].get("service"),
+ "action_type": action_type,
+ "risk_level": risk_level,
+ "confidence": 0.9, # Default confidence
+ "approval_required": approval_required,
+ "autonomous_eligible": False, # No autonomy yet
+ "status": "pending",
+ "payload": recommendation["drift"],
+ "rollback_reference": None
+ }
+
+ file_path = ACTIONS_ROOT / "pending" / f"{action_id}.json"
+ try:
+ with open(file_path, "w") as f:
+ json.dump(action, f, indent=2)
+
+ emit_event("action_created", f"Action proposed: {action_type} for {action.get('service') or action.get('node')}", {
+ "action_id": action_id,
+ "action_type": action_type,
+ "node": action.get("node"),
+ "service": action.get("service")
+ })
+ except Exception as e:
+ print(f"Error emitting action proposal: {e}", file=sys.stderr)
+
def emit_event(event_type, message, details=None):
"""Emit reconciliation events using existing event system (append-only file)."""
event = {
@@ -278,6 +347,8 @@ def main():
# Emit reconciliation events
for rec in recommendations:
emit_event(rec["type"], rec["message"], rec["drift"])
+ # Proposed: Emit action proposals to action queue
+ emit_action_proposal(rec)
# 6. Save checkpoint
save_checkpoint({
diff --git a/tmp/homelab/world/deployments/dep-001.json b/tmp/homelab/world/deployments/dep-001.json
index 02db067..f70d7a8 100644
--- a/tmp/homelab/world/deployments/dep-001.json
+++ b/tmp/homelab/world/deployments/dep-001.json
@@ -1 +1 @@
-{"id": "dep-001", "service": "webapp", "status": "failed", "timestamp": 1778597957}
+{"id": "dep-001", "service": "webapp", "status": "failed", "timestamp": 1778600510}
diff --git a/tmp/homelab/world/deployments/dep-002.json b/tmp/homelab/world/deployments/dep-002.json
index e977aa0..1ee5a29 100644
--- a/tmp/homelab/world/deployments/dep-002.json
+++ b/tmp/homelab/world/deployments/dep-002.json
@@ -1 +1 @@
-{"id": "dep-002", "service": "webapp", "status": "failed", "timestamp": 1778597657}
+{"id": "dep-002", "service": "webapp", "status": "failed", "timestamp": 1778600210}
diff --git a/tmp/homelab/world/deployments/dep-003.json b/tmp/homelab/world/deployments/dep-003.json
index 66f10c9..f44385b 100644
--- a/tmp/homelab/world/deployments/dep-003.json
+++ b/tmp/homelab/world/deployments/dep-003.json
@@ -1 +1 @@
-{"id": "dep-003", "service": "webapp", "status": "failed", "timestamp": 1778597357}
+{"id": "dep-003", "service": "webapp", "status": "failed", "timestamp": 1778599910}
diff --git a/webui/index.html b/webui/index.html
index d720307..5c049c1 100644
--- a/webui/index.html
+++ b/webui/index.html
@@ -216,9 +216,9 @@
.label { color: var(--text-muted); font-size: 12px; margin-bottom: 4px; }
.value { font-weight: 500; margin-bottom: 12px; }
- .risk-safe { color: var(--safe); }
- .risk-guarded { color: var(--guarded); }
- .risk-dangerous { color: var(--dangerous); }
+ .risk-safe { background: rgba(62, 175, 124, 0.1); color: var(--safe); }
+ .risk-guarded { background: rgba(230, 126, 34, 0.1); color: var(--guarded); }
+ .risk-dangerous { background: rgba(192, 57, 43, 0.1); color: var(--dangerous); }
</style>
</head>
@@ -229,6 +229,9 @@
<li class="nav-item active" onclick="showView('dashboard', this)">
<span>Dashboard</span>
</li>
+ <li class="nav-item" onclick="showView('actions', this)">
+ <span>Action Queue</span>
+ </li>
<li class="nav-item" onclick="showView('nodes', this)">
<span>Nodes</span>
</li>
@@ -238,9 +241,15 @@
<li class="nav-item" onclick="showView('deployments', this)">
<span>Deployments</span>
</li>
+ <li class="nav-item" onclick="showView('topology', this)">
+ <span>Topology</span>
+ </li>
<li class="nav-item" onclick="showView('events', this)">
<span>Events</span>
</li>
+ <li class="nav-item" onclick="showView('correlation', this)">
+ <span>Correlation</span>
+ </li>
<li class="nav-item" onclick="showView('recommendations', this)">
<span>Recommendations</span>
</li>
@@ -255,7 +264,16 @@
<main class="main-content">
<header>
- <div class="view-title" id="current-view-title">Dashboard</div>
+ <div style="display:flex; align-items:center; gap:20px">
+ <div class="view-title" id="current-view-title">Dashboard</div>
+ <select id="operator-mode" onchange="setOperatorMode(this.value)" style="background:var(--sidebar-color); border:1px solid var(--border-color); color:var(--accent-color); font-weight:bold; font-size:12px; padding:4px 8px">
+ <option value="observe">OBSERVE</option>
+ <option value="recommend">RECOMMEND</option>
+ <option value="approval" selected>APPROVAL</option>
+ <option value="autonomous">AUTONOMOUS</option>
+ <option value="maintenance">MAINTENANCE</option>
+ </select>
+ </div>
<div class="header-actions">
<button onclick="refreshData()">Refresh</button>
</div>
@@ -269,6 +287,10 @@
<div class="card-title">System Overview</div>
<div id="dashboard-summary" style="margin-top:20px"></div>
</div>
+ <div class="card">
+ <div class="card-title">Pending Actions</div>
+ <div id="dashboard-actions-summary" style="margin-top:20px"></div>
+ </div>
<div class="card">
<div class="card-title">Active Incidents</div>
<div id="dashboard-incidents" style="margin-top:20px"></div>
@@ -276,6 +298,20 @@
</div>
</div>
+ <!-- Actions View -->
+ <div id="view-actions" class="view hidden">
+ <div style="display:grid; grid-template-columns: 1fr 1fr; gap:24px">
+ <div>
+ <h3>Pending Approval</h3>
+ <div id="actions-pending" class="timeline"></div>
+ </div>
+ <div>
+ <h3>Active / History</h3>
+ <div id="actions-history" class="timeline"></div>
+ </div>
+ </div>
+ </div>
+
<!-- Nodes View -->
<div id="view-nodes" class="view hidden">
<div class="grid" id="nodes-list"></div>
@@ -291,11 +327,24 @@
<div class="grid" id="deployments-list"></div>
</div>
+ <!-- Topology View -->
+ <div id="view-topology" class="view hidden">
+ <div class="card" style="min-height:500px">
+ <div class="card-title">Runtime Topology</div>
+ <div id="topology-map" style="margin-top:20px; display:flex; flex-wrap:wrap; gap:40px; justify-content:center"></div>
+ </div>
+ </div>
+
<!-- Events View -->
<div id="view-events" class="view hidden">
<div class="timeline" id="events-timeline"></div>
</div>
+ <!-- Correlation View -->
+ <div id="view-correlation" class="view hidden">
+ <div id="correlation-chains" class="grid"></div>
+ </div>
+
<!-- Recommendations View -->
<div id="view-recommendations" class="view hidden">
<div class="grid" id="recommendations-list"></div>
@@ -335,6 +384,34 @@
}
}
+ async function postData(endpoint, data) {
+ try {
+ const res = await fetch(endpoint, {
+ method: 'POST',
+ headers: {'Content-Type': 'application/json'},
+ body: JSON.stringify(data)
+ });
+ return await res.json();
+ } catch (e) {
+ console.error('Post error:', endpoint, e);
+ return null;
+ }
+ }
+
+ async function mutateAction(id, status) {
+ const res = await postData('/action/mutate', {id, status});
+ if (res && res.status === 'ok') {
+ refreshData();
+ } else {
+ alert('Mutation failed');
+ }
+ }
+
+ function setOperatorMode(mode) {
+ console.log('Operator mode set to:', mode);
+ // In real system, this would call backend
+ }
+
function formatTime(ts) {
if (!ts) return 'N/A';
return new Date(ts * 1000).toLocaleString();
@@ -368,6 +445,53 @@
}
}
+ if (currentView === 'dashboard' || currentView === 'actions') {
+ const actions = await fetchData('/actions');
+ if (actions) {
+ if (currentView === 'dashboard') {
+ const dashActions = document.getElementById('dashboard-actions-summary');
+ const pendingCount = actions.pending.length;
+ dashActions.innerHTML = `
+ <div class="label">Pending</div><div class="value" style="color:var(--guarded)">${pendingCount}</div>
+ <div class="label">Running</div><div class="value" style="color:var(--reconciling)">${actions.running.length}</div>
+ `;
+ }
+ if (currentView === 'actions') {
+ const pendingEl = document.getElementById('actions-pending');
+ const historyEl = document.getElementById('actions-history');
+
+ pendingEl.innerHTML = actions.pending.map(a => `
+ <div class="card" style="margin-bottom:12px">
+ <div class="card-header">
+ <div class="card-title">${a.type.toUpperCase()}</div>
+ <span class="badge risk-${a.risk_level}">${a.risk_level}</span>
+ </div>
+ <p>${a.description}</p>
+ <div class="label">Target</div><div class="value">${a.target.node} ${a.target.service || ''}</div>
+ <div class="label">Confidence</div><div class="value">${Math.round(a.confidence*100)}%</div>
+ <div class="controls">
+ <button class="btn-primary" onclick="mutateAction('${a.id}', 'approved')">Approve</button>
+ <button onclick="mutateAction('${a.id}', 'rejected')">Reject</button>
+ </div>
+ </div>
+ `).join('') || 'No pending actions.';
+
+ const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed];
+ historyEl.innerHTML = history.sort((a,b) => b.timestamp - a.timestamp).map(a => `
+ <div class="event">
+ <div class="event-header">
+ <span>${a.type.toUpperCase()}</span>
+ <span class="badge ${getStatusClass(a.status)}">${a.status}</span>
+ </div>
+ <div>${a.description}</div>
+ <small>${formatTime(a.timestamp)} | Target: ${a.target.node}</small>
+ ${a.status === 'approved' ? `<div class="controls"><button class="btn-primary" onclick="mutateAction('${a.id}', 'running')">Execute</button></div>` : ''}
+ </div>
+ `).join('') || 'No history.';
+ }
+ }
+ }
+
if (currentView === 'dashboard' || currentView === 'events') {
const incidents = await fetchData('/incidents');
if (currentView === 'dashboard') {
@@ -474,6 +598,64 @@
`).join('');
}
+ if (currentView === 'topology') {
+ const nodes = await fetchData('/nodes');
+ const services = await fetchData('/services');
+ const topMap = document.getElementById('topology-map');
+ if (nodes && services) {
+ topMap.innerHTML = nodes.map(node => {
+ const nodeServices = services.filter(s => s.node === node.hostname || s.node === node.id);
+ return `
+ <div class="card" style="width:250px; border: 1px solid ${node.health === 'nominal' ? 'var(--border-color)' : 'var(--error)'}">
+ <div class="card-header">
+ <div class="card-title">${node.hostname}</div>
+ <span class="badge ${getStatusClass(node.health)}">${node.health}</span>
+ </div>
+ <div class="label">Capabilities</div>
+ <div class="value" style="font-size:11px">${node.capabilities.join(', ')}</div>
+ <div class="label">Services</div>
+ <div style="font-size:12px; margin-bottom:10px">
+ ${nodeServices.length > 0 ? nodeServices.map(s => `
+ <div style="display:flex; justify-content:space-between; margin-bottom:4px; padding:4px; background:rgba(255,255,255,0.03)">
+ <span>${s.name}</span>
+ <span class="${getStatusClass(s.health)}" style="font-size:10px">${s.health}</span>
+ </div>
+ ${s.dependencies.length > 0 ? `<div style="font-size:9px; color:var(--text-muted); margin-left:8px; margin-bottom:4px">dep: ${s.dependencies.join(', ')}</div>` : ''}
+ `).join('') : '<div class="value">None</div>'}
+ </div>
+ </div>
+ `;
+ }).join('');
+ }
+ }
+
+ if (currentView === 'correlation') {
+ const incidents = await fetchData('/incidents');
+ const actions = await fetchData('/actions');
+ const list = document.getElementById('correlation-chains');
+ if (incidents && actions) {
+ const allActions = Object.values(actions).flat();
+ list.innerHTML = incidents.map(inc => {
+ const related = allActions.filter(a => a.correlation_chain && a.correlation_chain.includes(inc.id));
+ return `
+ <div class="card">
+ <div class="card-header">
+ <div class="card-title">Incident: ${inc.id || 'INC-001'}</div>
+ <span class="badge status-error">Active</span>
+ </div>
+ <p>${inc.message}</p>
+ <div class="label">Related Actions</div>
+ ${related.map(a => `
+ <div class="event" style="margin-top:5px">
+ <strong>${a.type}</strong> (${a.status})<br>
+ <small>${a.description}</small>
+ </div>
+ `).join('') || '<div class="value">No actions yet</div>'}
+ </div>
+ `;
+ }).join('');
+ }
+ }
if (currentView === 'settings') {
const config = await fetchData('/config');
const content = document.getElementById('settings-content');
@@ -482,6 +664,8 @@
<div class="value">${config.auto_mode ? 'Enabled' : 'Disabled'}</div>
<div class="label">Action Thresholds</div>
<div class="value mono">${JSON.stringify(config.action_thresholds, null, 2)}</div>
+ <div class="label">Telegram Integration</div>
+ <div class="value" style="color:var(--text-muted)">Ready for mobile approval flows. Hook: /api/v1/telegram/webhook</div>
<button onclick="alert('Settings update not implemented in this demo')">Edit Configuration</button>
`;
}
diff --git a/webui/web.py b/webui/web.py
index 053ac1a..4727274 100644
--- a/webui/web.py
+++ b/webui/web.py
@@ -8,6 +8,7 @@ from pathlib import Path
STATE_DIR = Path("/opt/homelab/state")
EVENTS_DIR = Path("/opt/homelab/events")
WORLD_DIR = Path("/opt/homelab/world")
+ACTIONS_DIR = Path("/opt/homelab/actions")
EVENT_LOG = Path("/tmp/agent-events.log")
STATIC_DIR = Path(__file__).parent
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
@@ -164,6 +165,55 @@ def current_events():
return sorted(events, key=lambda x: x.get("timestamp", 0), reverse=True)
+def current_actions():
+ actions = {}
+ statuses = ["pending", "approved", "running", "completed", "failed", "rejected"]
+ for status in statuses:
+ actions[status] = []
+ status_dir = ACTIONS_DIR / status
+ if status_dir.exists():
+ for f in status_dir.glob("*.json"):
+ data = read_json_file(f)
+ if data:
+ actions[status].append(data)
+ return actions
+
+
+def mutate_action(action_id, target_status):
+ statuses = ["pending", "approved", "running", "completed", "failed", "rejected"]
+ if target_status not in statuses:
+ return False, f"Invalid target status: {target_status}"
+
+ # Find where the action is
+ source_path = None
+ for status in statuses:
+ p = ACTIONS_DIR / status / f"{action_id}.json"
+ if p.exists():
+ source_path = p
+ break
+
+ if not source_path:
+ return False, f"Action {action_id} not found"
+
+ target_dir = ACTIONS_DIR / target_status
+ target_dir.mkdir(parents=True, exist_ok=True)
+ target_path = target_dir / f"{action_id}.json"
+
+ try:
+ data = json.loads(source_path.read_text())
+ data["status"] = target_status
+ data["last_mutation"] = os.path.getmtime(source_path) # or current time
+ import time
+ data["last_mutation"] = time.time()
+
+ target_path.write_text(json.dumps(data, indent=2))
+ if source_path != target_path:
+ source_path.unlink()
+ return True, "Success"
+ except Exception as e:
+ return False, str(e)
+
+
def send_json(status, payload, handler):
body = (json.dumps(payload) + "\n").encode("utf-8")
handler.send_response(status)
@@ -207,6 +257,10 @@ class Handler(BaseHTTPRequestHandler):
send_json(200, current_events(), self)
return
+ if self.path == "/actions":
+ send_json(200, current_actions(), self)
+ return
+
if self.path == "/logs":
print("LOGS endpoint called", flush=True)
body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8")
@@ -236,6 +290,7 @@ class Handler(BaseHTTPRequestHandler):
"/auto-mode",
"/config",
"/events",
+ "/action/mutate",
):
self.send_error(404)
return
@@ -291,6 +346,19 @@ class Handler(BaseHTTPRequestHandler):
send_json(200, {"status": "sent"}, self)
return
+ if self.path == "/action/mutate":
+ action_id = payload.get("id")
+ target = payload.get("status")
+ if not action_id or not target:
+ self.send_error(400, "id and status are required")
+ return
+ success, msg = mutate_action(action_id, target)
+ if success:
+ send_json(200, {"status": "ok"}, self)
+ else:
+ self.send_error(500, msg)
+ return
+
if not command:
self.send_error(400, "command is required")
return

View file

@ -1,9 +0,0 @@
services:
monitor-agent:
build: ./monitor-agent
environment:
NODE_NAME: ${NODE_NAME:-}
ORCHESTRATOR_URL: ${ORCHESTRATOR_URL:?set ORCHESTRATOR_URL}
SERVICES_TO_CHECK: ${SERVICES_TO_CHECK:-homeassistant,lms,forgejo}
INTERVAL_SECONDS: ${INTERVAL_SECONDS:-30}
restart: unless-stopped

View file

@ -1,41 +1,9 @@
services:
redis:
image: redis:7
ports:
- "6379:6379"
restart: unless-stopped
orchestrator:
build: ./orchestrator
depends_on:
- redis
environment:
REDIS_HOST: redis
HA_PROXY_URL: https://ha.okit.pl
volumes:
- /tmp/agent-events.log:/tmp/agent-events.log
stdin_open: true
tty: true
webui:
build: ./webui
container_name: agent-system-webui
environment:
REDIS_HOST: redis
ports:
- "8080:8080"
volumes:
- /tmp/agent-events.log:/tmp/agent-events.log
depends_on:
- orchestrator
- redis
monitor-agent:
build: ./monitor-agent
environment:
NODE_NAME: ubuntu-4gb-hel1-1
ORCHESTRATOR_URL: http://webui:8080/events
SERVICES_TO_CHECK: homeassistant,lms,forgejo
depends_on:
- webui
- /opt/homelab:/opt/homelab
restart: unless-stopped

View file

@ -1,27 +1,27 @@
# Operator Approval Workflow
This document describes the process of reviewing and approving actions generated by the reconciliation supervisor.
This document describes the process of reviewing and approving actions generated by the reconciliation supervisor. The Control Plane is entirely filesystem-first, meaning all state is derived from and written to specific directories.
## Workflow Stages
### 1. Action Identification
When the supervisor identifies a delta between desired and actual state, it generates a pending action in `/opt/homelab/actions/pending/`.
When the supervisor (running in `homelab-codex-ws`) identifies a delta between desired and actual state, it generates a pending action JSON file in `/opt/homelab/actions/pending/`.
### 2. Risk Assessment
Actions are categorized by risk level:
- **Safe**: Low impact, high confidence. Can be auto-approved in autonomous mode.
- **Guarded**: Moderate impact. Requires explicit operator approval.
- **Dangerous**: High impact (e.g., node redeploy). Requires multi-step approval or senior operator override.
- **Dangerous**: High impact (e.g., node redeploy). Requires multi-step approval. These are highlighted in red in the UI.
### 3. Review Process
1. Navigate to the **Action Queue** view.
2. Review the **Confidence Score** and **Correlation Chain** to understand why the action was proposed.
3. Check the **Rollback Availability**.
2. Review the **Confidence Score** and **Correlation Chain** (if available) to understand why the action was proposed.
3. Check the **Trace** to see the lifecycle of the action.
### 4. Decision
- **Approve**: Moves action to `approved` state.
- **Reject**: Moves action to `rejected` state and suppresses similar recommendations for a cooldown period.
- **Execute**: Transitions an approved action to `running` status.
- **Approve**: Moves the action JSON file from `pending/` to `approved/`.
- **Reject**: Moves the action JSON file from `pending/` to `rejected/`.
- **Execute**: Moves an approved action from `approved/` to `running/`. The live executor in the runtime will then pick it up.
## Mobile Approvals
Approval requests can be acknowledged via the Telegram bot integration, allowing for remote operational control.
## Filesystem Semantics
The operator console performs "mutations" by moving files between subdirectories in `/opt/homelab/actions/`. This ensures a robust, local-first operational trail.

View file

@ -0,0 +1,25 @@
# Stale State Semantics
In a local-first, filesystem-backed control plane, visibility depends on the freshness of the runtime state files.
## Detection
The Operator Control Plane monitors the modification time (mtime) of the `runtime-summary.json` file in `/opt/homelab/state/`.
- **Live**: If the file was updated within the last 60 seconds.
- **Stale**: If the file is older than 60 seconds.
## UI Representation
When the state is detected as stale:
1. A **Critical Warning Banner** appears at the top of the console.
2. The exact time of the last successful update is displayed.
3. Health badges and metrics should be treated with caution as they represent the last known good state, not necessarily the current live state.
## Causes of Staleness
- **Runtime Observer Failure**: The process responsible for writing state to the filesystem has crashed.
- **Node Isolation**: The node where the observer is running is offline or disconnected.
- **Filesystem Latency**: Issues with the underlying storage layer (e.g., SD card degradation).
## Operator Response
1. Check the **Nodes** view to identify if a specific observer node is offline.
2. Investigate the `homelab-codex-ws` runtime logs.
3. Manually verify critical services if the control plane remains stale for an extended period.

View file

@ -1,6 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["python", "main.py"]

View file

@ -1,117 +0,0 @@
import json
import os
import socket
import time
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname()
ORCHESTRATOR_URL = os.getenv("ORCHESTRATOR_URL")
SERVICES_TO_CHECK = {
name.strip()
for name in os.getenv("SERVICES_TO_CHECK", "").split(",")
if name.strip()
}
INTERVAL_SECONDS = int(os.getenv("INTERVAL_SECONDS", "30"))
SERVICE_CATALOG = [
{"name": "homeassistant", "type": "http", "url": "http://homeassistant:8123"},
{"name": "lms", "type": "tcp", "host": "192.168.31.6", "port": 9000},
{"name": "forgejo", "type": "http", "url": "http://forgejo:3000"},
{"name": "nginx", "type": "http", "url": "http://nginx"},
{"name": "mosquitto", "type": "tcp", "host": "mosquitto", "port": 1883},
]
def services_to_check():
    """Return the catalog entries this agent should probe.

    When ``SERVICES_TO_CHECK`` is empty the whole ``SERVICE_CATALOG`` is
    returned as-is; otherwise only the entries whose ``name`` appears in the
    selection are kept, in catalog order.
    """
    if SERVICES_TO_CHECK:
        selected = []
        for entry in SERVICE_CATALOG:
            if entry["name"] in SERVICES_TO_CHECK:
                selected.append(entry)
        return selected
    return SERVICE_CATALOG
def check_http(url):
    """Probe *url* with an HTTP GET and report ``"ok"`` or ``"error"``.

    Returns ``"ok"`` only when the final response status is exactly 200.
    Every failure mode of the probe is reported as ``"error"`` instead of
    being raised, so one misbehaving service cannot take down the caller.
    """
    # Local import: nothing else in this module is known to need http.client.
    from http.client import HTTPException

    try:
        # Request() itself raises ValueError for a malformed URL, so it is
        # built inside the try block as well.
        request = Request(url, headers={"User-Agent": "monitor-agent/1.0"})
        with urlopen(request, timeout=5) as response:
            return "ok" if response.status == 200 else "error"
    except (OSError, HTTPException, ValueError):
        # OSError covers HTTPError, URLError and TimeoutError (all subclasses).
        # HTTPException covers protocol-level failures such as a garbage status
        # line, which urlopen lets through without wrapping in URLError.
        return "error"
def check_tcp(host, port):
    """Report ``"ok"`` if a TCP connection to ``host:port`` can be opened.

    The connection attempt uses a 5 second timeout and the socket is closed
    straight away; any ``OSError`` is reported as ``"error"``. *port* is
    passed through ``int()`` before use.
    """
    outcome = "error"
    try:
        with socket.create_connection((host, int(port)), timeout=5):
            outcome = "ok"
    except OSError:
        outcome = "error"
    return outcome
def check_service(service):
    """Run the probe matching the entry's ``type`` and return its status.

    ``"http"`` entries are probed via ``check_http`` using their ``url``;
    ``"tcp"`` entries via ``check_tcp`` using ``host`` and ``port``. Any other
    (or missing) type yields ``"error"``.
    """
    kind = service.get("type")
    if kind == "http":
        outcome = check_http(service["url"])
    elif kind == "tcp":
        outcome = check_tcp(service["host"], service["port"])
    else:
        outcome = "error"
    return outcome
def build_event(service, status):
    """Assemble the ``health`` event dict for one probed service.

    The event carries the service name, the probe *status*, the current
    wall-clock time, a ``run_id`` of ``None`` and this agent's ``NODE_NAME``.
    """
    event = dict(type="health", service=service["name"], status=status)
    event["timestamp"] = time.time()
    event["run_id"] = None
    event["node"] = NODE_NAME
    return event
def send_event(event):
    """POST *event* as JSON to ``ORCHESTRATOR_URL``.

    Raises ``RuntimeError`` when the endpoint answers with a status of 300 or
    above; errors raised by ``urlopen`` itself propagate to the caller.
    """
    payload = json.dumps(event).encode("utf-8")
    json_headers = {"Content-Type": "application/json"}
    request = Request(
        ORCHESTRATOR_URL,
        data=payload,
        headers=json_headers,
        method="POST",
    )
    with urlopen(request, timeout=5) as response:
        code = response.status
        if code >= 300:
            raise RuntimeError(f"event endpoint returned {code}")
def main():
    """Run the monitoring loop forever.

    Exits immediately if ``ORCHESTRATOR_URL`` is unset. Otherwise each cycle
    probes every selected service, sends the resulting event, and then sleeps
    for whatever remains of ``INTERVAL_SECONDS``.
    """
    if not ORCHESTRATOR_URL:
        raise SystemExit("ORCHESTRATOR_URL is required")

    selected_services = services_to_check()
    names = [service['name'] for service in selected_services]
    banner = (
        f"[monitor-agent] ready node={NODE_NAME} "
        f"url={ORCHESTRATOR_URL} "
        f"services={names}"
    )
    print(banner, flush=True)

    while True:
        cycle_started = time.time()
        for service in selected_services:
            event = build_event(service, check_service(service))
            try:
                send_event(event)
            except Exception as exc:
                # Best effort: a failed delivery must not stop the loop.
                print(f"[monitor-agent] send failed: {exc}", flush=True)
            # NOTE(review): indentation was lost in the source; assumed the
            # event is echoed for every probe, not only on send failure — confirm.
            print(json.dumps(event), flush=True)
        spent = time.time() - cycle_started
        time.sleep(max(0, INTERVAL_SECONDS - spent))
if __name__ == "__main__":
main()
print("MONITOR_AGENT_VERSION=1")

View file

@ -1,14 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

View file

@ -1,116 +0,0 @@
import json
import os
import subprocess
import time
import redis
NODE_NAME = os.getenv("NODE_NAME", "node")
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
def redis_client():
    """Create a Redis client for ``REDIS_HOST`` on port 6379 with decoded responses."""
    connection_options = {
        "host": REDIS_HOST,
        "port": 6379,
        "decode_responses": True,
    }
    return redis.Redis(**connection_options)
def build_result(task_id, status, result="", error=None, run_id=None):
    """Build the result dict reported for a finished task.

    The dict identifies the task and run, names this node via ``NODE_NAME``
    and carries the execution *status*, *result* and *error*.
    """
    envelope = {"task_id": task_id, "run_id": run_id, "node": NODE_NAME}
    envelope["status"] = status
    envelope["result"] = result
    envelope["error"] = error
    return envelope
def run_command(cmd, timeout=30):
    """Run *cmd* through the shell and return ``(status, output, error)``.

    Args:
        cmd: Shell command line to execute.
        timeout: Seconds to wait before giving up (default 30).

    Returns:
        ``("ok", stdout, None)`` on exit code 0, otherwise
        ``("error", stdout, message)`` where *message* is the stripped stderr
        or, if stderr is empty, ``"exit code N"``. A command that exceeds
        *timeout* yields ``("error", "", "command timed out after ...")``
        instead of raising, so the caller can still report a result.
    """
    # NOTE(review): shell=True executes the string verbatim; callers must not
    # pass untrusted text here without quoting it first.
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            text=True,
            capture_output=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return "error", "", f"command timed out after {timeout} seconds"

    output = completed.stdout.strip()
    if completed.returncode != 0:
        error = completed.stderr.strip()
        return "error", output, error or f"exit code {completed.returncode}"
    return "ok", output, None
def docker_ps(params):
    """List running containers as ``{"name", "status"}`` dicts.

    Runs ``docker ps`` through ``run_command``. If ``params["container"]`` is
    set, only containers whose name contains that substring are returned. A
    failed ``docker ps`` is passed back unchanged as ``(status, output, error)``.
    """
    wanted = params.get("container")
    status, output, error = run_command("docker ps --format '{{.Names}}|{{.Status}}'")
    if status != "ok":
        return status, output, error

    rows = (line.partition("|") for line in output.splitlines())
    containers = [
        {"name": name, "status": state}
        for name, _, state in rows
        if not wanted or wanted in name
    ]
    return "ok", containers, None
def docker_logs(params):
    """Return the last 120 log lines of the container named in *params*.

    Returns ``("error", "", "container parameter is required")`` when
    ``params["container"]`` is missing or empty; otherwise the
    ``(status, output, error)`` tuple produced by ``run_command``.
    """
    # Local import keeps this block self-contained.
    import shlex

    container = params.get("container")
    if not container:
        return "error", "", "container parameter is required"
    # The name ends up in a shell command line, so quote it: spaces or shell
    # metacharacters must not be interpreted as extra arguments or commands.
    return run_command(f"docker logs --tail 120 {shlex.quote(str(container))}")
def handle_task(task):
    """Dispatch *task* by its ``action`` and return ``(status, result, error)``.

    Supported actions are ``exec`` (runs ``params["cmd"]``), ``docker_ps`` and
    ``docker_logs``. Anything else yields an ``"error"`` tuple naming the
    unknown action. Missing or empty ``params`` are treated as ``{}``.
    """
    action = task.get("action")
    params = task.get("params")
    if not params:
        params = {}

    if action == "exec":
        outcome = run_command(params.get("cmd", ""))
    elif action == "docker_ps":
        outcome = docker_ps(params)
    elif action == "docker_logs":
        outcome = docker_logs(params)
    else:
        outcome = ("error", "", f"unknown action: {action}")
    return outcome
def main():
    """Consume tasks from the Redis ``tasks`` list forever.

    Tasks addressed to another node are pushed back onto ``tasks`` after which
    the loop pauses for a second. Tasks for this node are executed and their
    result is pushed onto ``results``. Any exception is printed and followed
    by a two second pause before the loop continues.
    """
    client = redis_client()
    print(f"[node-agent] ready node={NODE_NAME} redis={REDIS_HOST}")

    while True:
        try:
            _, raw_task = client.brpop("tasks")
            task = json.loads(raw_task)
            if task.get("target") == NODE_NAME:
                status, result, error = handle_task(task)
                reply = build_result(
                    task.get("task_id"),
                    status,
                    result,
                    error,
                    task.get("run_id"),
                )
                client.lpush("results", json.dumps(reply))
            else:
                # Not ours: hand the raw payload back for another node.
                client.lpush("tasks", raw_task)
                time.sleep(1)
        except Exception as exc:
            print(f"[node-agent] error: {exc}")
            time.sleep(2)
if __name__ == "__main__":
main()

View file

@ -1 +0,0 @@
redis

View file

@ -1,10 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

View file

@ -1,80 +0,0 @@
class DiagnosisEngine:
    """Rule-based interpreter for collected health signals.

    Signals are stored by key with :meth:`add`. :meth:`evaluate` turns them
    into a list of bracket-tagged diagnostic strings plus, when Home Assistant
    is down locally, a ``proposal`` dict offering remediation options.
    """

    def __init__(self):
        # Maps signal key -> last recorded value.
        self.signals = {}

    def add(self, key, value):
        """Record (or overwrite) the value observed for signal *key*."""
        self.signals[key] = value

    def evaluate(self):
        """Interpret the recorded signals and return the list of findings.

        Returns:
            A list whose items are diagnostic strings (``"[info] ..."``,
            ``"[warning] ..."``, ``"[error] ..."``) and, if ``ha_local`` is
            ``"000"``, one ``proposal`` dict. ``"[info] multiple issues
            detected"`` is appended when more than one diagnostic string
            describes a problem; ``"... OK"`` confirmations do not count.
        """
        # NOTE(review): values such as "000"/"200" look like HTTP status
        # strings from a probe — confirm against the signal collector.
        results = []

        lms = self.signals.get("lms")
        ha_logs_client_error = self.signals.get("ha_logs_client_connector_error")
        ha_container = self.signals.get("ha_container")
        # HA is running but logging connector errors while LMS is down:
        # report this as an HA dependency failure rather than a bare LMS outage.
        ha_dependency_failure = (
            lms == "000"
            and ha_logs_client_error
            and ha_container == "running"
        )

        # LMS
        if lms == "000" and not ha_dependency_failure:
            results.append("[info] LMS is not reachable")
        elif lms == "200":
            results.append("[info] LMS: OK")

        # Dependency failures
        if ha_dependency_failure:
            results.append("[warning] HA dependency failure: LMS unreachable")

        # HA local
        ha_local = self.signals.get("ha_local")
        if ha_local == "000":
            options = [
                {
                    "label": "Restart HA",
                    "command": "restart_ha",
                    "confidence": 0.85,
                },
                {
                    "label": "Check network",
                    "command": "check_network",
                    "confidence": 0.6,
                },
            ]
            ignore_option = {
                "label": "Ignore",
                "command": "ignore",
                "is_ignore": True,
            }
            # Most confident remediation first; "Ignore" always goes last.
            ranked = sorted(
                options,
                key=lambda option: option["confidence"],
                reverse=True,
            )
            results.append("[error] HA is not working locally")
            results.append(
                {
                    "type": "proposal",
                    "message": "Home Assistant is not working",
                    "confidence": 0.85,
                    "options": ranked + [ignore_option],
                }
            )
        elif ha_local == "200":
            results.append("[info] HA local: OK")

        # HA proxy
        ha_proxy = self.signals.get("ha_proxy")
        if ha_proxy == "dns_error":
            results.append("[error] HA proxy: DNS failure")
            results.append("[error] external access issue: DNS or routing failure")
        elif ha_proxy == "000":
            results.append("[error] HA proxy: not reachable")
        elif ha_proxy == "200":
            results.append("[info] HA proxy: OK")

        # Count only strings that describe a problem. Counting "... OK"
        # confirmations made a fully healthy system report multiple issues.
        issue_count = sum(
            1
            for result in results
            if isinstance(result, str) and not result.endswith(": OK")
        )
        if issue_count > 1:
            results.append("[info] multiple issues detected")

        return results

View file

@ -1,35 +0,0 @@
import json
import time
EVENT_LOG = "/tmp/agent-events.log"


def emit_event(event):
    """Print *event* as one JSON line and append the same line to EVENT_LOG.

    The dict is completed in place: ``run_id`` and ``node`` default to
    ``None`` and ``timestamp`` to the current time when the caller did not
    provide them. A failure to write the log file is reported on stdout and
    otherwise ignored.
    """
    for field in ("run_id", "node"):
        event.setdefault(field, None)
    event.setdefault("timestamp", time.time())

    serialized = json.dumps(event)
    print(serialized)
    print("EVENT WRITTEN")
    try:
        with open(EVENT_LOG, "a", encoding="utf-8") as sink:
            sink.write(f"{serialized}\n")
            sink.flush()
    except OSError as exc:
        print(f"[event:error] failed to write event log: {exc}")
def emit_run_progress(run_id, run):
    """Emit a ``run_progress`` event with the received/expected counters of *run*.

    Counters missing from *run* are reported as 0.
    """
    received = run.get("received", 0)
    expected = run.get("expected", 0)
    progress_event = {
        "type": "run_progress",
        "run_id": run_id,
        "node": None,
        "received": received,
        "expected": expected,
        "message": f"run progress: {received}/{expected}",
    }
    emit_event(progress_event)

View file

@ -1,408 +0,0 @@
import json
import os
import sys
import threading
import time
from uuid import uuid4
from diagnosis import DiagnosisEngine
from events import emit_event, emit_run_progress
from redis_client import get_redis_client
from result_listener import listen_for_results
from task_builder import build_task
def send_task(redis_client, task, runs=None, run_id=None):
    """Queue *task* on the Redis "tasks" list and announce it as a ``task`` event.

    When *run_id* is given it is stamped onto the task; if that run is also
    present in *runs*, its ``expected`` counter is incremented and a progress
    event is emitted before the task is pushed.
    """
    if run_id:
        task["run_id"] = run_id
        tracked = runs is not None and run_id in runs
        if tracked:
            runs[run_id]["expected"] += 1
            emit_run_progress(run_id, runs[run_id])

    payload = json.dumps(task)
    redis_client.lpush("tasks", payload)
    print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
    announcement = {
        "type": "task",
        "message": f"sent {task['action']} to {task['target']}",
        "run_id": task.get("run_id"),
        "node": task.get("target"),
    }
    emit_event(announcement)
def stop_run(runs, run_id):
    """Mark an active run as stopped, announce it and drop it from *runs*.

    Returns:
        True when the run existed and was active, False otherwise (in which
        case nothing is changed or emitted).
    """
    run = runs.get(run_id)
    if not (run and run.get("active")):
        return False

    run.update(active=False, status="stopped")
    stopped_event = {
        "type": "run_status",
        "message": "run status: stopped",
        "run_id": run_id,
        "node": None,
        "status": "stopped",
    }
    emit_event(stopped_event)
    runs.pop(run_id, None)
    return True
def apply_run_action(redis_client, run_actions, run_id, command):
    """Carry out the proposal option *command* chosen for run *run_id*.

    ``ignore`` only emits an event; ``check_network`` queues a ping on the
    ``vps`` node; ``restart_ha`` queues a ``docker restart`` on ``piha`` for
    the container name stored in ``run_actions[run_id]`` (falling back to
    "homeassistant").

    Returns:
        True for a recognised command, False for anything else.
    """

    def announce(message, node=None):
        # Every branch reports through the same "action" event shape.
        emit_event(
            {
                "type": "action",
                "message": message,
                "run_id": run_id,
                "node": node,
            }
        )

    if command == "ignore":
        announce("proposal ignored")
        return True

    if command == "check_network":
        announce("confirmed action: check network connectivity", "vps")
        ping_task = build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"})
        send_task(redis_client, ping_task, run_id=run_id)
        return True

    if command == "restart_ha":
        context = run_actions.get(run_id, {})
        container_name = context.get("ha_container_name") or "homeassistant"
        announce(f"confirmed action: restart {container_name}", "piha")
        restart_task = build_task(
            "piha", "exec", {"cmd": f"docker restart {container_name}"}
        )
        send_task(redis_client, restart_task, run_id=run_id)
        return True

    announce(f"unknown proposal command: {command}")
    return False
def emit_auto_config(auto_config, message=None):
    """Publish the current auto-execution settings as an ``auto_config`` event.

    *message* defaults to "auto config updated". Settings missing from
    *auto_config* are reported with an empty container or ``None``.
    """
    payload = {
        "type": "auto_config",
        "message": message or "auto config updated",
        "run_id": None,
        "node": None,
    }
    # (setting name, value reported when the setting is absent)
    reported_settings = (
        ("auto_mode", None),
        ("action_thresholds", {}),
        ("default_threshold", None),
        ("allowed_auto_actions", []),
        ("max_retries_per_action", {}),
        ("retry_window_seconds", None),
    )
    for setting, fallback in reported_settings:
        payload[setting] = auto_config.get(setting, fallback)
    emit_event(payload)
def set_auto_config(auto_config, config):
    """Apply the recognised settings in *config* to *auto_config* and publish them.

    Recognised keys: ``auto_mode`` (coerced with ``bool``),
    ``action_thresholds`` (dict of command -> float), ``default_threshold``
    (float) and ``allowed_auto_actions`` (list of command names). Other keys
    are ignored.

    The update is all-or-nothing: every value is converted first and
    *auto_config* is only modified when all conversions succeed. A malformed
    request (non-dict *config*, non-numeric threshold) leaves *auto_config*
    untouched and is reported through an ``auto_config`` event instead of
    raising, so one bad request cannot kill the thread that called us.
    """
    if not isinstance(config, dict):
        emit_auto_config(
            auto_config, "auto config update rejected: config must be an object"
        )
        return

    staged = {}
    try:
        if "auto_mode" in config:
            staged["auto_mode"] = bool(config["auto_mode"])
        thresholds = config.get("action_thresholds")
        if isinstance(thresholds, dict):
            staged["action_thresholds"] = {
                str(command): float(threshold)
                for command, threshold in thresholds.items()
            }
        if "default_threshold" in config:
            staged["default_threshold"] = float(config["default_threshold"])
        allowed = config.get("allowed_auto_actions")
        if isinstance(allowed, list):
            staged["allowed_auto_actions"] = [str(command) for command in allowed]
    except (TypeError, ValueError) as exc:
        emit_auto_config(auto_config, f"auto config update rejected: {exc}")
        return

    auto_config.update(staged)
    emit_auto_config(auto_config)
def set_auto_mode(auto_config, enabled):
    """Change only the ``auto_mode`` flag, via ``set_auto_config``."""
    mode_only = {"auto_mode": enabled}
    set_auto_config(auto_config, mode_only)
def dispatch(redis_client, command, ha_task_ids, runs):
    """Translate an operator command into runs and queued tasks.

    * ``"ha"``   - start a tracked diagnostic run with a ``docker_ps`` task on ``piha``.
    * ``"stop"`` - stop every run that is currently active.
    * ``"all"``  - queue an untracked ``docker_ps`` on both ``piha`` and ``vps``.
    * anything else - queue an untracked ``docker_ps`` on ``piha``.
    """
    if command == "stop":
        # Iterate over a snapshot: stop_run removes entries from ``runs``.
        for run_id, run in list(runs.items()):
            if run.get("active"):
                stop_run(runs, run_id)
        return

    if command != "ha":
        targets = ("piha", "vps") if command == "all" else ("piha",)
        for target in targets:
            send_task(redis_client, build_task(target, "docker_ps", {}))
        return

    run_id = str(uuid4())
    run = {
        "expected": 0,
        "received": 0,
        "engine": DiagnosisEngine(),
        "active": True,
        "status": "running",
    }
    runs[run_id] = run
    emit_event(
        {
            "type": "run_status",
            "message": "run status: running",
            "run_id": run_id,
            "node": None,
            "status": "running",
        }
    )
    emit_run_progress(run_id, run)
    first_task = build_task("piha", "docker_ps", {})
    # Register the task before it is queued so its result can be matched to the run.
    ha_task_ids[first_task["task_id"]] = run_id
    send_task(redis_client, first_task, runs, run_id)
def process_input(command, redis_client, ha_task_ids, runs):
    """Echo a non-empty operator command, log it as an event and dispatch it.

    Empty commands are ignored.
    """
    if command:
        print(f"[input] {command}")
        input_event = {
            "type": "log",
            "message": f"user input: {command}",
            "run_id": None,
            "node": None,
        }
        emit_event(input_event)
        dispatch(redis_client, command, ha_task_ids, runs)
def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config):
    """Serve tasks addressed to the orchestrator from the Redis "tasks" list, forever.

    Tasks whose ``target`` is not ``"orchestrator"`` are pushed back onto the
    list (followed by a one-second pause) so another consumer can take them.
    Handled actions: ``stop_run``, ``run_action``, ``set_auto_mode``,
    ``set_auto_config`` and ``input``; any other action is dropped.

    Payloads that are not valid JSON, or that decode to something other than
    an object, are discarded: they carry no target, and calling ``.get`` on
    them would raise and terminate this listener thread.
    """
    while True:
        _, raw_task = redis_client.brpop("tasks")
        try:
            task = json.loads(raw_task)
        except json.JSONDecodeError:
            continue
        if not isinstance(task, dict):
            continue

        if task.get("target") != "orchestrator":
            # Not ours: requeue and back off briefly before polling again.
            redis_client.lpush("tasks", raw_task)
            time.sleep(1)
            continue

        action = task.get("action")
        params = task.get("params")
        if not isinstance(params, dict):
            # Missing or malformed params are treated as "no parameters".
            params = {}

        if action == "stop_run":
            stop_run(runs, str(params.get("run_id", "")).strip())
        elif action == "run_action":
            apply_run_action(
                redis_client,
                run_actions,
                str(params.get("run_id", "")).strip(),
                str(params.get("command", "")).strip(),
            )
        elif action == "set_auto_mode":
            set_auto_mode(auto_config, bool(params.get("auto_mode")))
        elif action == "set_auto_config":
            set_auto_config(auto_config, params.get("config") or {})
        elif action == "input":
            process_input(
                str(params.get("command", "")).strip(),
                redis_client,
                ha_task_ids,
                runs,
            )
def services_list(services):
    """Return the per-service health records as a list ordered by service name."""
    listing = []
    for name in sorted(services):
        state = services[name]
        listing.append(
            {
                "name": name,
                "status": state["status"],
                "last_check": state["last_check"],
                "node": state.get("node"),
                "history": state["history"],
            }
        )
    return listing
def update_service_health(services, event):
    """Fold one health event into *services* and publish the refreshed overview.

    Events without a ``service`` name, or whose ``status`` is neither "ok" nor
    "error", are ignored. Each service keeps its 20 most recent history entries.
    """
    service_name = event.get("service")
    status = event.get("status")
    if not service_name or status not in ("ok", "error"):
        return

    node = event.get("node")
    timestamp = event.get("timestamp", time.time())
    fresh_state = {
        "status": status,
        "last_check": timestamp,
        "node": node,
        "history": [],
    }
    state = services.setdefault(service_name, fresh_state)
    state.update(status=status, last_check=timestamp, node=node)
    state["history"].append({"status": status, "timestamp": timestamp, "node": node})
    state["history"] = state["history"][-20:]

    overview = {
        "type": "services_state",
        "message": "services health updated",
        "run_id": None,
        "node": None,
        "services": services_list(services),
    }
    emit_event(overview)
def listen_for_external_events(redis_client, services):
    """Relay events from the Redis "events" list into the event log, forever.

    Every decoded event is re-emitted through ``emit_event``; events of type
    ``health`` additionally update *services*.

    Payloads that are not valid JSON, or that decode to something other than
    an object, are reported with an "invalid external event" log event and
    skipped. Without the object check such a payload would raise inside
    ``emit_event`` and terminate this listener thread.
    """
    while True:
        _, raw_event = redis_client.brpop("events")
        try:
            event = json.loads(raw_event)
        except json.JSONDecodeError:
            event = None

        if not isinstance(event, dict):
            emit_event(
                {
                    "type": "log",
                    "message": f"invalid external event: {raw_event}",
                    "run_id": None,
                    "node": None,
                }
            )
            continue

        emit_event(event)
        if event.get("type") == "health":
            update_service_health(services, event)
def main():
    """Start the three background listeners, then serve commands typed on stdin.

    End of input exits when stdin is not a terminal; on a terminal the loop
    waits two seconds and retries. Ctrl-C only prints a fresh line.
    """

    def log(message):
        emit_event(
            {
                "type": "log",
                "message": message,
                "run_id": None,
                "node": None,
            }
        )

    # Make sure the event log exists before anything tries to read it.
    if not os.path.exists("/tmp/agent-events.log"):
        open("/tmp/agent-events.log", "w").close()

    redis_client = get_redis_client()
    ha_task_ids, runs, last_actions, run_actions, services = {}, {}, {}, {}, {}
    auto_config = {
        "auto_mode": True,
        "action_thresholds": {
            "restart_ha": 0.8,
            "check_network": 0.9,
        },
        "default_threshold": 0.9,
        "allowed_auto_actions": ["restart_ha"],
        "max_retries_per_action": {
            "restart_ha": 3,
        },
        "retry_window_seconds": 300,
        "action_history": {},
    }

    workers = (
        (
            listen_for_results,
            (redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config),
        ),
        (
            listen_for_input_tasks,
            (redis_client, ha_task_ids, runs, run_actions, auto_config),
        ),
        (
            listen_for_external_events,
            (redis_client, services),
        ),
    )
    for target, args in workers:
        threading.Thread(target=target, args=args, daemon=True).start()

    print("[orchestrator] ready")
    log("orchestrator ready")
    emit_auto_config(auto_config, "auto mode: on")

    while True:
        try:
            command = input("> ").strip()
        except EOFError:
            if sys.stdin.isatty():
                print("[orchestrator] stdin closed, waiting...")
                log("stdin closed, waiting")
                time.sleep(2)
                continue
            print("[orchestrator] stdin closed, exiting")
            log("stdin closed, exiting")
            return
        except KeyboardInterrupt:
            print()
            continue
        process_input(command, redis_client, ha_task_ids, runs)


if __name__ == "__main__":
    main()

View file

@ -1,8 +0,0 @@
import os
import redis
def get_redis_client():
    """Build a Redis client for the host named by ``REDIS_HOST`` (default "redis").

    The client connects on port 6379 and decodes responses to ``str``.
    """
    return redis.Redis(
        host=os.getenv("REDIS_HOST", "redis"),
        port=6379,
        decode_responses=True,
    )

View file

@ -1 +0,0 @@
redis

View file

@ -1,567 +0,0 @@
import json
import re
import threading
import time
from uuid import uuid4
from diagnosis import DiagnosisEngine
from events import emit_event, emit_run_progress
from task_builder import build_task
def send_task(redis_client, target, action, params, runs=None, run_id=None):
    """Build a task, queue it on the Redis "tasks" list and return it.

    When *run_id* is given it is stamped onto the task; if that run is also
    present in *runs*, its ``expected`` counter is incremented and a progress
    event is emitted before the task is pushed.
    """
    task = build_task(target, action, params)
    if run_id:
        task["run_id"] = run_id
        tracked = runs is not None and run_id in runs
        if tracked:
            runs[run_id]["expected"] += 1
            emit_run_progress(run_id, runs[run_id])

    payload = json.dumps(task)
    redis_client.lpush("tasks", payload)
    print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
    announcement = {
        "type": "task",
        "message": f"sent {task['action']} to {task['target']}",
        "run_id": task.get("run_id"),
        "node": task.get("target"),
    }
    emit_event(announcement)
    return task
def detect_homeassistant(result):
    """Return the name of the first container that looks like Home Assistant.

    Args:
        result: Either a list of container records (dicts with a ``name``
            key) or raw text, which is searched token by token.

    Returns:
        The first name/token containing "homeassistant" (tokens from raw
        text have leading "/" characters removed), or ``None`` when nothing
        matches or *result* has another type.

    List entries that are not dicts, and names that are not strings, are
    skipped instead of raising.
    """
    if isinstance(result, list):
        for item in result:
            name = item.get("name") if isinstance(item, dict) else None
            if isinstance(name, str) and "homeassistant" in name:
                return name
    elif isinstance(result, str):
        for token in result.split():
            if "homeassistant" in token:
                return token.lstrip("/")
    return None
def extract_first_http_url(result):
    """Return the first ``http://`` URL in a log text that mentions ClientConnectorError.

    Returns ``None`` when *result* is not a string, does not contain
    "ClientConnectorError", or holds no ``http://`` URL.
    """
    usable = (
        isinstance(result, str)
        and "ClientConnectorError" in result
        and "http://" in result
    )
    if not usable:
        return None
    # The URL ends at whitespace, a quote, a closing parenthesis or a comma.
    found = re.search(r"http://[^\s'\"),]+", result)
    return found.group(0) if found else None
def parse_host_port(url):
    """Split the ``host:port`` part out of an ``http://`` URL.

    Returns:
        ``(host, port)`` with the port as a string, or ``(None, None)`` when
        the URL carries no ``http://`` prefix or no explicit port.
    """
    _, scheme, remainder = url.partition("http://")
    if not scheme:
        # No "http://" at all; previously this raised IndexError.
        return None, None
    address = remainder.split("/", 1)[0]
    if ":" not in address:
        return None, None
    host, port = address.rsplit(":", 1)
    return host, port
def normalize_http_code(result):
    """Return *result* as a stripped digit string, or ``None`` if it is not all digits."""
    text = str(result).strip()
    return text if text.isdigit() else None
def add_http_signal(engine, key, result):
    """Store *result* under *key* on *engine* when it normalizes to an HTTP code.

    Results that ``normalize_http_code`` rejects are silently skipped.
    """
    code = normalize_http_code(result)
    if code is None:
        return
    engine.add(key, code)
def add_proxy_signal(engine, result):
    """Record the outcome of the proxy check as the ``ha_proxy`` signal.

    An error text containing "exit code 6" is stored as ``"dns_error"``;
    otherwise the task's ``result`` is stored when it is an HTTP code.
    """
    error_text = str(result.get("error") or "")
    if "exit code 6" in error_text:
        engine.add("ha_proxy", "dns_error")
    else:
        add_http_signal(engine, "ha_proxy", result.get("result"))
def action_in_cooldown(last_actions, node, action_type, run_id=None):
    """Tell whether *action_type* ran on *node* within the last 60 seconds.

    Returns:
        True (after printing and emitting a "skipped" notice) when the action
        is still cooling down. Otherwise the current time is recorded in
        *last_actions* as the latest execution and False is returned.
    """
    key = f"{node}:{action_type}"
    now = time.time()
    cooling_down = key in last_actions and now - last_actions[key] < 60
    if not cooling_down:
        last_actions[key] = now
        return False

    print("[action] skipped (cooldown)")
    emit_event(
        {
            "type": "action",
            "message": "[action] skipped (cooldown)",
            "run_id": run_id,
            "node": node,
        }
    )
    return True
def sorted_proposal_options(result):
    """Return the proposal's options: actions by descending confidence, ignores last.

    Options without a ``confidence`` sort as 0; ties keep their original order.
    """
    actions, ignores = [], []
    for option in result.get("options", []):
        bucket = ignores if option.get("is_ignore") else actions
        bucket.append(option)
    actions.sort(key=lambda option: option.get("confidence", 0), reverse=True)
    return actions + ignores
def emit_proposal_event(run_id, result):
    """Publish a proposal for *run_id* with its options in display order."""
    proposal = {
        "type": "proposal",
        "run_id": run_id,
        "node": None,
        "message": result.get("message"),
        "confidence": result.get("confidence"),
        "options": sorted_proposal_options(result),
    }
    emit_event(proposal)
def retry_limit_reached(command, run_id, auto_config):
    """Check the per-command retry budget and consume one attempt if allowed.

    Attempt timestamps live in ``auto_config["action_history"][command]`` and
    are pruned to the last ``retry_window_seconds`` (default 300) on every
    call. Commands without an entry in ``max_retries_per_action`` are never
    limited and their attempts are not recorded.

    Returns:
        True (after emitting a "blocked" event) when the limit is reached,
        False when the attempt may proceed.
    """
    history = auto_config.setdefault("action_history", {}).setdefault(command, [])
    now = time.time()
    cutoff = now - auto_config.get("retry_window_seconds", 300)
    # Prune in place so the list stored in auto_config stays the same object.
    history[:] = [stamp for stamp in history if stamp >= cutoff]

    limit = auto_config.get("max_retries_per_action", {}).get(command)
    if limit is None:
        return False
    if len(history) < limit:
        history.append(now)
        return False

    emit_event(
        {
            "type": "auto_action",
            "run_id": run_id,
            "node": None,
            "message": f"[auto] blocked: {command} retry limit reached",
        }
    )
    return True
def emit_learning(message, run_id=None):
    """Publish *message* as a ``learning`` event, optionally tied to *run_id*."""
    learning_event = {
        "type": "learning",
        "message": message,
        "run_id": run_id,
        "node": None,
    }
    emit_event(learning_event)
def start_ha_check_run(redis_client, runs, ha_task_ids, learning_action=None):
    """Open a new diagnostic run and queue its first ``docker_ps`` task on ``piha``.

    *learning_action*, when given, is stored on the run under
    ``"learning_action"``.

    Returns:
        The id of the run that was created.
    """
    run_id = str(uuid4())
    run = {
        "expected": 0,
        "received": 0,
        "engine": DiagnosisEngine(),
        "active": True,
        "status": "running",
        "learning_action": learning_action,
    }
    runs[run_id] = run
    emit_event(
        {
            "type": "run_status",
            "message": "run status: running",
            "run_id": run_id,
            "node": None,
            "status": "running",
        }
    )
    emit_run_progress(run_id, run)
    first_task = send_task(redis_client, "piha", "docker_ps", {}, runs, run_id)
    ha_task_ids[first_task["task_id"]] = run_id
    return run_id
def schedule_action_outcome_check(redis_client, runs, ha_task_ids, action):
    """Start a follow-up diagnostic run for *action* after a 30-second delay.

    The wait happens on a daemon thread, so this call returns immediately.
    """
    emit_learning(f"[learning] checking outcome for {action}")

    def check_later():
        time.sleep(30)
        start_ha_check_run(redis_client, runs, ha_task_ids, learning_action=action)

    threading.Thread(target=check_later, daemon=True).start()
def record_action_outcome(run, run_id, action_stats):
    """Score the action a follow-up run was checking and report its success rate.

    Runs without a ``learning_action`` are ignored. The action counts as
    failed when the run's engine still holds a truthy
    ``ha_logs_client_connector_error`` signal, and as successful otherwise.
    """
    action = run.get("learning_action")
    if not action:
        return

    stats = action_stats.setdefault(action, {"success": 0, "failure": 0})
    still_failing = run["engine"].signals.get("ha_logs_client_connector_error")
    if still_failing:
        stats["failure"] += 1
        emit_learning(f"[learning] {action} failed", run_id)
    else:
        stats["success"] += 1
        emit_learning(f"[learning] {action} success", run_id)

    attempts = stats["success"] + stats["failure"]
    success_rate = stats["success"] / attempts if attempts else 0
    emit_learning(f"[learning] {action} success_rate: {success_rate:.0%}", run_id)
def auto_execute_action(
    redis_client,
    engine,
    option,
    run_id,
    last_actions,
    auto_config,
    runs,
    ha_task_ids,
):
    """Run a proposal option without operator confirmation, if it is supported.

    Only ``restart_ha`` is implemented. It is skipped while the action is in
    cooldown or once its retry limit is reached; otherwise a
    ``docker restart`` is queued on ``piha`` and an outcome check is scheduled.

    Returns:
        True when the restart was queued, False in every other case.
    """
    command = option.get("command")
    if command != "restart_ha":
        return False
    if action_in_cooldown(last_actions, "piha", "restart_ha", run_id):
        return False
    if retry_limit_reached(command, run_id, auto_config):
        return False

    container_name = engine.signals.get("ha_container_name") or "homeassistant"
    emit_event(
        {
            "type": "auto_action",
            "run_id": run_id,
            "node": "piha",
            "message": f"Auto-executed: {command}",
            "confidence": option.get("confidence"),
        }
    )
    send_task(
        redis_client,
        "piha",
        "exec",
        {"cmd": f"docker restart {container_name}"},
        run_id=run_id,
    )
    schedule_action_outcome_check(redis_client, runs, ha_task_ids, command)
    return True
def maybe_auto_execute_proposal(
    redis_client,
    run,
    run_id,
    result,
    last_actions,
    auto_config,
    runs,
    ha_task_ids,
):
    """Auto-execute the best option of a proposal when policy allows it.

    Nothing happens for follow-up ("learning") runs, when auto mode is off,
    when no option carries a numeric confidence, when the best option is not
    in ``allowed_auto_actions``, or when its confidence is below the
    threshold configured for that command (``default_threshold``, itself
    defaulting to 0.9, when the command has no entry).
    """
    if run.get("learning_action") or not auto_config.get("auto_mode"):
        return

    candidates = [
        option
        for option in sorted_proposal_options(result)
        if not option.get("is_ignore")
        and isinstance(option.get("confidence"), (int, float))
    ]
    if not candidates:
        return

    best = candidates[0]
    command = best.get("command")
    if command not in set(auto_config.get("allowed_auto_actions", [])):
        return

    per_action = auto_config.get("action_thresholds", {})
    threshold = per_action.get(command, auto_config.get("default_threshold", 0.9))
    if best.get("confidence", 0) < threshold:
        return

    auto_execute_action(
        redis_client,
        run["engine"],
        best,
        run_id,
        last_actions,
        auto_config,
        runs,
        ha_task_ids,
    )
def emit_evaluation_event(run_id, result):
    """Publish one evaluation result for *run_id*.

    Proposal dicts go out as ``proposal`` events; everything else is printed
    and emitted as a ``diagnosis`` event.
    """
    is_proposal = isinstance(result, dict) and result.get("type") == "proposal"
    if is_proposal:
        emit_proposal_event(run_id, result)
    else:
        print(result)
        emit_event(
            {
                "type": "diagnosis",
                "message": result,
                "run_id": run_id,
                "node": None,
            }
        )
def has_error(results):
    """Return True when any string in *results* is tagged ``[error]``."""
    for result in results:
        if isinstance(result, str) and result.startswith("[error]"):
            return True
    return False
def store_action_context(run_actions, run_id, engine):
    """Remember the HA container name for *run_id* so later actions can use it.

    Falls back to "homeassistant" when the engine has no container name.
    """
    container_name = engine.signals.get("ha_container_name") or "homeassistant"
    run_actions[run_id] = {"ha_container_name": container_name}
def evaluate_run_if_complete(
    redis_client,
    runs,
    run_id,
    last_actions,
    run_actions,
    auto_config,
    ha_task_ids,
    action_stats,
):
    """Finish run *run_id* once every expected result has been received.

    Does nothing while the run is unknown or ``received != expected``.
    Otherwise each finding is published (proposals may be auto-executed), the
    run is marked "error" or "done", its action context is stored, the
    learning outcome is recorded and the run is removed from *runs*.
    """
    run = runs.get(run_id)
    if not run:
        return
    if run["received"] != run["expected"]:
        return

    engine = run["engine"]
    findings = engine.evaluate()
    for finding in findings:
        emit_evaluation_event(run_id, finding)
        if isinstance(finding, dict) and finding.get("type") == "proposal":
            maybe_auto_execute_proposal(
                redis_client,
                run,
                run_id,
                finding,
                last_actions,
                auto_config,
                runs,
                ha_task_ids,
            )

    status = "error" if has_error(findings) else "done"
    run.update(active=False, status=status)
    store_action_context(run_actions, run_id, engine)
    emit_event(
        {
            "type": "run_status",
            "message": f"run status: {status}",
            "run_id": run_id,
            "node": None,
            "status": status,
        }
    )
    record_action_outcome(run, run_id, action_stats)
    runs.pop(run_id, None)
def tracked_run_id_for_task(task_id, ha_task_ids, ha_log_task_ids, ha_check_tasks):
    """Look up the run a task belongs to across the three tracking tables.

    The tables are consulted in the order given; ``ha_check_tasks`` stores
    ``(run_id, label)`` pairs. Returns ``None`` for an untracked task.
    """
    for table in (ha_task_ids, ha_log_task_ids):
        if task_id in table:
            return table[task_id]
    if task_id in ha_check_tasks:
        run_id, _label = ha_check_tasks[task_id][0], ha_check_tasks[task_id][1:]
        return run_id
    return None
# Consume task results from the Redis "results" list forever and advance the
# diagnostic run each result belongs to.
#
# Three kinds of tracked tasks are handled, each through its own lookup table:
#   * ha_check_tasks  - curl checks; their HTTP codes become engine signals
#   * ha_log_task_ids - docker_logs tasks; a ClientConnectorError in the logs
#                       leads to two follow-up curl checks (piha and vps)
#   * ha_task_ids     - docker_ps tasks; a detected Home Assistant container
#                       leads to a follow-up docker_logs task
# evaluate_run_if_complete is called on every path; it only acts once the run
# has received as many results as it expects.
def listen_for_results(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config):
# task_id -> run_id for the docker_logs tasks issued below
ha_log_task_ids = {}
# task_id -> (run_id, label) for the curl checks issued below
ha_check_tasks = {}
# run_id -> {label: normalized HTTP code} for the checks received so far
ha_check_results = {}
# per-action success/failure counters handed to evaluate_run_if_complete
action_stats = {
"restart_ha": {
"success": 0,
"failure": 0,
}
}
while True:
# brpop is called without a timeout argument here
_, raw_result = redis_client.brpop("results")
try:
result = json.loads(raw_result)
except json.JSONDecodeError:
print(f"\n[result:error] invalid json: {raw_result}")
emit_event(
{
"type": "result",
"message": f"invalid json: {raw_result}",
"run_id": None,
"node": None,
}
)
continue
print("\n--- result ---")
print(f"task_id: {result.get('task_id')}")
print(f"node: {result.get('node')}")
print(f"status: {result.get('status')}")
print(f"result: {result.get('result')}")
print(f"error: {result.get('error')}")
print("--------------")
task_id = result.get("task_id")
# Prefer the run_id carried by the result; otherwise look it up by task_id.
run_id = result.get("run_id") or tracked_run_id_for_task(
task_id,
ha_task_ids,
ha_log_task_ids,
ha_check_tasks,
)
emit_event(
{
"type": "result",
"message": f"result received: {task_id} status={result.get('status')}",
"run_id": run_id,
"node": result.get("node"),
}
)
# Results whose run is unknown (or no longer in runs) are reported above and then skipped.
run = runs.get(run_id) if run_id else None
if not run:
continue
run["received"] += 1
emit_run_progress(run_id, run)
# --- curl check results ---
if task_id in ha_check_tasks:
tracked_run_id, label = ha_check_tasks.pop(task_id)
if tracked_run_id != run_id:
continue
checks = ha_check_results.setdefault(run_id, {})
checks[label] = normalize_http_code(result.get("result"))
if label == "ha_local_check":
# The local check result feeds both the "ha_local" and the "lms" signal.
add_http_signal(run["engine"], "ha_local", result.get("result"))
add_http_signal(run["engine"], "lms", result.get("result"))
elif label == "ha_proxy_check":
add_proxy_signal(run["engine"], result)
if "ha_local_check" in checks and "ha_proxy_check" in checks:
ha_check_results.pop(run_id, None)
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
# --- docker_logs results ---
if task_id in ha_log_task_ids:
tracked_run_id = ha_log_task_ids.pop(task_id)
if tracked_run_id != run_id:
continue
if result.get("status") != "ok":
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
if "ClientConnectorError" in str(result.get("result")):
run["engine"].add("ha_logs_client_connector_error", True)
url = extract_first_http_url(result.get("result"))
if not url:
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
host, port = parse_host_port(url)
if not host or not port:
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
# NOTE(review): url is taken from container log text and interpolated into a
# shell command unquoted - consider shlex.quote before sending it to a node.
check_task = send_task(
redis_client,
"piha",
"exec",
{"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"},
runs,
run_id,
)
ha_check_tasks[check_task["task_id"]] = (run_id, "ha_local_check")
# The same URL is probed a second time, from the vps node.
proxy_task = send_task(
redis_client,
"vps",
"exec",
{"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"},
runs,
run_id,
)
ha_check_tasks[proxy_task["task_id"]] = (run_id, "ha_proxy_check")
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
# --- docker_ps results ---
if task_id in ha_task_ids:
tracked_run_id = ha_task_ids.pop(task_id)
if tracked_run_id != run_id:
continue
if result.get("status") != "ok":
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
container_name = detect_homeassistant(result.get("result"))
if not container_name:
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
print(f"[orchestrator] detected HA container: {container_name}")
emit_event(
{
"type": "log",
"message": f"detected HA container: {container_name}",
"run_id": run_id,
"node": result.get("node"),
}
)
run["engine"].add("ha_container", "running")
run["engine"].add("ha_container_name", container_name)
logs_task = send_task(
redis_client,
"piha",
"docker_logs",
{"container": container_name},
runs,
run_id,
)
ha_log_task_ids[logs_task["task_id"]] = run_id
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)
continue
# Any other result that belongs to a known run (not found in the tables above).
evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats)

View file

@ -1,11 +0,0 @@
from uuid import uuid4
def build_task(target, action, params=None):
    """Return a new ``infra`` task dict with a freshly generated ``task_id``.

    *params* defaults to an empty dict when omitted or falsy.
    """
    task = {"task_id": str(uuid4())}
    task.update(
        target=target,
        type="infra",
        action=action,
        params=params or {},
    )
    return task

View file

@ -1,225 +0,0 @@
#!/usr/bin/env python3
import os
import json
import time
import sys
import shutil
import uuid
from pathlib import Path
# Configuration
# Root of the action queue; override with the HOMELAB_ACTIONS_ROOT environment variable.
ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
# Lifecycle events are appended to this file (fixed path, not configurable here).
EVENT_LOG = Path("/tmp/agent-events.log")
# Execution history file, kept directly under the queue root.
HISTORY_LOG = ACTIONS_ROOT / "history.log"
def emit_event(event_type, message, details=None):
    """Emit an action lifecycle event.

    The event is printed as one JSON line and appended to EVENT_LOG.

    Args:
        event_type: Value stored under ``"type"``.
        message: Human-readable description.
        details: Optional extra data; stored as ``{}`` when omitted or falsy.

    A failure to write the log file is reported on stderr and otherwise
    ignored, so event logging never aborts an action.
    """
    event = {
        "type": event_type,
        "message": message,
        "timestamp": time.time(),
        "details": details or {},
    }
    line = json.dumps(event)
    print(line)
    try:
        # Explicit encoding keeps the log independent of the process locale.
        with open(EVENT_LOG, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()
    except OSError as e:
        # Only I/O problems are expected here; anything else is a real bug
        # and should surface.
        print(f"Error writing to event log: {e}", file=sys.stderr)
def log_history(action_id, status, message):
    """Append one JSON line describing an action state change to HISTORY_LOG.

    A failure to write the history file is reported on stderr and otherwise
    ignored.
    """
    entry = {
        "timestamp": time.time(),
        "action_id": action_id,
        "status": status,
        "message": message,
    }
    try:
        # Explicit encoding keeps the history independent of the process locale.
        with open(HISTORY_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
            f.flush()
    except OSError as e:
        print(f"Error writing history: {e}", file=sys.stderr)
def ensure_dirs():
    """Create every action-queue state directory under ACTIONS_ROOT if missing."""
    states = ("pending", "approved", "running", "completed", "failed", "rejected")
    for state in states:
        os.makedirs(ACTIONS_ROOT / state, exist_ok=True)
def _decide_pending_action(action_id, status, decision, gerund):
    """Move a pending action into the *status* directory and record the decision.

    Args:
        action_id: Action id, with or without the ``.json`` suffix.
        status: Target state and directory name ("approved" or "rejected").
        decision: Noun used in the history entry ("approval" / "rejection").
        gerund: Verb form used in the error message ("approving" / "rejecting").

    Returns:
        True when the action file was updated and moved, False otherwise.
    """
    ensure_dirs()
    filename = action_id if action_id.endswith(".json") else f"{action_id}.json"
    if Path(filename).name != filename:
        # An id containing path separators could address files outside pending/.
        print(f"Invalid action id: {action_id}")
        return False

    pending_path = ACTIONS_ROOT / "pending" / filename
    if not pending_path.exists():
        print(f"Action {filename} not found in pending.")
        return False

    target_path = ACTIONS_ROOT / status / filename
    try:
        with open(pending_path, "r") as f:
            action = json.load(f)
        if not isinstance(action, dict):
            raise ValueError("action file does not contain a JSON object")
        # Resolve the id before touching the file, so a record without an
        # "action_id" cannot fail after the move has already happened.
        recorded_id = action.get("action_id") or filename[: -len(".json")]
        action["status"] = status
        action[f"{status}_at"] = time.time()
        with open(pending_path, "w") as f:
            json.dump(action, f, indent=2)
        shutil.move(pending_path, target_path)
        emit_event(
            f"action_{status}",
            f"Action {status}: {recorded_id}",
            {"action_id": recorded_id},
        )
        log_history(recorded_id, status, f"Manual {decision} received")
        print(f"Action {recorded_id} {status}.")
        return True
    except Exception as e:
        # CLI boundary: report any failure to the operator instead of a traceback.
        print(f"Error {gerund} action: {e}")
        return False


def approve_action(action_id):
    """Approve a pending action: stamp it and move it to ``approved/``.

    Returns True on success, False when the action is missing or cannot be
    processed.
    """
    return _decide_pending_action(action_id, "approved", "approval", "approving")


def reject_action(action_id):
    """Reject a pending action: stamp it and move it to ``rejected/``.

    Returns True on success, False when the action is missing or cannot be
    processed.
    """
    return _decide_pending_action(action_id, "rejected", "rejection", "rejecting")
def process_action(action_path, dry_run=False):
    """Process a single approved (or resumed) action file.

    The file is moved to ``running/``, the action is simulated, and the file
    ends up in ``completed/`` or ``failed/``. Lifecycle events and history
    entries are written at start and finish.

    Args:
        action_path: Path of the action's JSON file.
        dry_run: When True, only report what would be executed.

    An unreadable file, or one without ``action_id`` / ``action_type``, is
    reported and left where it is. An action whose type is not implemented
    finishes as "failed" rather than being reported as completed.
    """
    try:
        with open(action_path, "r") as f:
            action = json.load(f)
        action_id = action["action_id"]
        action_type = action["action_type"]
    except (OSError, ValueError, KeyError, TypeError) as e:
        # ValueError covers invalid JSON; KeyError/TypeError cover records
        # that are not objects or lack the required fields.
        print(f"Error reading action {action_path}: {e}")
        return

    # Move to running (Resumable execution state)
    running_path = ACTIONS_ROOT / "running" / action_path.name
    shutil.move(action_path, running_path)
    action["status"] = "running"
    action["started_at"] = time.time()
    with open(running_path, "w") as f:
        json.dump(action, f, indent=2)
    emit_event("action_started", f"Started action {action_id} ({action_type})", {"action_id": action_id})
    log_history(action_id, "running", f"Execution started (dry_run={dry_run})")

    # Simulation logic (Recommendation-safe execution model)
    print(f"Executing {action_type} for {action.get('service') or action.get('node')}...")
    # Idempotent simulation: in a real world, we'd check if it's already done
    time.sleep(0.5)

    # Initial action types implementation (Simulation)
    simulated = {
        "redeploy_service": f"DEBUG: Triggering container restart/redeploy for {action.get('service')}",
        "rerun_healthcheck": f"DEBUG: Running healthcheck for {action.get('service')}",
        "rerun_deployment_stage": f"DEBUG: Retrying deployment stage for {action.get('service')}",
        "collect_diagnostics": f"DEBUG: Collecting logs and metrics for {action.get('service') or action.get('node')}",
    }
    success = action_type in simulated
    if not success:
        print(f"Error: unknown action type: {action_type}")
    elif dry_run:
        # NOTE(review): a dry run still moves the action to completed/, as before.
        print(f"[DRY-RUN] Would execute {action_type} logic here.")
    else:
        print(simulated[action_type])

    # Finalize
    final_status = "completed" if success else "failed"
    final_path = ACTIONS_ROOT / final_status / action_path.name
    action["status"] = final_status
    action["finished_at"] = time.time()
    with open(running_path, "w") as f:
        json.dump(action, f, indent=2)
    shutil.move(running_path, final_path)
    emit_event(f"action_{final_status}", f"Action {action_id} {final_status}", {"action_id": action_id})
    log_history(action_id, final_status, "Execution finished")
def run_executor(dry_run=False):
    """Run one executor pass: resume ``running/`` actions, then process ``approved/`` ones."""
    ensure_dirs()
    print(f"--- Executor Run: {time.ctime()} (dry_run={dry_run}) ---")

    # 1. Resume running actions
    for resumed in list((ACTIONS_ROOT / "running").glob("*.json")):
        print(f"Resuming action: {resumed.name}")
        process_action(resumed, dry_run=dry_run)

    # 2. Process approved actions
    approved = list((ACTIONS_ROOT / "approved").glob("*.json"))
    if approved:
        for action_file in approved:
            process_action(action_file, dry_run=dry_run)
    else:
        print("No approved actions found.")

    print("Run complete.")
if __name__ == "__main__":
    # Command-line entry point: `run` (default), `approve <id>` or `reject <id>`.
    import argparse

    parser = argparse.ArgumentParser(description="Homelab Action Executor")
    parser.add_argument("command", choices=["run", "approve", "reject"], nargs="?", default="run")
    parser.add_argument("action_id", nargs="?")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if args.command == "run":
        run_executor(dry_run=args.dry_run)
    else:
        if not args.action_id:
            print(f"Error: action_id required for {args.command}")
            sys.exit(1)
        decide = approve_action if args.command == "approve" else reject_action
        # Propagate failure through the exit status so calling scripts
        # (e.g. ones using `set -e`) notice a decision that did not apply.
        if not decide(args.action_id):
            sys.exit(1)

View file

@ -1,74 +0,0 @@
#!/bin/bash
# Validation script for Homelab Action Queue System
#
# End-to-end smoke test of the action queue: create drift scenarios, let the
# supervisor propose actions, approve and execute one, reject another, and
# show the resulting lifecycle events.
# World state and the action queue are redirected to ./tmp/homelab under the
# current working directory; the event log path is the fixed /tmp file.
# Abort on the first failing command.
set -e
BASE_DIR=$(pwd)
export HOMELAB_WORLD_ROOT="$BASE_DIR/tmp/homelab/world"
export HOMELAB_ACTIONS_ROOT="$BASE_DIR/tmp/homelab/actions"
EVENT_LOG="/tmp/agent-events.log"
echo "=== Starting Action Queue Validation ==="
# 1. Setup drift scenarios
echo "Setting up drift scenarios..."
bash scripts/supervisor/test_scenarios.sh
# 2. Run supervisor to generate action proposals
echo "Running supervisor..."
python3 scripts/supervisor/supervisor.py
# 3. Check for pending actions
echo "Checking pending actions..."
ls -l "$HOMELAB_ACTIONS_ROOT/pending/"
# Get an action ID from pending
# (the first entry in ls order; the file name without .json is the action id)
ACTION_FILE=$(ls "$HOMELAB_ACTIONS_ROOT/pending/" | head -n 1)
if [ -z "$ACTION_FILE" ]; then
echo "Error: No pending actions found!"
exit 1
fi
ACTION_ID="${ACTION_FILE%.json}"
echo "Found action: $ACTION_ID"
# 4. Approve the action
echo "Approving action $ACTION_ID..."
python3 scripts/executor/executor.py approve "$ACTION_ID"
# 5. Run executor
echo "Running executor..."
python3 scripts/executor/executor.py run
# 6. Verify completion
if [ -f "$HOMELAB_ACTIONS_ROOT/completed/$ACTION_FILE" ]; then
echo "SUCCESS: Action $ACTION_ID moved to completed."
else
echo "FAILURE: Action $ACTION_ID NOT found in completed."
exit 1
fi
# 7. Test rejection
# Skipped silently when no further pending action is left.
echo "Testing rejection..."
NEXT_ACTION_FILE=$(ls "$HOMELAB_ACTIONS_ROOT/pending/" | head -n 1)
if [ -n "$NEXT_ACTION_FILE" ]; then
NEXT_ACTION_ID="${NEXT_ACTION_FILE%.json}"
echo "Rejecting action $NEXT_ACTION_ID..."
python3 scripts/executor/executor.py reject "$NEXT_ACTION_ID"
if [ -f "$HOMELAB_ACTIONS_ROOT/rejected/$NEXT_ACTION_FILE" ]; then
echo "SUCCESS: Action $NEXT_ACTION_ID moved to rejected."
else
echo "FAILURE: Action $NEXT_ACTION_ID NOT found in rejected."
exit 1
fi
fi
# 8. Verify events
# These greps are informational: each pipeline's status is that of `tail`,
# so a missing event type does not abort the script under `set -e`.
echo "Verifying events in $EVENT_LOG..."
grep "action_created" "$EVENT_LOG" | tail -n 1
grep "action_approved" "$EVENT_LOG" | tail -n 1
grep "action_started" "$EVENT_LOG" | tail -n 1
grep "action_completed" "$EVENT_LOG" | tail -n 1
grep "action_rejected" "$EVENT_LOG" | tail -n 1
echo "=== Validation Complete ==="

View file

@ -1,363 +0,0 @@
#!/usr/bin/env python3
import os
import sys
import yaml
import json
import time
import glob
import uuid
from pathlib import Path
# Configuration
# Both roots can be overridden through the environment variables named below.
WORLD_STATE_PATH = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
ACTIONS_ROOT = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
# Relative path: resolved against the process's current working directory.
INVENTORY_PATH = Path("hosts")
EVENT_LOG = Path("/tmp/agent-events.log")
CHECKPOINT_FILE = Path("/tmp/supervisor-checkpoint.json")
# Action Queue Layout
# One sub-directory of ACTIONS_ROOT per action state.
ACTION_DIRS = ["pending", "approved", "running", "completed", "failed", "rejected"]
# Reconcile event types
RECONCILE_REQUIRED = "reconcile_required"
RECONCILE_RECOMMENDED = "reconcile_recommended"
RECONCILE_BLOCKED = "reconcile_blocked"
# Runtime summary states
STATE_NOMINAL = "nominal"
STATE_DEGRADED = "degraded"
STATE_UNSTABLE = "unstable"
STATE_RECONCILING = "reconciling"
def ensure_action_dirs():
    """Create each ACTION_DIRS sub-directory under ACTIONS_ROOT if it is missing."""
    for state in ACTION_DIRS:
        os.makedirs(ACTIONS_ROOT / state, exist_ok=True)
def emit_action_proposal(recommendation):
    """Convert a recommendation into an action proposal stored in pending/.

    Args:
        recommendation: Dict with an "action" key (the recommended action
            name) and a "drift" key (the drift record that triggered it).

    The proposal is written to ``ACTIONS_ROOT/pending/<action_id>.json`` and
    an ``action_created`` event is emitted. Failures are reported on stderr
    and never raised (best-effort, as the caller keeps processing other
    recommendations).
    """
    ensure_action_dirs()
    # Recommendation action -> executor action type. Unknown actions fall
    # back to the harmless diagnostics collection.
    action_type_map = {
        "redeploy": "redeploy_service",
        "deploy": "redeploy_service",
        "diagnostics": "collect_diagnostics",
        "failover_review": "collect_diagnostics",
        "review": "collect_diagnostics",
        "delayed_deployment": "rerun_deployment_stage"
    }
    action_type = action_type_map.get(recommendation["action"], "collect_diagnostics")
    risk_level_map = {
        "redeploy_service": "guarded",
        "rerun_healthcheck": "safe",
        "rerun_deployment_stage": "guarded",
        "collect_diagnostics": "safe"
    }
    # Anything without an explicit risk rating is treated as dangerous.
    risk_level = risk_level_map.get(action_type, "dangerous")
    # Dangerous always requires approval
    # Guarded defaults to approval
    approval_required = risk_level in ["dangerous", "guarded"]
    action_id = str(uuid.uuid4())
    drift = recommendation["drift"]
    action = {
        "action_id": action_id,
        "created_at": time.time(),
        "proposed_by": "supervisor",
        "correlation_id": str(uuid.uuid4()),  # In a real system, link to drift ID
        "node": drift.get("node"),
        "service": drift.get("service"),
        "action_type": action_type,
        "risk_level": risk_level,
        "confidence": 0.9,  # Default confidence
        "approval_required": approval_required,
        "autonomous_eligible": False,  # No autonomy yet
        "status": "pending",
        "payload": drift,
        "rollback_reference": None
    }
    pending_dir = ACTIONS_ROOT / "pending"
    file_path = pending_dir / f"{action_id}.json"
    # Write to a hidden temp file first and rename it into place, so that a
    # process scanning pending/*.json never observes a half-written action.
    tmp_path = pending_dir / f".{action_id}.json.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(action, f, indent=2)
        os.replace(tmp_path, file_path)
        emit_event("action_created", f"Action proposed: {action_type} for {action.get('service') or action.get('node')}", {
            "action_id": action_id,
            "action_type": action_type,
            "node": action.get("node"),
            "service": action.get("service")
        })
    except Exception as e:
        print(f"Error emitting action proposal: {e}", file=sys.stderr)
        # Do not leave a stale temp file behind if the write or rename failed.
        try:
            tmp_path.unlink()
        except OSError:
            pass
def emit_event(event_type, message, details=None):
    """Print an event as one JSON line and append the same line to EVENT_LOG.

    Args:
        event_type: Value stored under the event's "type" key.
        message: Human-readable text stored under "message".
        details: Optional extra data; an empty dict is stored when falsy.

    A failure to write the log file is reported on stderr and not raised.
    """
    serialized = json.dumps({
        "type": event_type,
        "message": message,
        "timestamp": time.time(),
        "details": details if details else {},
    })
    print(serialized)
    try:
        # Mode "a" keeps the log append-only.
        with EVENT_LOG.open("a", encoding="utf-8") as log_handle:
            log_handle.write(f"{serialized}\n")
            log_handle.flush()
    except Exception as err:
        print(f"Error writing to event log: {err}", file=sys.stderr)
def load_desired_state():
    """Load the desired state from ``hosts/<node_name>/services.yaml`` files.

    Returns:
        Dict with two keys: "services", a list of service dicts (each given
        a "node" key naming the host directory it came from), and "nodes",
        the list of node names that contributed at least one service.

    Unreadable files and malformed service entries are reported on stderr
    and skipped; they never abort the load.
    """
    desired = {"services": [], "nodes": []}
    if not INVENTORY_PATH.exists():
        return desired
    # Inventory model: hosts/{node_name}/services.yaml
    # Sorted so the resulting order does not depend on directory listing order.
    for yaml_file in sorted(glob.glob(str(INVENTORY_PATH / "*" / "services.yaml"))):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except Exception as e:
            print(f"Error loading {yaml_file}: {e}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            # Empty file or a document that is not a mapping: nothing to load.
            continue
        services = data.get("services")
        if services is None:
            # "services:" with no items parses as None.
            continue
        if not isinstance(services, list):
            print(f"Error loading {yaml_file}: 'services' must be a list", file=sys.stderr)
            continue
        node_name = Path(yaml_file).parent.name
        for svc in services:
            if isinstance(svc, str):
                # Shorthand form: a bare service name.
                svc = {"name": svc}
            if not isinstance(svc, dict) or not svc.get("name"):
                # Skip only the bad entry so the remaining services still load.
                print(f"Error loading {yaml_file}: skipping malformed service entry {svc!r}", file=sys.stderr)
                continue
            svc["node"] = node_name
            desired["services"].append(svc)
            if node_name not in desired["nodes"]:
                desired["nodes"].append(node_name)
    return desired
def load_world_state():
    """Read the observed world state from the directory tree at WORLD_STATE_PATH.

    Returns:
        Dict keyed by category ("services", "nodes", "deployments",
        "incidents"); each value maps a JSON file's stem to its parsed
        content. Categories without a directory stay empty. Files that fail
        to load are reported on stderr and skipped.
    """
    categories = ("services", "nodes", "deployments", "incidents")
    world = {category: {} for category in categories}
    if not WORLD_STATE_PATH.exists():
        return world
    # Filesystem-first layout: one directory per category, one JSON file per item.
    for category in categories:
        category_dir = WORLD_STATE_PATH / category
        if not category_dir.is_dir():
            continue
        for item_path in category_dir.glob("*.json"):
            try:
                with open(item_path, "r") as handle:
                    world[category][item_path.stem] = json.load(handle)
            except Exception as err:
                print(f"Error loading {item_path}: {err}", file=sys.stderr)
    return world
def detect_drift(desired, world):
    """Compare the desired infrastructure state with the observed runtime state.

    Args:
        desired: Dict with "services" (list of dicts carrying "name" and
            "node") and "nodes" (list of node names).
        world: Dict with "services", "nodes", "deployments" and "incidents",
            each mapping an identifier to its observed record.

    Returns:
        List of drift dicts, in this order: service drifts
        ("missing_service" / "unhealthy_service"), "offline_node",
        "failed_deployment", then "unresolved_incident".

    Missing top-level keys are treated as empty, and desired service entries
    without a name are ignored rather than raising.
    """
    healthy_service_statuses = ("ok", "healthy", "up")
    online_node_statuses = ("online", "ok", "up")
    world_services = world.get("services", {})
    world_nodes = world.get("nodes", {})
    drifts = []
    # 1. Missing service & 2. Unhealthy service
    for d_svc in desired.get("services", []):
        name = d_svc.get("name")
        if not name:
            # A nameless entry cannot be matched against the world state.
            continue
        a_svc = world_services.get(name)
        if not a_svc:
            drifts.append({
                "type": "missing_service",
                "service": name,
                "node": d_svc.get("node")
            })
        elif a_svc.get("status") not in healthy_service_statuses:
            drifts.append({
                "type": "unhealthy_service",
                "service": name,
                "status": a_svc.get("status"),
                # Prefer the node the service was observed on.
                "node": a_svc.get("node") or d_svc.get("node")
            })
    # 3. Offline node
    for node_name in desired.get("nodes", []):
        a_node = world_nodes.get(node_name)
        if not a_node or a_node.get("status") not in online_node_statuses:
            drifts.append({
                "type": "offline_node",
                "node": node_name,
                "status": a_node.get("status") if a_node else "missing"
            })
    # 4. Failed deployment
    for d_id, d_data in world.get("deployments", {}).items():
        if d_data.get("status") == "failed":
            drifts.append({
                "type": "failed_deployment",
                "deployment_id": d_id,
                "service": d_data.get("service")
            })
    # 5. Unresolved incidents
    for i_id, i_data in world.get("incidents", {}).items():
        if i_data.get("status") not in ("resolved", "closed"):
            drifts.append({
                "type": "unresolved_incident",
                "incident_id": i_id,
                "description": i_data.get("description"),
                "status": i_data.get("status")
            })
    return drifts
def recommendation_engine(drifts, world):
    """Turn drift records into recommendations without changing any state.

    Args:
        drifts: List of drift dicts, each with a "type" key.
        world: Observed world state; its "deployments" and "services"
            entries are consulted for repeated failures and dependencies.

    Returns:
        List of dicts with keys "drift", "action", "message" and "type"
        (one of the RECONCILE_* constants). Drift types not handled below
        produce no recommendation.
    """
    healthy_statuses = ("ok", "healthy", "up")
    recommendations = []

    def propose(drift, action, message, rec_type):
        # Single place that fixes the shape of a recommendation record.
        recommendations.append({
            "drift": drift,
            "action": action,
            "message": message,
            "type": rec_type,
        })

    for drift in drifts:
        kind = drift["type"]
        if kind == "unhealthy_service":
            propose(
                drift,
                "redeploy",
                f"Service {drift['service']} is unhealthy. Recommend redeploy.",
                RECONCILE_REQUIRED,
            )
            continue
        if kind == "failed_deployment":
            service = drift["service"]
            # More than two failed deployments of the same service means
            # retrying is pointless: recommend diagnostics instead.
            failure_count = sum(
                1
                for dep in world["deployments"].values()
                if dep.get("service") == service and dep.get("status") == "failed"
            )
            if failure_count > 2:
                propose(
                    drift,
                    "diagnostics",
                    f"Repeated deployment failures for {service}. Recommend diagnostics.",
                    RECONCILE_BLOCKED,
                )
            else:
                propose(
                    drift,
                    "redeploy",
                    f"Deployment failed for {service}. Recommend retry.",
                    RECONCILE_REQUIRED,
                )
            continue
        if kind == "offline_node":
            propose(
                drift,
                "failover_review",
                f"Node {drift['node']} is offline. Recommend failover review.",
                RECONCILE_REQUIRED,
            )
            continue
        if kind == "missing_service":
            # Mock dependency check: 'webapp' needs a healthy 'database'.
            dependency_down = False
            if drift["service"] == "webapp":
                db_svc = world["services"].get("database")
                dependency_down = not db_svc or db_svc.get("status") not in healthy_statuses
            if dependency_down:
                propose(
                    drift,
                    "delayed_deployment",
                    f"Dependency unavailable for {drift['service']}. Recommend delayed deployment.",
                    RECONCILE_RECOMMENDED,
                )
            else:
                propose(
                    drift,
                    "deploy",
                    f"Service {drift['service']} is missing. Recommend deployment.",
                    RECONCILE_REQUIRED,
                )
            continue
        if kind == "unresolved_incident":
            propose(
                drift,
                "review",
                f"Unresolved incident: {drift['description']}. Recommend review.",
                RECONCILE_RECOMMENDED,
            )
    return recommendations
def save_checkpoint(state):
    """Write the run summary plus the current time to CHECKPOINT_FILE as JSON.

    Args:
        state: JSON-serialisable value stored under the "state" key.

    Errors are reported on stderr and not raised.
    """
    try:
        with CHECKPOINT_FILE.open("w") as handle:
            snapshot = {"last_run": time.time(), "state": state}
            json.dump(snapshot, handle)
    except Exception as err:
        print(f"Error saving checkpoint: {err}", file=sys.stderr)
def load_checkpoint():
    """Return the parsed content of CHECKPOINT_FILE, or None.

    None is returned when the file does not exist or cannot be read or
    parsed; in the latter case the error is printed to stderr.
    """
    if not CHECKPOINT_FILE.exists():
        return None
    try:
        with CHECKPOINT_FILE.open("r") as handle:
            return json.load(handle)
    except Exception as err:
        print(f"Error loading checkpoint: {err}", file=sys.stderr)
        return None
def main():
    """Run one supervisor pass.

    Loads desired and world state, detects drift, derives recommendations,
    emits a summary event plus one event and one action proposal per
    recommendation, and finally saves a checkpoint.
    """
    print(f"--- Supervisor Run: {time.ctime()} ---")
    previous_run = load_checkpoint()
    if previous_run:
        print(f"Last run: {time.ctime(previous_run.get('last_run', 0))}")

    # Steps 1-4: gather both states, diff them, derive recommendations.
    desired = load_desired_state()
    world = load_world_state()
    drifts = detect_drift(desired, world)
    recommendations = recommendation_engine(drifts, world)

    # Step 5: emit the summary state, then the per-recommendation events.
    if drifts or recommendations:
        recommended_types = {rec["type"] for rec in recommendations}
        if RECONCILE_BLOCKED in recommended_types:
            overall_state = STATE_UNSTABLE
        elif recommendations:
            # Any non-blocked recommendation degrades the system.
            overall_state = STATE_DEGRADED
        else:
            overall_state = STATE_NOMINAL
        emit_event("summary_state", f"System state: {overall_state}", {"state": overall_state})
        for rec in recommendations:
            emit_event(rec["type"], rec["message"], rec["drift"])
            # Queue the matching action proposal alongside the event.
            emit_action_proposal(rec)
    else:
        emit_event("summary_state", f"System state: {STATE_NOMINAL}", {"state": STATE_NOMINAL})

    # Step 6: record what this run saw.
    save_checkpoint({
        "drift_count": len(drifts),
        "recommendation_count": len(recommendations),
        "timestamp": time.time()
    })
    print("Run complete.")


if __name__ == "__main__":
    main()

View file

@ -1,79 +0,0 @@
#!/bin/bash
# Generate realistic reconciliation drift scenarios for the homelab supervisor.
#
# The script writes an inventory under ./hosts and a world-state tree under
# $HOMELAB_WORLD_ROOT (default: ./tmp/homelab/world). All paths are relative
# to the directory the script is run from.
# Abort on the first failing command.
set -e
# Configuration
BASE_DIR=$(pwd)
# Use HOMELAB_WORLD_ROOT when set; otherwise fall back to a local directory
# so the script also works when /opt is not writable.
WORLD_DIR="${HOMELAB_WORLD_ROOT:-$BASE_DIR/tmp/homelab/world}"
INVENTORY_DIR="$BASE_DIR/hosts"
echo "Setting up homelab reconciliation scenarios..."
echo "World state: $WORLD_DIR"
echo "Inventory: $INVENTORY_DIR"
# Cleanup
# WARNING: both directories are deleted recursively, including any existing
# hosts/ inventory in the current working directory.
rm -rf "$WORLD_DIR"
rm -rf "$INVENTORY_DIR"
# Create directories
mkdir -p "$INVENTORY_DIR/node1"
mkdir -p "$INVENTORY_DIR/node2"
mkdir -p "$WORLD_DIR/services" "$WORLD_DIR/nodes" "$WORLD_DIR/deployments" "$WORLD_DIR/incidents"
# --- Scenario 1: Nominal ---
# node1 declares homeassistant, which is reported healthy on an online node.
cat <<EOF > "$INVENTORY_DIR/node1/services.yaml"
services:
- name: homeassistant
EOF
cat <<EOF > "$WORLD_DIR/services/homeassistant.json"
{"name": "homeassistant", "status": "healthy", "node": "node1"}
EOF
cat <<EOF > "$WORLD_DIR/nodes/node1.json"
{"name": "node1", "status": "online"}
EOF
# --- Scenario 2: Unhealthy Service ---
# Overwrites the healthy record written by Scenario 1, so the final world
# state reports homeassistant as unhealthy.
cat <<EOF > "$WORLD_DIR/services/homeassistant.json"
{"name": "homeassistant", "status": "unhealthy", "node": "node1"}
EOF
# --- Scenario 3: Missing Service ---
cat <<EOF > "$INVENTORY_DIR/node2/services.yaml"
services:
- name: webapp
EOF
# webapp is missing from world/services
# --- Scenario 4: Dependency Unavailable (for Missing Service) ---
cat <<EOF > "$WORLD_DIR/services/database.json"
{"name": "database", "status": "error", "node": "node2"}
EOF
# webapp depends on database in supervisor logic
# --- Scenario 5: Offline Node ---
cat <<EOF > "$WORLD_DIR/nodes/node2.json"
{"name": "node2", "status": "offline"}
EOF
# --- Scenario 6: Repeated Deployment Failures ---
# Three failed deployments of webapp, timestamped now, 5 and 10 minutes ago.
cat <<EOF > "$WORLD_DIR/deployments/dep-001.json"
{"id": "dep-001", "service": "webapp", "status": "failed", "timestamp": $(date +%s)}
EOF
cat <<EOF > "$WORLD_DIR/deployments/dep-002.json"
{"id": "dep-002", "service": "webapp", "status": "failed", "timestamp": $(( $(date +%s) - 300 ))}
EOF
cat <<EOF > "$WORLD_DIR/deployments/dep-003.json"
{"id": "dep-003", "service": "webapp", "status": "failed", "timestamp": $(( $(date +%s) - 600 ))}
EOF
# --- Scenario 7: Unresolved Incident ---
cat <<EOF > "$WORLD_DIR/incidents/inc-99.json"
{"id": "inc-99", "description": "High memory usage on node1", "status": "investigating"}
EOF
echo "Scenarios generated successfully."
echo "You can now run: HOMELAB_WORLD_ROOT=$WORLD_DIR python3 scripts/supervisor/supervisor.py"

View file

@ -1,24 +0,0 @@
{
"action_id": "0083f8ad-1f2b-47a4-81a8-81e59740879e",
"created_at": 1778600485.050643,
"proposed_by": "supervisor",
"correlation_id": "6d88755b-ca89-45eb-bf2d-506fca631144",
"node": "node1",
"service": "homeassistant",
"action_type": "redeploy_service",
"risk_level": "guarded",
"confidence": 0.9,
"approval_required": true,
"autonomous_eligible": false,
"status": "completed",
"payload": {
"type": "unhealthy_service",
"service": "homeassistant",
"status": "unhealthy",
"node": "node1"
},
"rollback_reference": null,
"approved_at": 1778600485.1278665,
"started_at": 1778600485.1792338,
"finished_at": 1778600485.6797137
}

View file

@ -1,23 +0,0 @@
{
"action_id": "050add79-3265-4e35-bb88-41c368bbccda",
"created_at": 1778600510.7529757,
"proposed_by": "supervisor",
"correlation_id": "d8ba7d84-74dd-46c8-a085-5ed8ba186770",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "completed",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-001",
"service": "webapp"
},
"rollback_reference": null,
"approved_at": 1778600510.8252015,
"started_at": 1778600510.8744874,
"finished_at": 1778600511.3750403
}

View file

@ -1,7 +0,0 @@
{
"action_id": "resumable-task",
"action_type": "rerun_healthcheck",
"status": "completed",
"started_at": 1778600488.5642526,
"finished_at": 1778600489.0646975
}

View file

@ -1,10 +0,0 @@
{"timestamp": 1778600485.1282582, "action_id": "0083f8ad-1f2b-47a4-81a8-81e59740879e", "status": "approved", "message": "Manual approval received"}
{"timestamp": 1778600485.179484, "action_id": "0083f8ad-1f2b-47a4-81a8-81e59740879e", "status": "running", "message": "Execution started (dry_run=False)"}
{"timestamp": 1778600485.680433, "action_id": "0083f8ad-1f2b-47a4-81a8-81e59740879e", "status": "completed", "message": "Execution finished"}
{"timestamp": 1778600485.7410686, "action_id": "2143ae5b-bcc6-410b-b925-e7def70fc013", "status": "rejected", "message": "Manual rejection received"}
{"timestamp": 1778600488.5644836, "action_id": "resumable-task", "status": "running", "message": "Execution started (dry_run=False)"}
{"timestamp": 1778600489.0652084, "action_id": "resumable-task", "status": "completed", "message": "Execution finished"}
{"timestamp": 1778600510.825529, "action_id": "050add79-3265-4e35-bb88-41c368bbccda", "status": "approved", "message": "Manual approval received"}
{"timestamp": 1778600510.8747966, "action_id": "050add79-3265-4e35-bb88-41c368bbccda", "status": "running", "message": "Execution started (dry_run=False)"}
{"timestamp": 1778600511.3755214, "action_id": "050add79-3265-4e35-bb88-41c368bbccda", "status": "completed", "message": "Execution finished"}
{"timestamp": 1778600511.4307747, "action_id": "240cbbc0-891e-4032-bf73-1fa40ff850b4", "status": "rejected", "message": "Manual rejection received"}

View file

@ -1,21 +0,0 @@
{
"action_id": "50d7cdab-2f12-449f-965a-0383e32babaa",
"created_at": 1778600485.053174,
"proposed_by": "supervisor",
"correlation_id": "a2899a7f-548f-455d-a8dd-4e208be58e00",
"node": null,
"service": null,
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "unresolved_incident",
"incident_id": "inc-99",
"description": "High memory usage on node1",
"status": "investigating"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "5e239d96-ff3f-48a3-a71a-ad5aa6b7ff88",
"created_at": 1778600485.05199,
"proposed_by": "supervisor",
"correlation_id": "c5fa628e-35a1-44f9-9119-07d93f20af80",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-002",
"service": "webapp"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "7cde5093-3394-43af-9391-321c50ac5362",
"created_at": 1778600510.7521193,
"proposed_by": "supervisor",
"correlation_id": "2a91f58e-e10d-4de5-abd7-5f4fe6fdc325",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-002",
"service": "webapp"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "a42e2183-ca22-4a50-97a7-eb53ab0e039a",
"created_at": 1778600510.75163,
"proposed_by": "supervisor",
"correlation_id": "ec2a1960-5baa-453a-8380-65fc9376cc82",
"node": "node2",
"service": null,
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "offline_node",
"node": "node2",
"status": "offline"
},
"rollback_reference": null
}

View file

@ -1,21 +0,0 @@
{
"action_id": "aae83bcd-455f-4b59-bab0-7c7994116468",
"created_at": 1778600510.7506568,
"proposed_by": "supervisor",
"correlation_id": "0a786305-46cb-4837-8725-53d99203f39e",
"node": "node1",
"service": "homeassistant",
"action_type": "redeploy_service",
"risk_level": "guarded",
"confidence": 0.9,
"approval_required": true,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "unhealthy_service",
"service": "homeassistant",
"status": "unhealthy",
"node": "node1"
},
"rollback_reference": null
}

View file

@ -1,21 +0,0 @@
{
"action_id": "c2e6c844-6d96-4ea7-b924-5e33764e5493",
"created_at": 1778600510.7533653,
"proposed_by": "supervisor",
"correlation_id": "6ffc0579-71ac-417f-8ea1-fc46e54527c6",
"node": null,
"service": null,
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "unresolved_incident",
"incident_id": "inc-99",
"description": "High memory usage on node1",
"status": "investigating"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "c91a4171-e636-4194-a146-6e003d2f2586",
"created_at": 1778600510.7511823,
"proposed_by": "supervisor",
"correlation_id": "966a62ee-f81b-497d-96cb-7749f4da0c6f",
"node": "node2",
"service": "webapp",
"action_type": "rerun_deployment_stage",
"risk_level": "guarded",
"confidence": 0.9,
"approval_required": true,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "missing_service",
"service": "webapp",
"node": "node2"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "e6d3f0d6-c294-4282-b9f4-a730f9cec9dc",
"created_at": 1778600485.0515254,
"proposed_by": "supervisor",
"correlation_id": "bf51852b-0b34-4b4b-98c9-fffff38f77ce",
"node": "node2",
"service": null,
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "offline_node",
"node": "node2",
"status": "offline"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "f4c56df2-6775-484b-806e-cdecdcc19584",
"created_at": 1778600485.0527768,
"proposed_by": "supervisor",
"correlation_id": "f974d640-d0fb-4a85-bf8a-eda100182181",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-001",
"service": "webapp"
},
"rollback_reference": null
}

View file

@ -1,20 +0,0 @@
{
"action_id": "ff3da03c-fffa-49a7-985d-ed4589ab6856",
"created_at": 1778600485.0510974,
"proposed_by": "supervisor",
"correlation_id": "37da2d5b-3ecd-4a29-97c2-7e9461b1792e",
"node": "node2",
"service": "webapp",
"action_type": "rerun_deployment_stage",
"risk_level": "guarded",
"confidence": 0.9,
"approval_required": true,
"autonomous_eligible": false,
"status": "pending",
"payload": {
"type": "missing_service",
"service": "webapp",
"node": "node2"
},
"rollback_reference": null
}

View file

@ -1,21 +0,0 @@
{
"action_id": "2143ae5b-bcc6-410b-b925-e7def70fc013",
"created_at": 1778600485.0523734,
"proposed_by": "supervisor",
"correlation_id": "dc23556c-68d2-41a3-a5d2-9ad66705f989",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "rejected",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-003",
"service": "webapp"
},
"rollback_reference": null,
"rejected_at": 1778600485.740686
}

View file

@ -1,21 +0,0 @@
{
"action_id": "240cbbc0-891e-4032-bf73-1fa40ff850b4",
"created_at": 1778600510.7525399,
"proposed_by": "supervisor",
"correlation_id": "fd234809-82aa-459d-858b-18bc3205a6c5",
"node": null,
"service": "webapp",
"action_type": "collect_diagnostics",
"risk_level": "safe",
"confidence": 0.9,
"approval_required": false,
"autonomous_eligible": false,
"status": "rejected",
"payload": {
"type": "failed_deployment",
"deployment_id": "dep-003",
"service": "webapp"
},
"rollback_reference": null,
"rejected_at": 1778600511.4303465
}

View file

@ -1 +0,0 @@
{"id": "dep-001", "service": "webapp", "status": "failed", "timestamp": 1778600510}

View file

@ -1 +0,0 @@
{"id": "dep-002", "service": "webapp", "status": "failed", "timestamp": 1778600210}

View file

@ -1 +0,0 @@
{"id": "dep-003", "service": "webapp", "status": "failed", "timestamp": 1778599910}

View file

@ -1 +0,0 @@
{"id": "inc-99", "description": "High memory usage on node1", "status": "investigating"}

View file

@ -1 +0,0 @@
{"name": "node1", "status": "online"}

View file

@ -1 +0,0 @@
{"name": "node2", "status": "offline"}

View file

@ -1 +0,0 @@
{"name": "database", "status": "error", "node": "node2"}

View file

@ -1 +0,0 @@
{"name": "homeassistant", "status": "unhealthy", "node": "node1"}

View file

@ -263,6 +263,9 @@
</aside>
<main class="main-content">
<div id="stale-banner" class="hidden" style="background:var(--error); color:white; padding:8px 24px; font-weight:bold; font-size:12px; text-align:center; letter-spacing:0.05em">
RUNTIME STATE IS STALE
</div>
<header>
<div style="display:flex; align-items:center; gap:20px">
<div class="view-title" id="current-view-title">Dashboard</div>
@ -407,9 +410,12 @@
}
}
function setOperatorMode(mode) {
async function setOperatorMode(mode) {
console.log('Operator mode set to:', mode);
// In real system, this would call backend
const res = await postData('/mode', {mode});
if (res && res.status === 'ok') {
console.log('Mode updated successfully');
}
}
function formatTime(ts) {
@ -435,6 +441,15 @@
statusEl.textContent = `System Status: ${summary.status.toUpperCase()}`;
statusEl.className = 'sidebar-footer ' + getStatusClass(summary.status);
// Handle stale state
const staleBanner = document.getElementById('stale-banner');
if (summary.stale) {
staleBanner.classList.remove('hidden');
staleBanner.textContent = `CRITICAL: Runtime state is STALE (Last update: ${formatTime(summary.last_update)})`;
} else {
staleBanner.classList.add('hidden');
}
if (currentView === 'dashboard') {
const dashSummary = document.getElementById('dashboard-summary');
dashSummary.innerHTML = `
@ -463,12 +478,12 @@
pendingEl.innerHTML = actions.pending.map(a => `
<div class="card" style="margin-bottom:12px">
<div class="card-header">
<div class="card-title">${a.type.toUpperCase()}</div>
<div class="card-title">${(a.action_type || a.type || 'unknown').toUpperCase()}</div>
<span class="badge risk-${a.risk_level}">${a.risk_level}</span>
</div>
<p>${a.description}</p>
<div class="label">Target</div><div class="value">${a.target.node} ${a.target.service || ''}</div>
<div class="label">Confidence</div><div class="value">${Math.round(a.confidence*100)}%</div>
<p>${a.description || a.action_type || 'No description'}</p>
<div class="label">Target</div><div class="value">${a.node || (a.target && a.target.node) || 'unknown'} ${(a.service || (a.target && a.target.service)) || ''}</div>
<div class="label">Confidence</div><div class="value">${Math.round((a.confidence || 0)*100)}%</div>
<div class="controls">
<button class="btn-primary" onclick="mutateAction('${a.id}', 'approved')">Approve</button>
<button onclick="mutateAction('${a.id}', 'rejected')">Reject</button>
@ -476,16 +491,21 @@
</div>
`).join('') || 'No pending actions.';
const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed];
historyEl.innerHTML = history.sort((a,b) => b.timestamp - a.timestamp).map(a => `
const history = [...actions.approved, ...actions.running, ...actions.completed, ...actions.failed, ...actions.rejected];
historyEl.innerHTML = history.sort((a,b) => (b.timestamp || b.updated_at || 0) - (a.timestamp || a.updated_at || 0)).map(a => `
<div class="event">
<div class="event-header">
<span>${a.type.toUpperCase()}</span>
<span>${(a.action_type || a.type || 'unknown').toUpperCase()}</span>
<span class="badge ${getStatusClass(a.status)}">${a.status}</span>
</div>
<div>${a.description}</div>
<small>${formatTime(a.timestamp)} | Target: ${a.target.node}</small>
<div>${a.description || a.action_type || 'No description'}</div>
<small>${formatTime(a.timestamp || a.updated_at)} | Target: ${a.node || (a.target && a.target.node)}</small>
${a.status === 'approved' ? `<div class="controls"><button class="btn-primary" onclick="mutateAction('${a.id}', 'running')">Execute</button></div>` : ''}
${a.transition_history ? `
<div style="margin-top:8px; font-size:10px; color:var(--text-muted)">
<strong>Trace:</strong> ${a.transition_history.map(h => `${h.from}->${h.to}`).join(' → ')}
</div>
` : ''}
</div>
`).join('') || 'No history.';
}

View file

@ -1,19 +1,20 @@
import json
import os
import socket
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
STATE_DIR = Path("/opt/homelab/state")
EVENTS_DIR = Path("/opt/homelab/events")
WORLD_DIR = Path("/opt/homelab/world")
ACTIONS_DIR = Path("/opt/homelab/actions")
EVENT_LOG = Path("/tmp/agent-events.log")
STATE_DIR = Path(os.getenv("HOMELAB_STATE_ROOT", "/opt/homelab/state"))
EVENTS_DIR = Path(os.getenv("HOMELAB_EVENTS_ROOT", "/opt/homelab/events"))
WORLD_DIR = Path(os.getenv("HOMELAB_WORLD_ROOT", "/opt/homelab/world"))
ACTIONS_DIR = Path(os.getenv("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
CONFIG_DIR = Path(os.getenv("HOMELAB_CONFIG_ROOT", "/opt/homelab/config"))
STATIC_DIR = Path(__file__).parent
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
DEFAULT_CONFIG = {
"operator_mode": "approval",
"auto_mode": True,
"action_thresholds": {
"restart_ha": 0.8,
@ -24,87 +25,6 @@ DEFAULT_CONFIG = {
}
def tail_lines(path, limit):
if not path.exists():
path.touch()
with path.open("r", encoding="utf-8", errors="replace") as handle:
lines = handle.readlines()
print(f"Read {len(lines)} lines", flush=True)
return [line.rstrip("\n") for line in lines[-limit:]]
def redis_command(*parts):
payload = f"*{len(parts)}\r\n".encode("utf-8")
for part in parts:
data = str(part).encode("utf-8")
payload += f"${len(data)}\r\n".encode("utf-8") + data + b"\r\n"
with socket.create_connection((REDIS_HOST, REDIS_PORT), timeout=3) as client:
client.sendall(payload)
return client.recv(4096)
def send_command(command):
task = {
"target": "orchestrator",
"action": "input",
"params": {
"command": command,
},
}
redis_command("LPUSH", "tasks", json.dumps(task))
def send_stop_run(run_id):
task = {
"target": "orchestrator",
"action": "stop_run",
"params": {
"run_id": run_id,
},
}
redis_command("LPUSH", "tasks", json.dumps(task))
def send_run_action(run_id, command):
task = {
"target": "orchestrator",
"action": "run_action",
"params": {
"run_id": run_id,
"command": command,
},
}
redis_command("LPUSH", "tasks", json.dumps(task))
def send_auto_mode(auto_mode):
task = {
"target": "orchestrator",
"action": "set_auto_mode",
"params": {
"auto_mode": auto_mode,
},
}
redis_command("LPUSH", "tasks", json.dumps(task))
def send_auto_config(config):
task = {
"target": "orchestrator",
"action": "set_auto_config",
"params": {
"config": config,
},
}
redis_command("LPUSH", "tasks", json.dumps(task))
def send_event(event):
redis_command("LPUSH", "events", json.dumps(event))
def read_json_file(path, default=None):
if not path.exists():
return default if default is not None else []
@ -114,21 +34,16 @@ def read_json_file(path, default=None):
return default if default is not None else []
def current_config():
config = json.loads(json.dumps(DEFAULT_CONFIG))
# We still keep reading from EVENT_LOG for auto_config if it's there
for line in tail_lines(EVENT_LOG, 500):
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
if event.get("type") != "auto_config":
continue
def get_config():
config_path = STATE_DIR / "operator-config.json"
if config_path.exists():
return read_json_file(config_path, DEFAULT_CONFIG)
return DEFAULT_CONFIG
for key in config:
if key in event:
config[key] = event[key]
return config
def save_config(config):
STATE_DIR.mkdir(parents=True, exist_ok=True)
(STATE_DIR / "operator-config.json").write_text(json.dumps(config, indent=2))
def current_nodes():
@ -152,7 +67,13 @@ def current_recommendations():
def current_summary():
return read_json_file(STATE_DIR / "runtime-summary.json", default={})
summary = read_json_file(STATE_DIR / "runtime-summary.json", default={})
if summary:
# Check for staleness
mtime = os.path.getmtime(STATE_DIR / "runtime-summary.json")
summary["last_update"] = mtime
summary["stale"] = (time.time() - mtime) > 60 # Stale if older than 60s
return summary
def current_events():
@ -161,6 +82,8 @@ def current_events():
for f in EVENTS_DIR.glob("*.json"):
data = read_json_file(f)
if data:
# Add source file for traceability
data["_source"] = f.name
events.append(data)
return sorted(events, key=lambda x: x.get("timestamp", 0), reverse=True)
@ -175,6 +98,9 @@ def current_actions():
for f in status_dir.glob("*.json"):
data = read_json_file(f)
if data:
# Injects some metadata for UI
data["id"] = data.get("action_id") or f.stem
data["status"] = status
actions[status].append(data)
return actions
@ -186,10 +112,12 @@ def mutate_action(action_id, target_status):
# Find where the action is
source_path = None
current_status = None
for status in statuses:
p = ACTIONS_DIR / status / f"{action_id}.json"
if p.exists():
source_path = p
current_status = status
break
if not source_path:
@ -202,10 +130,17 @@ def mutate_action(action_id, target_status):
try:
data = json.loads(source_path.read_text())
data["status"] = target_status
data["last_mutation"] = os.path.getmtime(source_path) # or current time
import time
data["last_mutation"] = time.time()
data["updated_at"] = time.time()
# Keep history of transitions
history = data.get("transition_history", [])
history.append({
"from": current_status,
"to": target_status,
"timestamp": time.time()
})
data["transition_history"] = history
target_path.write_text(json.dumps(data, indent=2))
if source_path != target_path:
source_path.unlink()
@ -226,7 +161,7 @@ def send_json(status, payload, handler):
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/config":
send_json(200, current_config(), self)
send_json(200, get_config(), self)
return
if self.path == "/nodes":
@ -261,16 +196,6 @@ class Handler(BaseHTTPRequestHandler):
send_json(200, current_actions(), self)
return
if self.path == "/logs":
print("LOGS endpoint called", flush=True)
body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return
if self.path in ("/", "/index.html"):
body = (STATIC_DIR / "index.html").read_bytes()
self.send_response(200)
@ -284,13 +209,9 @@ class Handler(BaseHTTPRequestHandler):
def do_POST(self):
if self.path not in (
"/command",
"/stop",
"/action",
"/auto-mode",
"/config",
"/events",
"/action/mutate",
"/mode",
):
self.send_error(404)
return
@ -299,51 +220,26 @@ class Handler(BaseHTTPRequestHandler):
raw_body = self.rfile.read(length).decode("utf-8")
try:
payload = json.loads(raw_body)
command = str(payload.get("command", "")).strip()
except json.JSONDecodeError:
payload = {}
command = raw_body.strip()
if self.path == "/events":
if not isinstance(payload, dict):
self.send_error(400, "event object is required")
return
send_event(payload)
send_json(200, {"status": "sent"}, self)
return
if self.path == "/stop":
run_id = str(payload.get("run_id", "")).strip()
if not run_id:
self.send_error(400, "run_id is required")
return
send_stop_run(run_id)
send_json(200, {"status": "sent"}, self)
return
if self.path == "/action":
run_id = str(payload.get("run_id", "")).strip()
action_command = str(payload.get("command", "")).strip()
if not run_id:
self.send_error(400, "run_id is required")
return
if not action_command:
self.send_error(400, "command is required")
return
send_run_action(run_id, action_command)
send_json(200, {"status": "sent"}, self)
return
if self.path == "/auto-mode":
send_auto_mode(bool(payload.get("auto_mode")))
send_json(200, {"status": "sent"}, self)
self.send_error(400, "Invalid JSON")
return
if self.path == "/config":
send_auto_config(payload)
send_json(200, {"status": "sent"}, self)
config = get_config()
config.update(payload)
save_config(config)
send_json(200, {"status": "ok"}, self)
return
if self.path == "/mode":
mode = payload.get("mode")
if not mode:
self.send_error(400, "mode is required")
return
config = get_config()
config["operator_mode"] = mode
save_config(config)
send_json(200, {"status": "ok"}, self)
return
if self.path == "/action/mutate":
@ -359,17 +255,18 @@ class Handler(BaseHTTPRequestHandler):
self.send_error(500, msg)
return
if not command:
self.send_error(400, "command is required")
return
send_command(command)
send_json(200, {"status": "sent"}, self)
def log_message(self, format, *args):
return
if __name__ == "__main__":
server = ThreadingHTTPServer(("0.0.0.0", 8080), Handler)
# Ensure directories exist
for d in [STATE_DIR, EVENTS_DIR, WORLD_DIR, ACTIONS_DIR, CONFIG_DIR]:
d.mkdir(parents=True, exist_ok=True)
for s in ["pending", "approved", "running", "completed", "failed", "rejected"]:
(ACTIONS_DIR / s).mkdir(parents=True, exist_ok=True)
port = int(os.getenv("PORT", "8080"))
print(f"Operator Control Plane starting on 0.0.0.0:{port}")
server = ThreadingHTTPServer(("0.0.0.0", port), Handler)
server.serve_forever()