409 lines
11 KiB
Python
409 lines
11 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
from uuid import uuid4
|
|
|
|
from diagnosis import DiagnosisEngine
|
|
from events import emit_event, emit_run_progress
|
|
from redis_client import get_redis_client
|
|
from result_listener import listen_for_results
|
|
from task_builder import build_task
|
|
|
|
|
|
def send_task(redis_client, task, runs=None, run_id=None):
    """Push ``task`` onto the Redis ``tasks`` list and announce it.

    Args:
        redis_client: Client object exposing ``lpush``.
        task: Task dict; must contain ``action``, ``target`` and ``task_id``.
            It is mutated in place when ``run_id`` is given.
        runs: Optional mapping of run id to run-state dict. When it holds
            ``run_id`` the run's ``expected`` counter is incremented and a
            progress update is emitted.
        run_id: Optional run identifier to stamp onto the task.
    """
    if run_id:
        task["run_id"] = run_id
        tracked = runs is not None and run_id in runs
        if tracked:
            run_state = runs[run_id]
            run_state["expected"] += 1
            emit_run_progress(run_id, run_state)

    payload = json.dumps(task)
    redis_client.lpush("tasks", payload)

    action, target = task["action"], task["target"]
    print(f"sent {action} to {target} ({task['task_id']})")

    notice = {
        "type": "task",
        "message": f"sent {action} to {target}",
        "run_id": task.get("run_id"),
        "node": task.get("target"),
    }
    emit_event(notice)
|
|
|
|
|
|
def stop_run(runs, run_id):
    """Mark an active run as stopped, announce it and forget it.

    Args:
        runs: Mapping of run id to run-state dict; the entry is removed on
            success.
        run_id: Identifier of the run to stop.

    Returns:
        ``True`` when an active run was stopped, ``False`` when the run is
        unknown or not active.
    """
    run = runs.get(run_id)
    is_active = bool(run) and bool(run.get("active"))
    if not is_active:
        return False

    run.update(active=False, status="stopped")

    status_event = {
        "type": "run_status",
        "message": "run status: stopped",
        "run_id": run_id,
        "node": None,
        "status": "stopped",
    }
    emit_event(status_event)

    runs.pop(run_id, None)
    return True
|
|
|
|
|
|
def apply_run_action(redis_client, run_actions, run_id, command):
    """Carry out a proposal command for a run.

    Supported commands:

    * ``"ignore"`` - only emits a "proposal ignored" event.
    * ``"check_network"`` - sends an ``exec`` task to node ``vps`` that pings
      ``1.1.1.1`` once.
    * ``"restart_ha"`` - sends an ``exec`` task to node ``piha`` that restarts
      a Docker container. The name comes from
      ``run_actions[run_id]["ha_container_name"]`` and falls back to
      ``"homeassistant"``.

    Follow-up tasks are sent without a ``runs`` mapping, so they do not change
    any run's ``expected`` counter.

    Args:
        redis_client: Redis client handed through to ``send_task``.
        run_actions: Mapping of run id to a context dict for that run.
        run_id: Identifier of the run the command belongs to.
        command: Name of the proposal command to apply.

    Returns:
        ``True`` when the command was recognised, ``False`` otherwise (an
        "unknown proposal command" event is emitted in that case).
    """
    # Function-scope import so this block does not depend on the module's
    # import section.
    import shlex

    def announce(message, node=None):
        # Every outcome of this function is reported as an "action" event.
        emit_event(
            {
                "type": "action",
                "message": message,
                "run_id": run_id,
                "node": node,
            }
        )

    # ``or {}`` also covers a run whose stored context is None.
    context = run_actions.get(run_id) or {}

    if command == "ignore":
        announce("proposal ignored")
        return True

    if command == "check_network":
        announce("confirmed action: check network connectivity", node="vps")
        send_task(
            redis_client,
            build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"}),
            run_id=run_id,
        )
        return True

    if command == "restart_ha":
        container_name = context.get("ha_container_name") or "homeassistant"
        announce(f"confirmed action: restart {container_name}", node="piha")
        # The name is supplied from outside this function, so quote it before
        # embedding it in a command string. Plain names such as
        # "homeassistant" come out of shlex.quote unchanged.
        # NOTE(review): assumes the piha agent passes "cmd" to a shell or a
        # shlex-style splitter - confirm against the agent implementation.
        quoted_name = shlex.quote(str(container_name))
        send_task(
            redis_client,
            build_task("piha", "exec", {"cmd": f"docker restart {quoted_name}"}),
            run_id=run_id,
        )
        return True

    announce(f"unknown proposal command: {command}")
    return False
