# agent-system/node-agent/main.py

import json
import os
import subprocess
import time
import redis
# Name this agent answers to; falls back to "node" when NODE_NAME is unset.
NODE_NAME = os.environ.get("NODE_NAME", "node")
# Host of the Redis server; falls back to "redis" when REDIS_HOST is unset.
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
def redis_client():
    """Return a new Redis client for REDIS_HOST on port 6379.

    The client is created with ``decode_responses=True``.
    """
    connection_options = {
        "host": REDIS_HOST,
        "port": 6379,
        "decode_responses": True,
    }
    return redis.Redis(**connection_options)
def build_result(task_id, status, result="", error=None, run_id=None):
    """Assemble the result record for one task.

    The record carries the task and run identifiers, this agent's NODE_NAME
    under ``"node"``, and the status / result / error values passed in.

    Returns:
        A dict with keys ``task_id``, ``run_id``, ``node``, ``status``,
        ``result`` and ``error`` (in that order).
    """
    record = dict(task_id=task_id, run_id=run_id, node=NODE_NAME)
    record.update(status=status, result=result, error=error)
    return record
def run_command(cmd, timeout=30):
    """Run ``cmd`` through the shell and report the outcome as a tuple.

    Args:
        cmd: Shell command line to execute.
        timeout: Seconds to wait before the command is killed (default 30).

    Returns:
        A ``(status, output, error)`` tuple:
        * ``("ok", stdout, None)`` when the command exits with code 0;
        * ``("error", stdout, message)`` otherwise, where ``message`` is the
          command's stderr, or ``"exit code N"`` when stderr is empty, or a
          timeout / missing-command description.

    Failures are always reported through the tuple rather than raised, so a
    caller can forward them as a task result.
    """
    if not cmd:
        # Nothing to run: report it instead of spawning an empty shell (or
        # letting subprocess raise TypeError on None).
        return "error", "", "command is required"

    try:
        completed = subprocess.run(
            cmd,
            # NOTE(review): the command line is interpreted by the shell
            # verbatim; callers must quote any untrusted fragments.
            shell=True,
            text=True,
            # Undecodable bytes in the output must not abort the task.
            errors="replace",
            capture_output=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # The partial output attached to the exception may be None or bytes
        # even in text mode, so normalise it before returning.
        partial = exc.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode(errors="replace")
        return "error", partial.strip(), f"command timed out after {timeout}s"

    output = completed.stdout.strip()
    error = completed.stderr.strip()
    if completed.returncode != 0:
        return "error", output, error or f"exit code {completed.returncode}"
    return "ok", output, None
def docker_ps(params):
    """List running containers as ``{"name": ..., "status": ...}`` dicts.

    Runs ``docker ps`` with a ``name|status`` format and splits each output
    line on the first ``|``. When ``params["container"]`` is set, only
    containers whose name contains that value are kept.

    Returns:
        ``("ok", containers, None)`` on success, or the failing
        ``(status, output, error)`` tuple from ``run_command`` unchanged.
    """
    wanted = params.get("container")
    status, output, error = run_command(
        "docker ps --format '{{.Names}}|{{.Status}}'"
    )
    if status != "ok":
        return status, output, error

    rows = (line.partition("|") for line in output.splitlines())
    listing = [
        {"name": name, "status": state}
        for name, _, state in rows
        # No filter requested, or the filter is a substring of the name.
        if not wanted or wanted in name
    ]
    return "ok", listing, None
def docker_logs(params):
    """Return the last 120 log lines of the container named in ``params``.

    Args:
        params: Mapping with a required ``"container"`` entry.

    Returns:
        The ``(status, output, error)`` tuple from ``run_command``, or
        ``("error", "", "container parameter is required")`` when no
        container was given. ``output`` holds the container's stdout and
        stderr log streams merged together.
    """
    import shlex  # local import keeps this block self-contained

    container = params.get("container")
    if not container:
        return "error", "", "container parameter is required"

    # The name comes from the task payload, so quote it for the shell and
    # put it after "--" so it can never be parsed as a docker option.
    quoted = shlex.quote(str(container))
    # docker replays the container's stderr stream on its own stderr, which
    # run_command drops on success; merge it into stdout so no logs are lost.
    status, output, error = run_command(
        f"docker logs --tail 120 -- {quoted} 2>&1"
    )
    if status != "ok" and output:
        # With the streams merged, docker's failure message arrives in
        # output; keep it visible in the error slot as well.
        error = f"{error}: {output}" if error else output
    return status, output, error
def handle_task(task):
    """Dispatch ``task`` to the handler named by its ``"action"`` field.

    Supported actions are ``"exec"`` (runs ``params["cmd"]``),
    ``"docker_ps"`` and ``"docker_logs"``. A missing or null ``"params"``
    is treated as an empty dict.

    Returns:
        The handler's ``(status, result, error)`` tuple, or an
        ``("error", "", "unknown action: ...")`` tuple for any other action.
    """
    requested = task.get("action")
    arguments = task.get("params") or {}

    if requested == "exec":
        outcome = run_command(arguments.get("cmd", ""))
    elif requested == "docker_ps":
        outcome = docker_ps(arguments)
    elif requested == "docker_logs":
        outcome = docker_logs(arguments)
    else:
        outcome = ("error", "", f"unknown action: {requested}")
    return outcome
def main():
    """Run the agent loop: take tasks from Redis, execute, publish results.

    Blocks on the Redis ``"tasks"`` list forever. A task whose ``"target"``
    is not this node's NODE_NAME is pushed back onto ``"tasks"`` and the loop
    pauses for a second. A task for this node is handled and its result is
    pushed, JSON-encoded, onto the ``"results"`` list.

    Any other failure in an iteration (Redis errors, malformed JSON, ...) is
    printed and followed by a two-second pause; the loop never exits on its
    own.
    """
    client = redis_client()
    print(f"[node-agent] ready node={NODE_NAME} redis={REDIS_HOST}")
    while True:
        try:
            _, raw_task = client.brpop("tasks")
            task = json.loads(raw_task)
            if task.get("target") != NODE_NAME:
                # Not ours: hand it back and yield briefly so another
                # consumer of the list gets a chance to pick it up.
                client.lpush("tasks", raw_task)
                time.sleep(1)
                continue

            try:
                status, result, error = handle_task(task)
            except Exception as exc:
                # A crashing handler must still produce a result; otherwise
                # whoever submitted the task never hears back about it.
                status, result, error = (
                    "error",
                    "",
                    f"{type(exc).__name__}: {exc}",
                )

            payload = build_result(
                task.get("task_id"),
                status,
                result,
                error,
                task.get("run_id"),
            )
            client.lpush("results", json.dumps(payload))
        except Exception as exc:
            # Top-level boundary: log and back off rather than kill the agent.
            print(f"[node-agent] error: {exc}")
            time.sleep(2)
# Start the agent loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()