commit 49f4ace58b1e50698f622fa4089acfcd86d3cd1c Author: Codex Date: Thu Apr 30 19:15:28 2026 +0000 Add Redis orchestrator system diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..43ae0e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.py[cod] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..0c7da1a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +services: + redis: + image: redis:7 + ports: + - "6379:6379" + restart: unless-stopped + + orchestrator: + build: ./orchestrator + depends_on: + - redis + environment: + REDIS_HOST: redis + stdin_open: true + tty: true diff --git a/orchestrator/Dockerfile b/orchestrator/Dockerfile new file mode 100644 index 0000000..7b3f32c --- /dev/null +++ b/orchestrator/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "main.py"] diff --git a/orchestrator/main.py b/orchestrator/main.py new file mode 100644 index 0000000..a22ef50 --- /dev/null +++ b/orchestrator/main.py @@ -0,0 +1,52 @@ +import json +import threading + +from redis_client import get_redis_client +from result_listener import listen_for_results +from task_builder import build_task + + +def send_task(redis_client, task): + redis_client.lpush("tasks", json.dumps(task)) + print(f"sent {task['action']} to {task['target']} ({task['task_id']})") + + +def dispatch(redis_client, command, ha_task_ids): + if command == "ha": + task = build_task("piha", "docker_ps", {}) + ha_task_ids.add(task["task_id"]) + send_task(redis_client, task) + elif command == "all": + send_task(redis_client, build_task("piha", "docker_ps", {})) + send_task(redis_client, build_task("vps", "docker_ps", {})) + else: + send_task(redis_client, build_task("piha", "docker_ps", {})) + + +def main(): + redis_client = get_redis_client() + ha_task_ids = set() + + listener = threading.Thread( + target=listen_for_results, + args=(redis_client, ha_task_ids), + daemon=True, + ) + listener.start() + + print("orchestrator ready") + while True: + try: + command = input("> ").strip() + except (EOFError, KeyboardInterrupt): + print() + break + + if not command: + continue + + dispatch(redis_client, command, ha_task_ids) + + +if __name__ == "__main__": + main() diff --git a/orchestrator/redis_client.py b/orchestrator/redis_client.py new file mode 100644 index 0000000..007760b --- /dev/null +++ b/orchestrator/redis_client.py @@ -0,0 +1,8 @@ +import os + +import redis + + +def get_redis_client(): + host = os.getenv("REDIS_HOST", "redis") + return redis.Redis(host=host, port=6379, decode_responses=True) diff --git a/orchestrator/requirements.txt b/orchestrator/requirements.txt new file mode 100644 index 0000000..7800f0f --- /dev/null +++ b/orchestrator/requirements.txt @@ -0,0 +1 @@ +redis diff --git a/orchestrator/result_listener.py b/orchestrator/result_listener.py new file mode 100644 index 0000000..89de08b --- /dev/null +++ b/orchestrator/result_listener.py @@ -0,0 +1,127 @@ +import json +import re + +from task_builder import build_task + + +def send_task(redis_client, target, action, params): + task = build_task(target, action, params) + redis_client.lpush("tasks", json.dumps(task)) + print(f"sent {task['action']} to {task['target']} ({task['task_id']})") + return task + + +def detect_homeassistant(result): + if isinstance(result, list): + for item in result: + name = item.get("name", "") + if "homeassistant" in name: + return name + + if isinstance(result, str): + for token in result.split(): + if "homeassistant" in token: + return token.lstrip("/") + + return None + + +def extract_first_http_url(result): + if not isinstance(result, str): + return None + + if "ClientConnectorError" not in result or "http://" not in result: + return None + + match = re.search(r"http://[^\s'\"),]+", result) + if not match: + return None + + return match.group(0) + + +def parse_host_port(url): + address = url.split("http://", 1)[1].split("/", 1)[0] + if ":" not in address: + return None, None + + host, port = address.rsplit(":", 1) + return host, port + + +def interpret_lms_connectivity(result): + code = str(result).strip() + if code == "200": + return "service OK" + if code == "000": + return "no connection" + return "service reachable but error" + + +def listen_for_results(redis_client, ha_task_ids): + ha_log_task_ids = set() + lms_connectivity_task_ids = set() + + while True: + _, raw_result = redis_client.brpop("results") + try: + result = json.loads(raw_result) + except json.JSONDecodeError: + print(f"\n[result:error] invalid json: {raw_result}") + continue + + print("\n--- result ---") + print(f"task_id: {result.get('task_id')}") + print(f"node: {result.get('node')}") + print(f"status: {result.get('status')}") + print(f"result: {result.get('result')}") + print(f"error: {result.get('error')}") + print("--------------") + + task_id = result.get("task_id") + + if task_id in lms_connectivity_task_ids: + lms_connectivity_task_ids.remove(task_id) + diagnosis = interpret_lms_connectivity(result.get("result")) + print(f"[diagnosis] LMS connectivity: {diagnosis}") + continue + + if task_id in ha_log_task_ids: + ha_log_task_ids.remove(task_id) + if result.get("status") != "ok": + continue + + url = extract_first_http_url(result.get("result")) + if not url: + continue + + host, port = parse_host_port(url) + if not host or not port: + continue + + check_task = send_task( + redis_client, + "piha", + "exec", + {"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"}, + ) + lms_connectivity_task_ids.add(check_task["task_id"]) + continue + + if task_id in ha_task_ids: + ha_task_ids.remove(task_id) + if result.get("status") != "ok": + continue + + container_name = detect_homeassistant(result.get("result")) + if not container_name: + continue + + print(f"[orchestrator] detected HA container: {container_name}") + logs_task = send_task( + redis_client, + "piha", + "docker_logs", + {"container": container_name}, + ) + ha_log_task_ids.add(logs_task["task_id"]) diff --git a/orchestrator/task_builder.py b/orchestrator/task_builder.py new file mode 100644 index 0000000..879e552 --- /dev/null +++ b/orchestrator/task_builder.py @@ -0,0 +1,11 @@ +from uuid import uuid4 + + +def build_task(target, action, params=None): + return { + "task_id": str(uuid4()), + "target": target, + "type": "infra", + "action": action, + "params": params or {}, + }