|
|
|
|
|
|
def emit_auto_config(auto_config, message=None):
    """Broadcast the current automatic-action settings as an event.

    Args:
        auto_config: Dict holding the automation settings; missing keys are
            reported with a fallback value.
        message: Optional event text; defaults to ``"auto config updated"``.
    """
    payload = {
        "type": "auto_config",
        "message": message or "auto config updated",
        "run_id": None,
        "node": None,
    }

    # (key, fallback) pairs in the order they appear in the event. The tuple
    # is rebuilt on every call, so the mutable fallbacks are never shared.
    reported_settings = (
        ("auto_mode", None),
        ("action_thresholds", {}),
        ("default_threshold", None),
        ("allowed_auto_actions", []),
        ("max_retries_per_action", {}),
        ("retry_window_seconds", None),
    )
    for key, fallback in reported_settings:
        payload[key] = auto_config.get(key, fallback)

    emit_event(payload)
|
|
|
|
|
|
def set_auto_config(auto_config, config):
    """Merge recognised settings from ``config`` into ``auto_config``.

    Recognised keys:

    * ``auto_mode`` - coerced with ``bool()``.
    * ``action_thresholds`` - a dict; keys become ``str``, values ``float``.
    * ``default_threshold`` - coerced with ``float()``.
    * ``allowed_auto_actions`` - a list; items become ``str``.

    Any other key is ignored, as are ``action_thresholds`` /
    ``allowed_auto_actions`` values of the wrong container type.

    The update is all-or-nothing. Every value is converted before
    ``auto_config`` is touched. If ``config`` is not a dict or a value cannot
    be converted, nothing changes and a ``log`` event describing the problem
    is emitted instead of raising. This function is called from a listener
    loop that does not catch exceptions, so raising here would end that loop.

    Args:
        auto_config: Settings dict, updated in place.
        config: Requested changes.
    """

    def reject(reason):
        emit_event(
            {
                "type": "log",
                "message": f"invalid auto config ignored: {reason}",
                "run_id": None,
                "node": None,
            }
        )

    if not isinstance(config, dict):
        reject(f"expected an object, got {type(config).__name__}")
        return

    updates = {}
    try:
        if "auto_mode" in config:
            updates["auto_mode"] = bool(config["auto_mode"])

        thresholds = config.get("action_thresholds")
        if isinstance(thresholds, dict):
            updates["action_thresholds"] = {
                str(command): float(threshold)
                for command, threshold in thresholds.items()
            }

        if "default_threshold" in config:
            updates["default_threshold"] = float(config["default_threshold"])

        allowed = config.get("allowed_auto_actions")
        if isinstance(allowed, list):
            updates["allowed_auto_actions"] = [str(command) for command in allowed]
    except (TypeError, ValueError) as exc:
        # float() rejected a value; leave the settings as they were.
        reject(exc)
        return

    auto_config.update(updates)
    emit_auto_config(auto_config)
|
|
|
|
|
|
def set_auto_mode(auto_config, enabled):
    """Turn automatic mode on or off by delegating to ``set_auto_config``.

    Args:
        auto_config: Settings dict, updated in place.
        enabled: New value for the ``auto_mode`` setting.
    """
    requested_change = {"auto_mode": enabled}
    set_auto_config(auto_config, requested_change)
|
|
|
|
|
|
def dispatch(redis_client, command, ha_task_ids, runs):
    """Translate a user command into run bookkeeping and queued tasks.

    * ``"ha"`` registers a new run, announces it and sends a ``docker_ps``
      task to ``piha`` tied to that run.
    * ``"stop"`` stops every run currently marked active.
    * ``"all"`` sends ``docker_ps`` to ``piha`` and then to ``vps``.
    * Any other command sends ``docker_ps`` to ``piha`` only.

    Args:
        redis_client: Redis client handed through to ``send_task``.
        command: Command text.
        ha_task_ids: Mapping of task id to run id, extended by ``"ha"``.
        runs: Mapping of run id to run-state dict.
    """
    if command == "stop":
        # Iterate over a snapshot because stop_run removes entries from runs.
        for run_id, run in tuple(runs.items()):
            if run.get("active"):
                stop_run(runs, run_id)
        return

    if command == "ha":
        run_id = str(uuid4())
        run_state = {
            "expected": 0,
            "received": 0,
            "engine": DiagnosisEngine(),
            "active": True,
            "status": "running",
        }
        runs[run_id] = run_state

        emit_event(
            {
                "type": "run_status",
                "message": "run status: running",
                "run_id": run_id,
                "node": None,
                "status": "running",
            }
        )
        emit_run_progress(run_id, run_state)

        probe = build_task("piha", "docker_ps", {})
        ha_task_ids[probe["task_id"]] = run_id
        send_task(redis_client, probe, runs, run_id)
        return

    # "all" probes both nodes; every other command falls back to piha only.
    targets = ("piha", "vps") if command == "all" else ("piha",)
    for target in targets:
        send_task(redis_client, build_task(target, "docker_ps", {}))
