agent-system/scripts/executor/executor.py

226 lines
7.5 KiB
Python
Raw Normal View History

2026-05-12 18:01:37 +02:00
#!/usr/bin/env python3
import os
import json
import time
import sys
import shutil
import uuid
from pathlib import Path
# Configuration
# Root directory of the action queue; the HOMELAB_ACTIONS_ROOT environment
# variable overrides the default location.
ACTIONS_ROOT = Path(os.environ.get("HOMELAB_ACTIONS_ROOT", "/opt/homelab/actions"))
# File that emit_event() appends one JSON event per line to.
EVENT_LOG = Path("/tmp") / "agent-events.log"
# File that log_history() appends one JSON entry per line to.
HISTORY_LOG = ACTIONS_ROOT.joinpath("history.log")
def emit_event(event_type, message, details=None):
    """Print an action lifecycle event as JSON and append it to the event log.

    The event object holds ``type``, ``message``, a ``time.time()``
    ``timestamp`` and ``details`` (an empty dict when *details* is falsy).
    The JSON line is always printed to stdout. Appending it to ``EVENT_LOG``
    is best-effort: any failure there is reported on stderr and otherwise
    ignored.
    """
    payload = dict(
        type=event_type,
        message=message,
        timestamp=time.time(),
        details=details if details else {},
    )
    serialized = json.dumps(payload)
    print(serialized)

    try:
        with open(EVENT_LOG, "a") as log_file:
            log_file.write(f"{serialized}\n")
            log_file.flush()
    except Exception as exc:
        # Logging must never break the caller; report the problem and move on.
        print(f"Error writing to event log: {exc}", file=sys.stderr)
def log_history(action_id, status, message):
    """Append one JSON line describing an action's status to the history log.

    Each entry records a ``time.time()`` ``timestamp`` together with the
    given ``action_id``, ``status`` and ``message``. Failures while
    serializing or writing are reported on stderr and otherwise ignored.
    """
    record = {"timestamp": time.time()}
    record.update(action_id=action_id, status=status, message=message)

    try:
        with open(HISTORY_LOG, "a") as history_file:
            history_file.write("".join((json.dumps(record), "\n")))
            history_file.flush()
    except Exception as exc:
        # History is best-effort; never let it interrupt action handling.
        print(f"Error writing history: {exc}", file=sys.stderr)
def ensure_dirs():
    """Create every action state directory under ACTIONS_ROOT if missing."""
    state_names = ("pending", "approved", "running", "completed", "failed", "rejected")
    for state in state_names:
        state_dir = ACTIONS_ROOT / state
        # parents=True also creates ACTIONS_ROOT itself on first use.
        state_dir.mkdir(parents=True, exist_ok=True)
def _transition_pending_action(action_id, status, history_message, gerund):
    """Move a pending action file into the ``status`` state directory.

    Shared implementation of approve_action() and reject_action().

    Args:
        action_id: Action identifier, with or without a ``.json`` suffix.
            Only the final path component is used, so the lookup always stays
            inside ``pending/`` even when a path is passed.
        status: Target state; also the name of the destination directory
            (``"approved"`` or ``"rejected"``).
        history_message: Message recorded in the history log.
        gerund: Verb form used in the error message
            (``"approving"`` / ``"rejecting"``).

    Returns:
        True when the file was updated and moved, False when it was not found
        in ``pending/`` or could not be read, rewritten or moved.
    """
    ensure_dirs()

    filename = action_id if action_id.endswith(".json") else f"{action_id}.json"
    # Joining an absolute path (or one with directories) onto pending/ would
    # escape that directory, and an absolute path to the pending file itself
    # would make source and destination identical. Keep the basename only.
    filename = Path(filename).name

    pending_path = ACTIONS_ROOT / "pending" / filename
    if not pending_path.exists():
        print(f"Action {filename} not found in pending.")
        return False

    target_path = ACTIONS_ROOT / status / filename
    try:
        with open(pending_path, "r") as f:
            action = json.load(f)
        if not isinstance(action, dict):
            raise ValueError("action file must contain a JSON object")
        action["status"] = status
        action[f"{status}_at"] = time.time()
        with open(pending_path, "w") as f:
            json.dump(action, f, indent=2)
        shutil.move(pending_path, target_path)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses;
        # shutil.Error is an OSError subclass.
        print(f"Error {gerund} action: {e}")
        return False

    # The move has already happened at this point, so a file lacking
    # "action_id" must not turn the result into a failure: fall back to the
    # filename stem for reporting.
    resolved_id = action.get("action_id", Path(filename).stem)
    emit_event(f"action_{status}", f"Action {status}: {resolved_id}", {"action_id": resolved_id})
    log_history(resolved_id, status, history_message)
    print(f"Action {resolved_id} {status}.")
    return True


def approve_action(action_id):
    """Approve a pending action: mark it ``approved`` and move it to ``approved/``.

    Args:
        action_id: Action identifier, with or without a ``.json`` suffix.

    Returns:
        True on success, False if the action is not pending or the
        transition failed.
    """
    return _transition_pending_action(
        action_id, "approved", "Manual approval received", "approving"
    )


def reject_action(action_id):
    """Reject a pending action: mark it ``rejected`` and move it to ``rejected/``.

    Args:
        action_id: Action identifier, with or without a ``.json`` suffix.

    Returns:
        True on success, False if the action is not pending or the
        transition failed.
    """
    return _transition_pending_action(
        action_id, "rejected", "Manual rejection received", "rejecting"
    )
