"""Orchestrator entry point.

Pushes tasks onto the Redis ``tasks`` list, consumes orchestrator-targeted
control tasks and external events from Redis on background threads, and reads
interactive commands from stdin.
"""

import json
import os
import shlex
import sys
import threading
import time
from uuid import uuid4

from diagnosis import DiagnosisEngine
from events import emit_event, emit_run_progress
from redis_client import get_redis_client
from result_listener import listen_for_results
from task_builder import build_task

# File that main() makes sure exists before the listeners start.
EVENT_LOG_PATH = "/tmp/agent-events.log"


def _log_event(message):
    """Emit a ``log`` event that is not tied to any run or node."""
    emit_event(
        {
            "type": "log",
            "message": message,
            "run_id": None,
            "node": None,
        }
    )


def send_task(redis_client, task, runs=None, run_id=None):
    """Queue *task* on the Redis ``tasks`` list and announce it.

    Args:
        redis_client: Client exposing ``lpush``.
        task: Task dict; must contain ``action``, ``target`` and ``task_id``.
            It is mutated in place when *run_id* is given.
        runs: Optional mapping of run id -> run record. When the run is
            known, its ``expected`` counter is incremented and a progress
            event is emitted.
        run_id: Optional run the task belongs to.
    """
    if run_id:
        task["run_id"] = run_id
        if runs is not None and run_id in runs:
            runs[run_id]["expected"] += 1
            emit_run_progress(run_id, runs[run_id])

    redis_client.lpush("tasks", json.dumps(task))
    print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
    emit_event(
        {
            "type": "task",
            "message": f"sent {task['action']} to {task['target']}",
            "run_id": task.get("run_id"),
            "node": task.get("target"),
        }
    )


def stop_run(runs, run_id):
    """Mark an active run as stopped, announce it, and forget it.

    Returns:
        ``True`` if the run existed and was active, ``False`` otherwise.
    """
    run = runs.get(run_id)
    if not run or not run.get("active"):
        return False

    # Update the record before dropping it: other holders of the same dict
    # object still observe the final state.
    run["active"] = False
    run["status"] = "stopped"
    emit_event(
        {
            "type": "run_status",
            "message": "run status: stopped",
            "run_id": run_id,
            "node": None,
            "status": "stopped",
        }
    )
    runs.pop(run_id, None)
    return True


def apply_run_action(redis_client, run_actions, run_id, command):
    """Execute a confirmed proposal *command* for *run_id*.

    Supported commands are ``ignore``, ``check_network`` and ``restart_ha``.

    Args:
        redis_client: Client used to queue follow-up tasks.
        run_actions: Mapping of run id -> action context dict. Only the
            ``ha_container_name`` key is read here.
        run_id: Run the action belongs to.
        command: Proposal command name.

    Returns:
        ``True`` when the command was recognised, ``False`` otherwise.
    """
    context = run_actions.get(run_id, {})

    if command == "ignore":
        emit_event(
            {
                "type": "action",
                "message": "proposal ignored",
                "run_id": run_id,
                "node": None,
            }
        )
        return True

    if command == "check_network":
        emit_event(
            {
                "type": "action",
                "message": "confirmed action: check network connectivity",
                "run_id": run_id,
                "node": "vps",
            }
        )
        send_task(
            redis_client,
            build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"}),
            run_id=run_id,
        )
        return True

    if command == "restart_ha":
        container_name = context.get("ha_container_name") or "homeassistant"
        emit_event(
            {
                "type": "action",
                "message": f"confirmed action: restart {container_name}",
                "run_id": run_id,
                "node": "piha",
            }
        )
        # The name comes from run context rather than a constant, so quote it
        # before splicing it into a command line. shlex.quote() returns
        # ordinary container names unchanged.
        safe_name = shlex.quote(str(container_name))
        send_task(
            redis_client,
            build_task("piha", "exec", {"cmd": f"docker restart {safe_name}"}),
            run_id=run_id,
        )
        return True

    emit_event(
        {
            "type": "action",
            "message": f"unknown proposal command: {command}",
            "run_id": run_id,
            "node": None,
        }
    )
    return False


def emit_auto_config(auto_config, message=None):
    """Emit an ``auto_config`` event describing the current settings.

    Args:
        auto_config: Settings dict; missing keys fall back to ``None`` or an
            empty container in the event.
        message: Optional event message; defaults to ``"auto config updated"``.
    """
    emit_event(
        {
            "type": "auto_config",
            "message": message or "auto config updated",
            "run_id": None,
            "node": None,
            "auto_mode": auto_config.get("auto_mode"),
            "action_thresholds": auto_config.get("action_thresholds", {}),
            "default_threshold": auto_config.get("default_threshold"),
            "allowed_auto_actions": auto_config.get("allowed_auto_actions", []),
            "max_retries_per_action": auto_config.get("max_retries_per_action", {}),
            "retry_window_seconds": auto_config.get("retry_window_seconds"),
        }
    )