|
|
|
|
|
|
def process_input(command, redis_client, ha_task_ids, runs):
    """Echo, log and dispatch one user command; empty commands are ignored.

    Args:
        command: Command text (already stripped by the callers in this file).
        redis_client: Redis client handed through to ``dispatch``.
        ha_task_ids: Mapping of task id to run id.
        runs: Mapping of run id to run-state dict.
    """
    if command:
        print(f"[input] {command}")
        input_event = {
            "type": "log",
            "message": f"user input: {command}",
            "run_id": None,
            "node": None,
        }
        emit_event(input_event)
        dispatch(redis_client, command, ha_task_ids, runs)
|
|
|
|
|
|
def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config):
    """Consume tasks addressed to the orchestrator from the ``tasks`` list.

    Runs forever. Each popped payload is handled as follows:

    * Payloads that cannot be parsed as JSON, or that parse to something
      other than an object, are dropped.
    * Tasks whose ``target`` is not ``"orchestrator"`` are pushed back onto
      the list, followed by a one-second pause.
    * Orchestrator tasks are routed by ``action``: ``stop_run``,
      ``run_action``, ``set_auto_mode``, ``set_auto_config`` or ``input``.
      Any other action is ignored.

    Args:
        redis_client: Client exposing ``brpop`` and ``lpush``.
        ha_task_ids: Mapping of task id to run id.
        runs: Mapping of run id to run-state dict.
        run_actions: Mapping of run id to a context dict.
        auto_config: Automation settings dict.
    """
    while True:
        _, raw_task = redis_client.brpop("tasks")
        try:
            task = json.loads(raw_task)
        except ValueError:
            # Covers json.JSONDecodeError and the UnicodeDecodeError raised
            # for undecodable bytes; both are ValueError subclasses.
            continue

        if not isinstance(task, dict):
            # Valid JSON that is not an object has no target or action, and
            # calling .get on it would raise and end this loop.
            continue

        if task.get("target") != "orchestrator":
            redis_client.lpush("tasks", raw_task)
            time.sleep(1)
            continue

        params = task.get("params")
        if not isinstance(params, dict):
            # Missing or malformed params are treated as "no parameters".
            params = {}

        action = task.get("action")
        if action == "stop_run":
            stop_run(runs, str(params.get("run_id", "")).strip())
        elif action == "run_action":
            apply_run_action(
                redis_client,
                run_actions,
                str(params.get("run_id", "")).strip(),
                str(params.get("command", "")).strip(),
            )
        elif action == "set_auto_mode":
            set_auto_mode(auto_config, bool(params.get("auto_mode")))
        elif action == "set_auto_config":
            set_auto_config(auto_config, params.get("config") or {})
        elif action == "input":
            process_input(
                str(params.get("command", "")).strip(),
                redis_client,
                ha_task_ids,
                runs,
            )
|
|
|
|
|
|
def services_list(services):
    """Return the tracked services as a list of dicts ordered by name.

    Args:
        services: Mapping of service name to a state dict holding ``status``,
            ``last_check``, ``history`` and optionally ``node``.

    Returns:
        One dict per service with the keys ``name``, ``status``,
        ``last_check``, ``node`` (``None`` when absent) and ``history``.
    """
    snapshot = []
    for name in sorted(services):
        state = services[name]
        entry = {"name": name}
        entry["status"] = state["status"]
        entry["last_check"] = state["last_check"]
        entry["node"] = state.get("node")
        entry["history"] = state["history"]
        snapshot.append(entry)
    return snapshot
|
|
|
|
|
|
def update_service_health(services, event):
    """Record a health report for a service and broadcast the new state.

    Events without a service name, or with a status other than ``"ok"`` or
    ``"error"``, are ignored. Each service keeps its 20 most recent reports.

    Args:
        services: Mapping of service name to state dict, updated in place.
        event: Health event dict; reads ``service``, ``status``, ``node`` and
            ``timestamp`` (the current time is used when absent).
    """
    service_name = event.get("service")
    status = event.get("status")
    node = event.get("node")
    timestamp = event.get("timestamp", time.time())

    accepted = service_name and status in ("ok", "error")
    if not accepted:
        return

    state = services.get(service_name)
    if state is None:
        state = {
            "status": status,
            "last_check": timestamp,
            "node": node,
            "history": [],
        }
        services[service_name] = state

    state["status"] = status
    state["last_check"] = timestamp
    state["node"] = node

    history = state["history"]
    history.append({"status": status, "timestamp": timestamp, "node": node})
    # Store a fresh slice rather than trimming in place.
    state["history"] = history[-20:]

    state_event = {
        "type": "services_state",
        "message": "services health updated",
        "run_id": None,
        "node": None,
        "services": services_list(services),
    }
    emit_event(state_event)