def _simulate_action(action, dry_run):
    """Run the (simulated) work for *action* and return True on success."""
    action_type = action["action_type"]
    if dry_run:
        print(f"[DRY-RUN] Would execute {action_type} logic here.")
    else:
        # Initial action types implementation (Simulation)
        if action_type == "redeploy_service":
            print(f"DEBUG: Triggering container restart/redeploy for {action.get('service')}")
        elif action_type == "rerun_healthcheck":
            print(f"DEBUG: Running healthcheck for {action.get('service')}")
        elif action_type == "rerun_deployment_stage":
            print(f"DEBUG: Retrying deployment stage for {action.get('service')}")
        elif action_type == "collect_diagnostics":
            print(f"DEBUG: Collecting logs and metrics for {action.get('service') or action.get('node')}")
        else:
            print(f"DEBUG: Executing unknown action type: {action_type}")
    # The simulation has no failure path yet, so every action succeeds.
    return True


def process_action(action_path, dry_run=False):
    """Process a single approved (or resumed) action file.

    The file is moved to ``running/`` and stamped with ``status`` and
    ``started_at``, the simulated work is performed, and the file is then
    stamped with the final status and ``finished_at`` and moved to
    ``completed/`` or ``failed/``. Lifecycle events and history entries are
    emitted at start and finish.

    Args:
        action_path: ``pathlib.Path`` of the action JSON file.
        dry_run: When True, only print what would be executed.

    Returns:
        None. Files that cannot be read, or that are not a JSON object with
        both ``action_id`` and ``action_type``, are reported and left in
        place so one bad file cannot abort the whole executor run.
    """
    try:
        with open(action_path, "r") as f:
            action = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Error reading action {action_path}: {e}")
        return

    # Validate before touching any state so a malformed file is skipped
    # instead of raising KeyError/TypeError out of the executor loop.
    if not isinstance(action, dict):
        print(f"Error reading action {action_path}: expected a JSON object")
        return
    missing = [key for key in ("action_id", "action_type") if key not in action]
    if missing:
        print(f"Error reading action {action_path}: missing {', '.join(missing)}")
        return

    action_id = action["action_id"]
    action_type = action["action_type"]

    # Move to running (Resumable execution state)
    running_path = ACTIONS_ROOT / "running" / action_path.name
    if action_path != running_path:
        # A resumed action already lives in running/; skip the self-move.
        shutil.move(action_path, running_path)

    action["status"] = "running"
    action["started_at"] = time.time()
    with open(running_path, "w") as f:
        json.dump(action, f, indent=2)

    emit_event("action_started", f"Started action {action_id} ({action_type})", {"action_id": action_id})
    log_history(action_id, "running", f"Execution started (dry_run={dry_run})")

    # Simulation logic (Recommendation-safe execution model)
    print(f"Executing {action_type} for {action.get('service') or action.get('node')}...")

    # Idempotent simulation: in a real world, we'd check if it's already done
    time.sleep(0.5)

    # NOTE(review): a dry run still finalizes the action as "completed" and
    # moves it out of the queue - confirm this is the intended behavior.
    success = _simulate_action(action, dry_run)

    # Finalize: the state directory is named after the final status.
    final_status = "completed" if success else "failed"
    final_path = ACTIONS_ROOT / final_status / action_path.name

    action["status"] = final_status
    action["finished_at"] = time.time()
    with open(running_path, "w") as f:
        json.dump(action, f, indent=2)
    shutil.move(running_path, final_path)

    emit_event(f"action_{final_status}", f"Action {action_id} {final_status}", {"action_id": action_id})
    log_history(action_id, final_status, "Execution finished")
def run_executor(dry_run=False):
    """Run one executor pass: resume running actions, then process approved ones.

    Args:
        dry_run: Forwarded to process_action() for every action handled.
    """
    ensure_dirs()
    print(f"--- Executor Run: {time.ctime()} (dry_run={dry_run}) ---")

    # 1. Resume running actions
    running_dir = ACTIONS_ROOT / "running"
    # Snapshot the listing first: process_action() moves files while we loop.
    for resumed_file in list(running_dir.glob("*.json")):
        print(f"Resuming action: {resumed_file.name}")
        process_action(resumed_file, dry_run=dry_run)

    # 2. Process approved actions
    approved_dir = ACTIONS_ROOT / "approved"
    queued_files = list(approved_dir.glob("*.json"))
    if queued_files:
        for queued_file in queued_files:
            process_action(queued_file, dry_run=dry_run)
    else:
        print("No approved actions found.")

    print("Run complete.")
def main(argv=None):
    """Parse the command line and dispatch to run / approve / reject.

    Args:
        argv: Argument list to parse; ``None`` makes argparse use
            ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when ``approve``/``reject`` is
        called without an action_id or when the approval/rejection itself
        reports failure.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Homelab Action Executor")
    parser.add_argument("command", choices=["run", "approve", "reject"], nargs="?", default="run")
    parser.add_argument("action_id", nargs="?")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args(argv)

    if args.command == "run":
        run_executor(dry_run=args.dry_run)
        return 0

    # Remaining commands ("approve" / "reject") both need an action id.
    if not args.action_id:
        print(f"Error: action_id required for {args.command}")
        return 1

    handler = approve_action if args.command == "approve" else reject_action
    # Propagate the handler's result so scripts can detect a failed transition.
    return 0 if handler(args.action_id) else 1


if __name__ == "__main__":
    sys.exit(main())