# agent-system/orchestrator/main.py
import json
import os
import shlex
import sys
import threading
import time
from uuid import uuid4

from diagnosis import DiagnosisEngine
from events import emit_event, emit_run_progress
from redis_client import get_redis_client
from result_listener import listen_for_results
from task_builder import build_task
def send_task(redis_client, task, runs=None, run_id=None):
    """Push ``task`` onto the Redis ``tasks`` list and announce it.

    When ``run_id`` is truthy it is stamped onto the task. If that run is
    also present in ``runs``, its ``expected`` counter is incremented and a
    progress update is emitted before the task is queued.
    """
    if run_id:
        task["run_id"] = run_id
        tracked_run = runs.get(run_id) if runs is not None and run_id in runs else None
        if tracked_run is not None:
            tracked_run["expected"] += 1
            emit_run_progress(run_id, tracked_run)

    payload = json.dumps(task)
    redis_client.lpush("tasks", payload)

    summary = f"sent {task['action']} to {task['target']}"
    print(f"{summary} ({task['task_id']})")

    announcement = {
        "type": "task",
        "message": summary,
        "run_id": task.get("run_id"),
        "node": task.get("target"),
    }
    emit_event(announcement)
def stop_run(runs, run_id):
    """Stop the run ``run_id`` and remove it from ``runs``.

    Returns ``False`` without side effects when the run is unknown or not
    active. Otherwise the run is flagged as stopped, a ``run_status`` event
    is emitted, the run is dropped from ``runs`` and ``True`` is returned.
    """
    entry = runs.get(run_id)
    if not (entry and entry.get("active")):
        return False

    entry["active"] = False
    entry["status"] = "stopped"

    status_event = {
        "type": "run_status",
        "message": "run status: stopped",
        "run_id": run_id,
        "node": None,
        "status": "stopped",
    }
    emit_event(status_event)

    runs.pop(run_id, None)
    return True
def _emit_action_event(run_id, message, node=None):
    """Publish an ``action`` event carrying ``message`` for ``run_id``."""
    emit_event(
        {
            "type": "action",
            "message": message,
            "run_id": run_id,
            "node": node,
        }
    )


def apply_run_action(redis_client, run_actions, run_id, command):
    """Carry out the proposal ``command`` for the run ``run_id``.

    Supported commands:

    * ``ignore`` - only records that the proposal was ignored.
    * ``check_network`` - queues an ``exec`` task on ``vps`` that pings
      ``1.1.1.1`` once.
    * ``restart_ha`` - queues an ``exec`` task on ``piha`` that runs
      ``docker restart`` on the container named by the run context key
      ``ha_container_name`` (falling back to ``homeassistant``).

    Returns ``True`` for a supported command and ``False`` (after emitting an
    "unknown proposal command" event) for anything else.
    """
    context = run_actions.get(run_id, {})

    if command == "ignore":
        _emit_action_event(run_id, "proposal ignored")
        return True

    if command == "check_network":
        _emit_action_event(
            run_id, "confirmed action: check network connectivity", node="vps"
        )
        send_task(
            redis_client,
            build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"}),
            run_id=run_id,
        )
        return True

    if command == "restart_ha":
        container_name = context.get("ha_container_name") or "homeassistant"
        _emit_action_event(
            run_id, f"confirmed action: restart {container_name}", node="piha"
        )
        # The name comes from run context rather than a constant, so quote it
        # before splicing it into a shell command. shlex.quote() leaves names
        # made only of letters, digits and "_.-" untouched, so ordinary
        # container names produce the same command string as before.
        quoted_name = shlex.quote(str(container_name))
        send_task(
            redis_client,
            build_task("piha", "exec", {"cmd": f"docker restart {quoted_name}"}),
            run_id=run_id,
        )
        return True

    _emit_action_event(run_id, f"unknown proposal command: {command}")
    return False
