Add Redis orchestrator system
This commit is contained in:
commit
49f4ace58b
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
__pycache__/
|
||||
*.py[cod]
|
||||
15
docker-compose.yml
Normal file
15
docker-compose.yml
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
services:
|
||||
redis:
|
||||
image: redis:7
|
||||
ports:
|
||||
- "6379:6379"
|
||||
restart: unless-stopped
|
||||
|
||||
orchestrator:
|
||||
build: ./orchestrator
|
||||
depends_on:
|
||||
- redis
|
||||
environment:
|
||||
REDIS_HOST: redis
|
||||
stdin_open: true
|
||||
tty: true
|
||||
10
orchestrator/Dockerfile
Normal file
10
orchestrator/Dockerfile
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
CMD ["python", "main.py"]
|
||||
52
orchestrator/main.py
Normal file
52
orchestrator/main.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import json
|
||||
import threading
|
||||
|
||||
from redis_client import get_redis_client
|
||||
from result_listener import listen_for_results
|
||||
from task_builder import build_task
|
||||
|
||||
|
||||
def send_task(redis_client, task):
|
||||
redis_client.lpush("tasks", json.dumps(task))
|
||||
print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
|
||||
|
||||
|
||||
def dispatch(redis_client, command, ha_task_ids):
|
||||
if command == "ha":
|
||||
task = build_task("piha", "docker_ps", {})
|
||||
ha_task_ids.add(task["task_id"])
|
||||
send_task(redis_client, task)
|
||||
elif command == "all":
|
||||
send_task(redis_client, build_task("piha", "docker_ps", {}))
|
||||
send_task(redis_client, build_task("vps", "docker_ps", {}))
|
||||
else:
|
||||
send_task(redis_client, build_task("piha", "docker_ps", {}))
|
||||
|
||||
|
||||
def main():
|
||||
redis_client = get_redis_client()
|
||||
ha_task_ids = set()
|
||||
|
||||
listener = threading.Thread(
|
||||
target=listen_for_results,
|
||||
args=(redis_client, ha_task_ids),
|
||||
daemon=True,
|
||||
)
|
||||
listener.start()
|
||||
|
||||
print("orchestrator ready")
|
||||
while True:
|
||||
try:
|
||||
command = input("> ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print()
|
||||
break
|
||||
|
||||
if not command:
|
||||
continue
|
||||
|
||||
dispatch(redis_client, command, ha_task_ids)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
8
orchestrator/redis_client.py
Normal file
8
orchestrator/redis_client.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
import os
|
||||
|
||||
import redis
|
||||
|
||||
|
||||
def get_redis_client():
|
||||
host = os.getenv("REDIS_HOST", "redis")
|
||||
return redis.Redis(host=host, port=6379, decode_responses=True)
|
||||
1
orchestrator/requirements.txt
Normal file
1
orchestrator/requirements.txt
Normal file
|
|
@ -0,0 +1 @@
|
|||
redis
|
||||
127
orchestrator/result_listener.py
Normal file
127
orchestrator/result_listener.py
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
import json
|
||||
import re
|
||||
|
||||
from task_builder import build_task
|
||||
|
||||
|
||||
def send_task(redis_client, target, action, params):
|
||||
task = build_task(target, action, params)
|
||||
redis_client.lpush("tasks", json.dumps(task))
|
||||
print(f"sent {task['action']} to {task['target']} ({task['task_id']})")
|
||||
return task
|
||||
|
||||
|
||||
def detect_homeassistant(result):
|
||||
if isinstance(result, list):
|
||||
for item in result:
|
||||
name = item.get("name", "")
|
||||
if "homeassistant" in name:
|
||||
return name
|
||||
|
||||
if isinstance(result, str):
|
||||
for token in result.split():
|
||||
if "homeassistant" in token:
|
||||
return token.lstrip("/")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def extract_first_http_url(result):
|
||||
if not isinstance(result, str):
|
||||
return None
|
||||
|
||||
if "ClientConnectorError" not in result or "http://" not in result:
|
||||
return None
|
||||
|
||||
match = re.search(r"http://[^\s'\"),]+", result)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
return match.group(0)
|
||||
|
||||
|
||||
def parse_host_port(url):
|
||||
address = url.split("http://", 1)[1].split("/", 1)[0]
|
||||
if ":" not in address:
|
||||
return None, None
|
||||
|
||||
host, port = address.rsplit(":", 1)
|
||||
return host, port
|
||||
|
||||
|
||||
def interpret_lms_connectivity(result):
|
||||
code = str(result).strip()
|
||||
if code == "200":
|
||||
return "service OK"
|
||||
if code == "000":
|
||||
return "no connection"
|
||||
return "service reachable but error"
|
||||
|
||||
|
||||
def listen_for_results(redis_client, ha_task_ids):
|
||||
ha_log_task_ids = set()
|
||||
lms_connectivity_task_ids = set()
|
||||
|
||||
while True:
|
||||
_, raw_result = redis_client.brpop("results")
|
||||
try:
|
||||
result = json.loads(raw_result)
|
||||
except json.JSONDecodeError:
|
||||
print(f"\n[result:error] invalid json: {raw_result}")
|
||||
continue
|
||||
|
||||
print("\n--- result ---")
|
||||
print(f"task_id: {result.get('task_id')}")
|
||||
print(f"node: {result.get('node')}")
|
||||
print(f"status: {result.get('status')}")
|
||||
print(f"result: {result.get('result')}")
|
||||
print(f"error: {result.get('error')}")
|
||||
print("--------------")
|
||||
|
||||
task_id = result.get("task_id")
|
||||
|
||||
if task_id in lms_connectivity_task_ids:
|
||||
lms_connectivity_task_ids.remove(task_id)
|
||||
diagnosis = interpret_lms_connectivity(result.get("result"))
|
||||
print(f"[diagnosis] LMS connectivity: {diagnosis}")
|
||||
continue
|
||||
|
||||
if task_id in ha_log_task_ids:
|
||||
ha_log_task_ids.remove(task_id)
|
||||
if result.get("status") != "ok":
|
||||
continue
|
||||
|
||||
url = extract_first_http_url(result.get("result"))
|
||||
if not url:
|
||||
continue
|
||||
|
||||
host, port = parse_host_port(url)
|
||||
if not host or not port:
|
||||
continue
|
||||
|
||||
check_task = send_task(
|
||||
redis_client,
|
||||
"piha",
|
||||
"exec",
|
||||
{"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"},
|
||||
)
|
||||
lms_connectivity_task_ids.add(check_task["task_id"])
|
||||
continue
|
||||
|
||||
if task_id in ha_task_ids:
|
||||
ha_task_ids.remove(task_id)
|
||||
if result.get("status") != "ok":
|
||||
continue
|
||||
|
||||
container_name = detect_homeassistant(result.get("result"))
|
||||
if not container_name:
|
||||
continue
|
||||
|
||||
print(f"[orchestrator] detected HA container: {container_name}")
|
||||
logs_task = send_task(
|
||||
redis_client,
|
||||
"piha",
|
||||
"docker_logs",
|
||||
{"container": container_name},
|
||||
)
|
||||
ha_log_task_ids.add(logs_task["task_id"])
|
||||
11
orchestrator/task_builder.py
Normal file
11
orchestrator/task_builder.py
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
from uuid import uuid4
|
||||
|
||||
|
||||
def build_task(target, action, params=None):
|
||||
return {
|
||||
"task_id": str(uuid4()),
|
||||
"target": target,
|
||||
"type": "infra",
|
||||
"action": action,
|
||||
"params": params or {},
|
||||
}
|
||||
Loading…
Reference in a new issue