def set_auto_config(auto_config, config):
    """Apply the recognised keys of *config* to *auto_config* and announce it.

    Recognised keys: ``auto_mode``, ``action_thresholds`` (dict),
    ``default_threshold`` and ``allowed_auto_actions`` (list). Values of the
    wrong container type for ``action_thresholds`` / ``allowed_auto_actions``
    are ignored.

    The update is all-or-nothing: every value is converted before
    *auto_config* is touched, so a bad threshold cannot leave a half-applied
    configuration behind.

    Raises:
        TypeError: *config* is not a dict, or a threshold has a type that
            ``float()`` rejects.
        ValueError: A threshold is not a valid number.
    """
    # NOTE(review): max_retries_per_action and retry_window_seconds are
    # reported by emit_auto_config() but cannot be changed here - confirm
    # whether that is intentional.
    if not isinstance(config, dict):
        raise TypeError(f"auto config must be a dict, got {type(config).__name__}")

    updates = {}
    if "auto_mode" in config:
        updates["auto_mode"] = bool(config["auto_mode"])

    thresholds = config.get("action_thresholds")
    if isinstance(thresholds, dict):
        updates["action_thresholds"] = {
            str(command): float(threshold)
            for command, threshold in thresholds.items()
        }

    if "default_threshold" in config:
        updates["default_threshold"] = float(config["default_threshold"])

    allowed = config.get("allowed_auto_actions")
    if isinstance(allowed, list):
        updates["allowed_auto_actions"] = [str(command) for command in allowed]

    # Only reached when every conversion above succeeded.
    auto_config.update(updates)
    emit_auto_config(auto_config)


def set_auto_mode(auto_config, enabled):
    """Switch auto mode on or off and announce the resulting configuration."""
    set_auto_config(auto_config, {"auto_mode": enabled})


def dispatch(redis_client, command, ha_task_ids, runs):
    """Translate a user *command* into tasks or run-state changes.

    * ``ha``   - create a new run record and queue a ``docker_ps`` on ``piha``.
    * ``stop`` - stop every active run.
    * ``all``  - queue ``docker_ps`` on both ``piha`` and ``vps``.
    * anything else - queue ``docker_ps`` on ``piha``.

    Args:
        redis_client: Client used to queue tasks.
        command: Already-stripped command string.
        ha_task_ids: Mapping of task id -> run id, updated for ``ha`` runs.
        runs: Mapping of run id -> run record, updated in place.
    """
    if command == "ha":
        run_id = str(uuid4())
        runs[run_id] = {
            "expected": 0,
            "received": 0,
            "engine": DiagnosisEngine(),
            "active": True,
            "status": "running",
        }
        emit_event(
            {
                "type": "run_status",
                "message": "run status: running",
                "run_id": run_id,
                "node": None,
                "status": "running",
            }
        )
        emit_run_progress(run_id, runs[run_id])
        task = build_task("piha", "docker_ps", {})
        ha_task_ids[task["task_id"]] = run_id
        send_task(redis_client, task, runs, run_id)
    elif command == "stop":
        # Iterate over a snapshot: stop_run() removes entries from ``runs``.
        for run_id, run in list(runs.items()):
            if not run.get("active"):
                continue
            stop_run(runs, run_id)
    elif command == "all":
        send_task(redis_client, build_task("piha", "docker_ps", {}))
        send_task(redis_client, build_task("vps", "docker_ps", {}))
    else:
        # Any other non-empty input falls back to a plain piha docker_ps.
        send_task(redis_client, build_task("piha", "docker_ps", {}))


def process_input(command, redis_client, ha_task_ids, runs):
    """Log and dispatch one user command; empty commands are ignored."""
    if not command:
        return
    print(f"[input] {command}")
    _log_event(f"user input: {command}")
    dispatch(redis_client, command, ha_task_ids, runs)


def _handle_orchestrator_task(
    task, redis_client, ha_task_ids, runs, run_actions, auto_config
):
    """Run one control task addressed to the orchestrator.

    Unknown actions are ignored. A ``params`` value that is not a dict is
    treated as empty.
    """
    action = task.get("action")
    params = task.get("params")
    if not isinstance(params, dict):
        params = {}

    if action == "stop_run":
        stop_run(runs, str(params.get("run_id", "")).strip())
    elif action == "run_action":
        apply_run_action(
            redis_client,
            run_actions,
            str(params.get("run_id", "")).strip(),
            str(params.get("command", "")).strip(),
        )
    elif action == "set_auto_mode":
        set_auto_mode(auto_config, bool(params.get("auto_mode")))
    elif action == "set_auto_config":
        try:
            set_auto_config(auto_config, params.get("config") or {})
        except (TypeError, ValueError) as exc:
            # A malformed remote request must not kill the listener thread.
            _log_event(f"invalid auto config rejected: {exc}")
    elif action == "input":
        process_input(
            str(params.get("command", "")).strip(),
            redis_client,
            ha_task_ids,
            runs,
        )


