agent-system/orchestrator/main.py

58 lines
1.4 KiB
Python
Raw Normal View History

2026-04-30 21:15:28 +02:00
import json
import threading
2026-04-30 21:20:31 +02:00
import time
2026-04-30 21:15:28 +02:00
from redis_client import get_redis_client
from result_listener import listen_for_results
from task_builder import build_task
def send_task(redis_client, task):
    """Queue *task* on the Redis ``tasks`` list and log what was sent.

    Args:
        redis_client: Client object exposing ``lpush(key, value)``.
        task: JSON-serialisable mapping that contains at least the keys
            ``action``, ``target`` and ``task_id`` (all three are read for
            the log line).

    Raises:
        KeyError: If *task* lacks one of the keys used in the log line.
            The task has already been pushed by then.
    """
    payload = json.dumps(task)
    redis_client.lpush("tasks", payload)
    print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
def dispatch(redis_client, command, ha_task_ids):
    """Turn an operator command into queued ``docker_ps`` task(s).

    Commands:
        ``"all"``  - send one ``docker_ps`` task to ``piha`` and one to ``vps``.
        ``"ha"``   - send one ``docker_ps`` task to ``piha`` and record its
                     ``task_id`` in *ha_task_ids*.
        anything else - send one ``docker_ps`` task to ``piha`` without
                     recording its id.

    Args:
        redis_client: Client passed through to ``send_task``.
        command: Command string entered by the operator.
        ha_task_ids: Mutable set that receives the id of tasks sent by the
            ``"ha"`` command.
    """
    if command == "all":
        for target in ("piha", "vps"):
            send_task(redis_client, build_task(target, "docker_ps", {}))
        return

    # "ha" and every unrecognised command send the same piha task; they
    # differ only in whether the task id is tracked.
    task = build_task("piha", "docker_ps", {})
    if command == "ha":
        # The id is recorded before the task is pushed. Presumably this lets
        # the result listener, which shares this set, recognise the reply
        # even if it arrives immediately - TODO confirm against
        # listen_for_results.
        ha_task_ids.add(task["task_id"])
    send_task(redis_client, task)
def main():
    """Run the interactive orchestrator prompt.

    Obtains a Redis client, starts ``listen_for_results`` on a daemon thread
    (sharing the ``ha_task_ids`` set with it), prints a ready banner, then
    reads commands from stdin forever and passes every non-empty one to
    ``dispatch``.

    The loop has no exit path of its own: both end-of-file on stdin and
    Ctrl-C at the prompt are handled and the loop continues.
    """
    redis_client = get_redis_client()
    ha_task_ids = set()

    # Daemon thread: it does not keep the process alive on its own.
    listener = threading.Thread(
        target=listen_for_results,
        args=(redis_client, ha_task_ids),
        daemon=True,
    )
    listener.start()

    print("[orchestrator] ready")

    while True:
        try:
            command = input("> ").strip()
        except EOFError:
            # stdin is closed; sleep before retrying so the loop does not
            # spin at full speed while the listener thread keeps running.
            print("[orchestrator] stdin closed, waiting...")
            time.sleep(2)
            continue
        except KeyboardInterrupt:
            # Ctrl-C at the prompt only abandons the current input line.
            print()
            continue

        if not command:
            continue

        dispatch(redis_client, command, ha_task_ids)
# Start the orchestrator only when this file is executed as a script.
if __name__ == "__main__":
    main()