def emit_auto_config(auto_config, message=None):
    """Emit an ``auto_config`` event mirroring the current settings.

    ``message`` defaults to ``"auto config updated"`` when empty or omitted.
    Settings missing from ``auto_config`` are reported with the fallback
    listed below for each key.
    """
    payload = {
        "type": "auto_config",
        "message": message or "auto config updated",
        "run_id": None,
        "node": None,
    }
    # (key, fallback) pairs in the order they appear in the event. The tuple
    # is rebuilt on every call so the mutable fallbacks are never shared.
    mirrored_settings = (
        ("auto_mode", None),
        ("action_thresholds", {}),
        ("default_threshold", None),
        ("allowed_auto_actions", []),
        ("max_retries_per_action", {}),
        ("retry_window_seconds", None),
    )
    for key, fallback in mirrored_settings:
        payload[key] = auto_config.get(key, fallback)
    emit_event(payload)
def set_auto_config(auto_config, config):
    """Merge the recognised settings from ``config`` into ``auto_config``.

    Recognised keys:

    * ``auto_mode`` - coerced with ``bool``.
    * ``action_thresholds`` - used only when it is a dict; keys are coerced
      with ``str`` and values with ``float``.
    * ``default_threshold`` - coerced with ``float``.
    * ``allowed_auto_actions`` - used only when it is a list; items are
      coerced with ``str``.

    All values are converted before anything is written, so a value that
    cannot be converted raises (``ValueError``/``TypeError`` from ``float``)
    and leaves ``auto_config`` untouched instead of half-updated. On success
    the updated configuration is emitted via ``emit_auto_config``.
    """
    updates = {}

    if "auto_mode" in config:
        updates["auto_mode"] = bool(config["auto_mode"])

    thresholds = config.get("action_thresholds")
    if isinstance(thresholds, dict):
        updates["action_thresholds"] = {
            str(command): float(threshold)
            for command, threshold in thresholds.items()
        }

    if "default_threshold" in config:
        updates["default_threshold"] = float(config["default_threshold"])

    allowed = config.get("allowed_auto_actions")
    if isinstance(allowed, list):
        updates["allowed_auto_actions"] = [str(command) for command in allowed]

    # Every conversion succeeded; apply the whole change set in one step.
    auto_config.update(updates)
    emit_auto_config(auto_config)
def set_auto_mode(auto_config, enabled):
    """Set only the ``auto_mode`` setting by delegating to ``set_auto_config``."""
    mode_only = {"auto_mode": enabled}
    set_auto_config(auto_config, mode_only)
def dispatch(redis_client, command, ha_task_ids, runs):
    """Translate a console/remote ``command`` into queued tasks.

    * ``ha`` - registers a new run in ``runs``, announces it, and queues a
      ``docker_ps`` task for ``piha`` whose task id is mapped to the run in
      ``ha_task_ids``.
    * ``stop`` - stops every run currently marked active.
    * ``all`` - queues ``docker_ps`` for ``piha`` and then ``vps``.
    * anything else - queues ``docker_ps`` for ``piha`` only.
    """
    if command == "stop":
        # Iterate over a snapshot: stop_run() removes entries from ``runs``.
        for run_id, run in tuple(runs.items()):
            if run.get("active"):
                stop_run(runs, run_id)
        return

    if command == "ha":
        run_id = str(uuid4())
        run_state = {
            "expected": 0,
            "received": 0,
            "engine": DiagnosisEngine(),
            "active": True,
            "status": "running",
        }
        runs[run_id] = run_state
        emit_event(
            {
                "type": "run_status",
                "message": "run status: running",
                "run_id": run_id,
                "node": None,
                "status": "running",
            }
        )
        emit_run_progress(run_id, run_state)

        probe = build_task("piha", "docker_ps", {})
        ha_task_ids[probe["task_id"]] = run_id
        send_task(redis_client, probe, runs, run_id)
        return

    nodes = ("piha", "vps") if command == "all" else ("piha",)
    for node in nodes:
        send_task(redis_client, build_task(node, "docker_ps", {}))