|
|
|
|
|
|
def listen_for_external_events(redis_client, services):
    """Relay events from the Redis ``events`` list and track service health.

    Runs forever. Each popped payload is parsed as JSON:

    * Payloads that fail to parse, or that parse to something other than an
      object, are reported through a ``log`` event and skipped.
    * Valid events are re-emitted unchanged; events whose ``type`` is
      ``"health"`` are also passed to ``update_service_health``.

    Args:
        redis_client: Client exposing ``brpop``.
        services: Mapping of service name to state dict.
    """
    while True:
        _, raw_event = redis_client.brpop("events")

        # Only the parse sits in the try block, so unrelated failures are not
        # misreported as bad payloads.
        try:
            event = json.loads(raw_event)
        except ValueError:
            # Covers json.JSONDecodeError and the UnicodeDecodeError raised
            # for undecodable bytes; both are ValueError subclasses.
            event = None

        if not isinstance(event, dict):
            # The code below calls event.get, so anything that is not a JSON
            # object is reported as invalid rather than ending this loop.
            emit_event(
                {
                    "type": "log",
                    "message": f"invalid external event: {raw_event}",
                    "run_id": None,
                    "node": None,
                }
            )
            continue

        emit_event(event)
        if event.get("type") == "health":
            update_service_health(services, event)
|
|
|
|
|
|
def main():
    """Start the orchestrator: background listeners plus an interactive prompt.

    Steps:

    1. Make sure ``/tmp/agent-events.log`` exists.
    2. Create the shared state dicts and the default automation settings.
    3. Start three daemon threads: ``listen_for_results``,
       ``listen_for_input_tasks`` and ``listen_for_external_events``.
    4. Read commands from stdin and hand them to ``process_input``.

    The prompt loop returns only when stdin hits EOF while it is not a TTY.
    With a TTY it reports the EOF, waits two seconds and tries again. Ctrl+C
    at the prompt prints a blank line and prompts again.
    """
    events_log_path = "/tmp/agent-events.log"
    if not os.path.exists(events_log_path):
        # Append mode creates a missing file but never truncates one, so a
        # file created by another process after the check above keeps its
        # contents.
        # NOTE(review): presumably this is the file emit_event writes to -
        # confirm against the events module.
        with open(events_log_path, "a"):
            pass

    def log_event(message):
        # All status messages from main() share this event shape.
        emit_event(
            {
                "type": "log",
                "message": message,
                "run_id": None,
                "node": None,
            }
        )

    redis_client = get_redis_client()

    # State shared with the listener threads.
    ha_task_ids = {}
    runs = {}
    last_actions = {}
    run_actions = {}
    services = {}
    auto_config = {
        "auto_mode": True,
        "action_thresholds": {
            "restart_ha": 0.8,
            "check_network": 0.9,
        },
        "default_threshold": 0.9,
        "allowed_auto_actions": ["restart_ha"],
        "max_retries_per_action": {
            "restart_ha": 3,
        },
        "retry_window_seconds": 300,
        "action_history": {},
    }

    listener = threading.Thread(
        target=listen_for_results,
        args=(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config),
        daemon=True,
    )
    listener.start()

    input_listener = threading.Thread(
        target=listen_for_input_tasks,
        args=(redis_client, ha_task_ids, runs, run_actions, auto_config),
        daemon=True,
    )
    input_listener.start()

    event_listener = threading.Thread(
        target=listen_for_external_events,
        args=(redis_client, services),
        daemon=True,
    )
    event_listener.start()

    print("[orchestrator] ready")
    log_event("orchestrator ready")
    emit_auto_config(auto_config, "auto mode: on")

    while True:
        try:
            command = input("> ").strip()
        except EOFError:
            if not sys.stdin.isatty():
                # Piped or redirected stdin is exhausted; nothing more can
                # arrive, so shut down.
                print("[orchestrator] stdin closed, exiting")
                log_event("stdin closed, exiting")
                return
            print("[orchestrator] stdin closed, waiting...")
            log_event("stdin closed, waiting")
            time.sleep(2)
            continue
        except KeyboardInterrupt:
            print()
            continue

        process_input(command, redis_client, ha_task_ids, runs)
|
|
|
|
|
|
# Start the orchestrator only when this file is run as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
|