# homelab-codex-ws/services/control-plane/src/executor.py
#
# File-viewer listing at time of capture: 103 lines, 3.4 KiB, Python.

import os
import json
import time
import logging
import subprocess
from pathlib import Path
# Filesystem layout; both roots can be overridden through the environment.
RUNTIME_PATH = os.environ.get("RUNTIME_PATH", "/opt/homelab")
ACTIONS_DIR = Path(RUNTIME_PATH, "actions")
REPO_ROOT = Path(os.environ.get("REPO_ROOT", "/repo"))

# Root logging configuration plus the named logger used throughout this module.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("executor")
class Executor:
    """Poll the on-disk action queue and run approved actions.

    Every action is a JSON file below the module-level ``ACTIONS_DIR``. A file
    is picked up from ``approved/``, rewritten into ``running/`` while its
    command executes, and finally rewritten into ``completed/`` or ``failed/``.
    ``rejected/`` is created alongside the others but never written here.
    """

    # State sub-directories created under ACTIONS_DIR on start-up.
    _STATE_DIRS = ("approved", "running", "completed", "failed", "rejected")

    # Action fields passed, in this order, as arguments to deploy-node.sh.
    _REDEPLOY_FIELDS = ("node", "service")

    def __init__(self, command_timeout=None):
        """Create the state directories.

        Args:
            command_timeout: Optional limit in seconds, forwarded to
                ``subprocess.run`` as ``timeout`` for each deploy command.
                ``None`` (the default) waits for the command indefinitely.
        """
        self.command_timeout = command_timeout
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create every state directory under ``ACTIONS_DIR`` if missing."""
        for state in self._STATE_DIRS:
            (ACTIONS_DIR / state).mkdir(parents=True, exist_ok=True)

    def process_actions(self):
        """Execute every ``*.json`` file in ``approved/`` in sorted name order."""
        approved_dir = ACTIONS_DIR / "approved"
        for action_file in sorted(approved_dir.glob("*.json")):
            self._execute_action(action_file)

    def _execute_action(self, action_file):
        """Run one action file through running -> completed/failed.

        Args:
            action_file: ``Path`` of a JSON file in ``approved/``; its stem is
                used as the action id and as the file name in later states.

        Never raises for ordinary errors: failures are logged and, where the
        action could be claimed, recorded in the ``failed/`` copy.
        """
        action_id = action_file.stem
        logger.info("Executing action: %s", action_id)

        running_path = ACTIONS_DIR / "running" / f"{action_id}.json"
        data = self._claim_action(action_file, running_path, action_id)
        if data is None:
            return

        try:
            success, error_msg = self._run_action(data)
        except Exception as e:
            # Includes a missing script, a timeout, or any other launch error.
            success, error_msg = False, str(e)

        self._finalize_action(data, running_path, action_id, success, error_msg)

    def _claim_action(self, action_file, running_path, action_id):
        """Copy the action into ``running/`` and remove it from ``approved/``.

        Returns:
            The loaded action dict (with ``status`` and ``started_at`` set),
            or ``None`` if the file could not be read, updated or moved. On
            failure the approved file is left in place, so it is picked up
            again on the next pass (it may simply not be fully written yet).
        """
        try:
            with open(action_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Raises TypeError for non-object JSON, which is handled below.
            data["status"] = "running"
            data["started_at"] = time.time()
            with open(running_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            action_file.unlink()
        except Exception as e:
            logger.error("Failed to move %s to running: %s", action_id, e)
            return None
        return data

    def _run_action(self, data):
        """Execute the command described by *data*.

        Only the ``redeploy`` type is supported; it runs
        ``scripts/deploy/deploy-node.sh <node> <service>`` from ``REPO_ROOT``.

        Returns:
            A ``(success, error_msg)`` pair; ``error_msg`` is ``""`` on
            success.

        Raises:
            OSError, subprocess.TimeoutExpired: Propagated from
                ``subprocess.run``; the caller records them as a failure.
        """
        action_type = data.get("type")
        if action_type != "redeploy":
            return False, f"Unknown action type: {action_type}"

        # Validate up front: a non-string argument can never be executed, and
        # the resulting TypeError would not say which field was at fault.
        args = []
        for field in self._REDEPLOY_FIELDS:
            value = data.get(field)
            if value is None:
                return False, (
                    f"Missing required field '{field}' for {action_type} action"
                )
            if not isinstance(value, str):
                return False, (
                    f"Field '{field}' must be a string, "
                    f"got {type(value).__name__}"
                )
            args.append(value)

        cmd = [str(REPO_ROOT / "scripts" / "deploy" / "deploy-node.sh"), *args]
        logger.info("Running command: %s", " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=str(REPO_ROOT),
            timeout=self.command_timeout,
        )
        if result.returncode == 0:
            return True, ""
        # Fall back to the exit status so a silent failure is still explained.
        return False, (
            result.stderr
            or result.stdout
            or f"Command exited with status {result.returncode}"
        )

    def _finalize_action(self, data, running_path, action_id, success, error_msg):
        """Write the outcome to ``completed/`` or ``failed/`` and drop the running copy."""
        target_status = "completed" if success else "failed"
        target_path = ACTIONS_DIR / target_status / f"{action_id}.json"
        try:
            data["status"] = target_status
            data["finished_at"] = time.time()
            if not success:
                data["error"] = error_msg
            with open(target_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            running_path.unlink()
            logger.info("Action %s %s", action_id, target_status)
        except Exception as e:
            logger.error(
                "Failed to move %s to %s: %s", action_id, target_status, e
            )

    def loop(self, interval=10):
        """Process approved actions forever, sleeping *interval* seconds between passes."""
        logger.info("Starting executor loop")
        while True:
            self.process_actions()
            time.sleep(interval)
if __name__ == "__main__":
    # Script entry point: build the executor and poll with the default interval.
    Executor().loop()