def process_input(command, redis_client, ha_task_ids, runs):
    """Echo, log and dispatch one user command; empty commands are ignored."""
    if command:
        print(f"[input] {command}")
        log_entry = {
            "type": "log",
            "message": f"user input: {command}",
            "run_id": None,
            "node": None,
        }
        emit_event(log_entry)
        dispatch(redis_client, command, ha_task_ids, runs)
def _handle_orchestrator_task(
    task, redis_client, ha_task_ids, runs, run_actions, auto_config
):
    """Run the handler matching ``task["action"]``; unknown actions are ignored."""
    action = task.get("action")
    params = task.get("params") or {}

    if action == "stop_run":
        stop_run(runs, str(params.get("run_id", "")).strip())
    elif action == "run_action":
        apply_run_action(
            redis_client,
            run_actions,
            str(params.get("run_id", "")).strip(),
            str(params.get("command", "")).strip(),
        )
    elif action == "set_auto_mode":
        set_auto_mode(auto_config, bool(params.get("auto_mode")))
    elif action == "set_auto_config":
        set_auto_config(auto_config, params.get("config") or {})
    elif action == "input":
        process_input(
            str(params.get("command", "")).strip(), redis_client, ha_task_ids, runs
        )


def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config):
    """Consume tasks addressed to the orchestrator from the ``tasks`` list.

    Loops forever: pops a task, drops payloads that are not a JSON object,
    pushes tasks for other targets back onto the list (then pauses one
    second), and hands orchestrator tasks to the matching handler
    (``stop_run``, ``run_action``, ``set_auto_mode``, ``set_auto_config``,
    ``input``).

    A failure while handling a single task is printed and emitted as a log
    event, so one malformed task does not end the loop. Errors raised by
    ``brpop`` itself are not caught.
    """
    while True:
        _, raw_task = redis_client.brpop("tasks")

        try:
            task = json.loads(raw_task)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            continue

        # Valid JSON that is not an object (list, number, null, ...) has no
        # ``.get``; treat it like any other unusable payload.
        if not isinstance(task, dict):
            continue

        if task.get("target") != "orchestrator":
            # Not ours: put it back for its real consumer and back off.
            redis_client.lpush("tasks", raw_task)
            time.sleep(1)
            continue

        try:
            _handle_orchestrator_task(
                task, redis_client, ha_task_ids, runs, run_actions, auto_config
            )
        except Exception as exc:  # loop boundary: report and keep serving
            print(f"[orchestrator] task failed: {exc}")
            emit_event(
                {
                    "type": "log",
                    "message": f"orchestrator task failed: {exc}",
                    "run_id": None,
                    "node": None,
                }
            )
def services_list(services):
    """Return the tracked services as a list of dicts ordered by name.

    Each entry carries ``name``, ``status``, ``last_check``, ``node``
    (``None`` when the state has no node) and ``history``.
    """
    rows = []
    for name in sorted(services):
        state = services[name]
        row = {"name": name}
        row["status"] = state["status"]
        row["last_check"] = state["last_check"]
        row["node"] = state.get("node")
        row["history"] = state["history"]
        rows.append(row)
    return rows
def update_service_health(services, event):
    """Record a health ``event`` in ``services`` and broadcast the new state.

    Events without a ``service`` name, or whose ``status`` is neither
    ``"ok"`` nor ``"error"``, are ignored. Otherwise the service's status,
    last-check time and node are overwritten, the observation is appended to
    its history (trimmed to the 20 most recent entries) and a
    ``services_state`` event with the full service list is emitted.
    """
    name = event.get("service")
    status = event.get("status")
    if not name or status not in ("ok", "error"):
        return

    node = event.get("node")
    seen_at = event.get("timestamp", time.time())

    fresh_state = {
        "status": status,
        "last_check": seen_at,
        "node": node,
        "history": [],
    }
    state = services.setdefault(name, fresh_state)
    state.update({"status": status, "last_check": seen_at, "node": node})

    history = state["history"]
    history.append({"status": status, "timestamp": seen_at, "node": node})
    state["history"] = history[-20:]

    snapshot = {
        "type": "services_state",
        "message": "services health updated",
        "run_id": None,
        "node": None,
        "services": services_list(services),
    }
    emit_event(snapshot)