def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config):
    """Consume the ``tasks`` list forever, handling orchestrator-targeted tasks.

    Tasks whose ``target`` is not ``"orchestrator"`` are pushed back onto the
    list, followed by a one-second pause. Payloads that are not valid JSON
    objects are dropped.
    """
    while True:
        _, raw_task = redis_client.brpop("tasks")
        try:
            task = json.loads(raw_task)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes.
            continue
        if not isinstance(task, dict):
            # Valid JSON that is not an object cannot carry a target/action.
            continue

        if task.get("target") != "orchestrator":
            redis_client.lpush("tasks", raw_task)
            time.sleep(1)
            continue

        _handle_orchestrator_task(
            task, redis_client, ha_task_ids, runs, run_actions, auto_config
        )


def services_list(services):
    """Return the service states as a list of dicts sorted by service name."""
    return [
        {
            "name": name,
            "status": state["status"],
            "last_check": state["last_check"],
            "node": state.get("node"),
            "history": state["history"],
        }
        for name, state in sorted(services.items())
    ]


def update_service_health(services, event):
    """Record a health *event* in *services* and emit the new overall state.

    Events without a ``service`` or with a ``status`` other than ``"ok"`` /
    ``"error"`` are ignored. Each service keeps its 20 most recent history
    entries.
    """
    service_name = event.get("service")
    status = event.get("status")
    node = event.get("node")
    timestamp = event.get("timestamp", time.time())
    if not service_name or status not in ("ok", "error"):
        return

    state = services.setdefault(
        service_name,
        {
            "status": status,
            "last_check": timestamp,
            "node": node,
            "history": [],
        },
    )
    state["status"] = status
    state["last_check"] = timestamp
    state["node"] = node
    state["history"].append({"status": status, "timestamp": timestamp, "node": node})
    state["history"] = state["history"][-20:]
    emit_event(
        {
            "type": "services_state",
            "message": "services health updated",
            "run_id": None,
            "node": None,
            "services": services_list(services),
        }
    )


def listen_for_external_events(redis_client, services):
    """Consume the ``events`` list forever and re-emit each event.

    Events of type ``health`` additionally update *services*. Payloads that
    are not valid JSON objects are reported as a log event and skipped.
    """
    while True:
        _, raw_event = redis_client.brpop("events")
        try:
            event = json.loads(raw_event)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes.
            event = None

        if not isinstance(event, dict):
            _log_event(f"invalid external event: {raw_event}")
            continue

        emit_event(event)
        if event.get("type") == "health":
            update_service_health(services, event)


def main():
    """Start the listener threads and run the interactive command loop."""
    if not os.path.exists(EVENT_LOG_PATH):
        # Append mode creates the file when it is missing but never truncates
        # it, so a file created by another process between the check and the
        # open keeps its contents.
        with open(EVENT_LOG_PATH, "a"):
            pass

    redis_client = get_redis_client()
    ha_task_ids = {}
    runs = {}
    last_actions = {}
    run_actions = {}
    services = {}
    auto_config = {
        "auto_mode": True,
        "action_thresholds": {
            "restart_ha": 0.8,
            "check_network": 0.9,
        },
        "default_threshold": 0.9,
        "allowed_auto_actions": ["restart_ha"],
        "max_retries_per_action": {
            "restart_ha": 3,
        },
        "retry_window_seconds": 300,
        "action_history": {},
    }

    listener = threading.Thread(
        target=listen_for_results,
        args=(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config),
        daemon=True,
    )
    listener.start()

    input_listener = threading.Thread(
        target=listen_for_input_tasks,
        args=(redis_client, ha_task_ids, runs, run_actions, auto_config),
        daemon=True,
    )
    input_listener.start()

    event_listener = threading.Thread(
        target=listen_for_external_events,
        args=(redis_client, services),
        daemon=True,
    )
    event_listener.start()

    print("[orchestrator] ready")
    _log_event("orchestrator ready")
    emit_auto_config(auto_config, "auto mode: on")

    while True:
        try:
            command = input("> ").strip()
        except EOFError:
            if not sys.stdin.isatty():
                # Non-interactive stdin is gone for good: shut down.
                print("[orchestrator] stdin closed, exiting")
                _log_event("stdin closed, exiting")
                return
            print("[orchestrator] stdin closed, waiting...")
            _log_event("stdin closed, waiting")
            time.sleep(2)
            continue
        except KeyboardInterrupt:
            # Ctrl-C only aborts the current prompt line.
            print()
            continue
        process_input(command, redis_client, ha_task_ids, runs)


if __name__ == "__main__":
    main()