def listen_for_external_events(redis_client, services):
    """Relay events from the Redis ``events`` list, tracking health reports.

    Loops forever: pops an event, re-emits it through ``emit_event`` and,
    when its ``type`` is ``"health"``, folds it into ``services``. A payload
    that cannot be decoded, or that decodes to something other than a JSON
    object, is reported as an "invalid external event" log entry and skipped.
    """
    while True:
        _, raw_event = redis_client.brpop("events")

        try:
            event = json.loads(raw_event)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            event = None

        # Only JSON objects are usable below (``event.get``); lists, numbers
        # or null would otherwise raise and end this loop.
        if not isinstance(event, dict):
            emit_event(
                {
                    "type": "log",
                    "message": f"invalid external event: {raw_event}",
                    "run_id": None,
                    "node": None,
                }
            )
            continue

        emit_event(event)
        if event.get("type") == "health":
            update_service_health(services, event)
def main():
    """Start the orchestrator: background listeners plus an interactive prompt.

    Ensures the event log file exists, creates the shared state, starts the
    result, input-task and external-event listeners as daemon threads, and
    then reads commands from stdin until stdin is closed on a non-TTY.
    """
    # Exclusive create ("x") makes the file only when it is missing and can
    # never truncate a log that appeared between a check and the open.
    try:
        with open("/tmp/agent-events.log", "x"):
            pass
    except FileExistsError:
        pass

    redis_client = get_redis_client()
    ha_task_ids = {}
    runs = {}
    last_actions = {}
    run_actions = {}
    services = {}
    auto_config = {
        "auto_mode": True,
        "action_thresholds": {
            "restart_ha": 0.8,
            "check_network": 0.9,
        },
        "default_threshold": 0.9,
        "allowed_auto_actions": ["restart_ha"],
        "max_retries_per_action": {
            "restart_ha": 3,
        },
        "retry_window_seconds": 300,
        "action_history": {},
    }

    # Daemon threads: they end with the process when main() returns.
    workers = (
        (
            listen_for_results,
            (redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config),
        ),
        (
            listen_for_input_tasks,
            (redis_client, ha_task_ids, runs, run_actions, auto_config),
        ),
        (listen_for_external_events, (redis_client, services)),
    )
    for target, args in workers:
        threading.Thread(target=target, args=args, daemon=True).start()

    print("[orchestrator] ready")
    emit_event(
        {
            "type": "log",
            "message": "orchestrator ready",
            "run_id": None,
            "node": None,
        }
    )
    emit_auto_config(auto_config, "auto mode: on")

    while True:
        try:
            command = input("> ").strip()
        except EOFError:
            if not sys.stdin.isatty():
                # Non-interactive stdin will not come back: shut down.
                print("[orchestrator] stdin closed, exiting")
                emit_event(
                    {
                        "type": "log",
                        "message": "stdin closed, exiting",
                        "run_id": None,
                        "node": None,
                    }
                )
                return

            # On a terminal, EOF is recoverable: pause and prompt again.
            print("[orchestrator] stdin closed, waiting...")
            emit_event(
                {
                    "type": "log",
                    "message": "stdin closed, waiting",
                    "run_id": None,
                    "node": None,
                }
            )
            time.sleep(2)
            continue
        except KeyboardInterrupt:
            # Ctrl-C only abandons the current prompt line.
            print()
            continue

        process_input(command, redis_client, ha_task_ids, runs)
# Start the orchestrator only when this file is executed as a script.
if __name__ == "__main__":
    main()