diff --git a/.gitignore b/.gitignore index 43ae0e2..7bbff89 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__/ *.py[cod] +.aider* diff --git a/docker-compose.monitor-agent.yml b/docker-compose.monitor-agent.yml new file mode 100644 index 0000000..c448fd4 --- /dev/null +++ b/docker-compose.monitor-agent.yml @@ -0,0 +1,9 @@ +services: + monitor-agent: + build: ./monitor-agent + environment: + NODE_NAME: ${NODE_NAME:-} + ORCHESTRATOR_URL: ${ORCHESTRATOR_URL:?set ORCHESTRATOR_URL} + SERVICES_TO_CHECK: ${SERVICES_TO_CHECK:-homeassistant,lms,forgejo} + INTERVAL_SECONDS: ${INTERVAL_SECONDS:-30} + restart: unless-stopped diff --git a/docker-compose.yml b/docker-compose.yml index 0c7da1a..882f287 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,5 +11,31 @@ services: - redis environment: REDIS_HOST: redis + HA_PROXY_URL: https://ha.okit.pl + volumes: + - /tmp/agent-events.log:/tmp/agent-events.log stdin_open: true tty: true + + webui: + build: ./webui + container_name: agent-system-webui + environment: + REDIS_HOST: redis + ports: + - "8080:8080" + volumes: + - /tmp/agent-events.log:/tmp/agent-events.log + depends_on: + - orchestrator + - redis + + monitor-agent: + build: ./monitor-agent + environment: + NODE_NAME: ubuntu-4gb-hel1-1 + ORCHESTRATOR_URL: http://webui:8080/events + SERVICES_TO_CHECK: homeassistant,lms,forgejo + depends_on: + - webui + restart: unless-stopped diff --git a/monitor-agent/Dockerfile b/monitor-agent/Dockerfile new file mode 100644 index 0000000..c44b974 --- /dev/null +++ b/monitor-agent/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY main.py . +CMD ["python", "main.py"] diff --git a/monitor-agent/main.py b/monitor-agent/main.py new file mode 100644 index 0000000..37284fb --- /dev/null +++ b/monitor-agent/main.py @@ -0,0 +1,115 @@ +import json +import os +import socket +import time +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + + +NODE_NAME = os.getenv("NODE_NAME") or socket.gethostname() +ORCHESTRATOR_URL = os.getenv("ORCHESTRATOR_URL") +SERVICES_TO_CHECK = { + name.strip() + for name in os.getenv("SERVICES_TO_CHECK", "").split(",") + if name.strip() +} +INTERVAL_SECONDS = int(os.getenv("INTERVAL_SECONDS", "30")) +SERVICE_CATALOG = [ + {"name": "homeassistant", "type": "http", "url": "http://homeassistant:8123"}, + {"name": "lms", "type": "tcp", "host": "192.168.31.6", "port": 9000}, + {"name": "forgejo", "type": "http", "url": "http://forgejo:3000"}, + {"name": "nginx", "type": "http", "url": "http://nginx"}, + {"name": "mosquitto", "type": "tcp", "host": "mosquitto", "port": 1883}, +] + + +def services_to_check(): + if not SERVICES_TO_CHECK: + return SERVICE_CATALOG + return [ + service for service in SERVICE_CATALOG + if service["name"] in SERVICES_TO_CHECK + ] + + +def check_http(url): + request = Request(url, headers={"User-Agent": "monitor-agent/1.0"}) + try: + with urlopen(request, timeout=5) as response: + return "ok" if response.status == 200 else "error" + except (HTTPError, URLError, TimeoutError, OSError): + return "error" + + +def check_tcp(host, port): + try: + with socket.create_connection((host, int(port)), timeout=5): + return "ok" + except OSError: + return "error" + + +def check_service(service): + service_type = service.get("type") + if service_type == "http": + return check_http(service["url"]) + if service_type == "tcp": + return check_tcp(service["host"], service["port"]) + return "error" + + +def build_event(service, status): + return { + "type": "health", + "service": service["name"], + "status": status, + "timestamp": time.time(), + "run_id": None, + "node": NODE_NAME, + } + + +def send_event(event): + body = json.dumps(event).encode("utf-8") + request = Request( + ORCHESTRATOR_URL, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urlopen(request, timeout=5) as response: + if response.status >= 300: + raise RuntimeError(f"event endpoint returned {response.status}") + + +def main(): + if not ORCHESTRATOR_URL: + raise SystemExit("ORCHESTRATOR_URL is required") + + selected_services = services_to_check() + print( + ( + f"[monitor-agent] ready node={NODE_NAME} " + f"url={ORCHESTRATOR_URL} " + f"services={[service['name'] for service in selected_services]}" + ), + flush=True, + ) + + while True: + started = time.time() + for service in selected_services: + status = check_service(service) + event = build_event(service, status) + try: + send_event(event) + except Exception as exc: + print(f"[monitor-agent] send failed: {exc}", flush=True) + print(json.dumps(event), flush=True) + + elapsed = time.time() - started + time.sleep(max(0, INTERVAL_SECONDS - elapsed)) + + +if __name__ == "__main__": + main() diff --git a/monitor-agent/requirements.txt b/monitor-agent/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/node-agent/Dockerfile b/node-agent/Dockerfile new file mode 100644 index 0000000..f69bbf8 --- /dev/null +++ b/node-agent/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "main.py"] diff --git a/node-agent/main.py b/node-agent/main.py new file mode 100644 index 0000000..1034b41 --- /dev/null +++ b/node-agent/main.py @@ -0,0 +1,116 @@ +import json +import os +import subprocess +import time + +import redis + + +NODE_NAME = os.getenv("NODE_NAME", "node") +REDIS_HOST = os.getenv("REDIS_HOST", "redis") + + +def redis_client(): + return redis.Redis(host=REDIS_HOST, port=6379, decode_responses=True) + + +def build_result(task_id, status, result="", error=None, run_id=None): + return { + "task_id": task_id, + "run_id": run_id, + "node": NODE_NAME, + "status": status, + "result": result, + "error": error, + } + + +def run_command(cmd): + completed = subprocess.run( + cmd, + shell=True, + text=True, + capture_output=True, + timeout=30, + ) + output = completed.stdout.strip() + error = completed.stderr.strip() + + if completed.returncode != 0: + return "error", output, error or f"exit code {completed.returncode}" + + return "ok", output, None + + +def docker_ps(params): + container = params.get("container") + cmd = "docker ps --format '{{.Names}}|{{.Status}}'" + status, output, error = run_command(cmd) + if status != "ok": + return status, output, error + + containers = [] + for line in output.splitlines(): + name, _, state = line.partition("|") + if container and container not in name: + continue + containers.append({"name": name, "status": state}) + + return "ok", containers, None + + +def docker_logs(params): + container = params.get("container") + if not container: + return "error", "", "container parameter is required" + + return run_command(f"docker logs --tail 120 {container}") + + +def handle_task(task): + action = task.get("action") + params = task.get("params") or {} + + if action == "exec": + return run_command(params.get("cmd", "")) + if action == "docker_ps": + return docker_ps(params) + if action == "docker_logs": + return docker_logs(params) + + return "error", "", f"unknown action: {action}" + + +def main(): + client = redis_client() + print(f"[node-agent] ready node={NODE_NAME} redis={REDIS_HOST}") + + while True: + try: + _, raw_task = client.brpop("tasks") + task = json.loads(raw_task) + if task.get("target") != NODE_NAME: + client.lpush("tasks", raw_task) + time.sleep(1) + continue + + status, result, error = handle_task(task) + client.lpush( + "results", + json.dumps( + build_result( + task.get("task_id"), + status, + result, + error, + task.get("run_id"), + ) + ), + ) + except Exception as exc: + print(f"[node-agent] error: {exc}") + time.sleep(2) + + +if __name__ == "__main__": + main() diff --git a/node-agent/requirements.txt b/node-agent/requirements.txt new file mode 100644 index 0000000..7800f0f --- /dev/null +++ b/node-agent/requirements.txt @@ -0,0 +1 @@ +redis diff --git a/orchestrator/diagnosis.py b/orchestrator/diagnosis.py new file mode 100644 index 0000000..53091e9 --- /dev/null +++ b/orchestrator/diagnosis.py @@ -0,0 +1,80 @@ +class DiagnosisEngine: + def __init__(self): + self.signals = {} + + def add(self, key, value): + self.signals[key] = value + + def evaluate(self): + results = [] + + lms = self.signals.get("lms") + ha_logs_client_error = self.signals.get("ha_logs_client_connector_error") + ha_container = self.signals.get("ha_container") + ha_dependency_failure = ( + lms == "000" + and ha_logs_client_error + and ha_container == "running" + ) + + # LMS + if lms == "000" and not ha_dependency_failure: + results.append("[info] LMS is not reachable") + elif lms == "200": + results.append("[info] LMS: OK") + + # Dependency failures + if ha_dependency_failure: + results.append("[warning] HA dependency failure: LMS unreachable") + + # HA local + ha_local = self.signals.get("ha_local") + if ha_local == "000": + options = [ + { + "label": "Restart HA", + "command": "restart_ha", + "confidence": 0.85, + }, + { + "label": "Check network", + "command": "check_network", + "confidence": 0.6, + } + ] + ignore_option = { + "label": "Ignore", + "command": "ignore", + "is_ignore": True, + } + results.append("[error] HA is not working locally") + results.append( + { + "type": "proposal", + "message": "Home Assistant is not working", + "confidence": 0.85, + "options": sorted( + options, + key=lambda option: option["confidence"], + reverse=True, + ) + [ignore_option], + } + ) + elif ha_local == "200": + results.append("[info] HA local: OK") + + # HA proxy + ha_proxy = self.signals.get("ha_proxy") + if ha_proxy == "dns_error": + results.append("[error] HA proxy: DNS failure") + results.append("[error] external access issue: DNS or routing failure") + elif ha_proxy == "000": + results.append("[error] HA proxy: not reachable") + elif ha_proxy == "200": + results.append("[info] HA proxy: OK") + + diagnosis_count = sum(1 for result in results if isinstance(result, str)) + if diagnosis_count > 1: + results.append("[info] multiple issues detected") + + return results diff --git a/orchestrator/events.py b/orchestrator/events.py new file mode 100644 index 0000000..604ba5e --- /dev/null +++ b/orchestrator/events.py @@ -0,0 +1,35 @@ +import json +import time + +EVENT_LOG = "/tmp/agent-events.log" + + +def emit_event(event): + event.setdefault("run_id", None) + event.setdefault("node", None) + event.setdefault("timestamp", time.time()) + line = json.dumps(event) + print(line) + print("EVENT WRITTEN") + try: + with open(EVENT_LOG, "a", encoding="utf-8") as event_log: + event_log.write(line + "\n") + event_log.flush() + except OSError as exc: + print(f"[event:error] failed to write event log: {exc}") + + +def emit_run_progress(run_id, run): + emit_event( + { + "type": "run_progress", + "run_id": run_id, + "node": None, + "received": run.get("received", 0), + "expected": run.get("expected", 0), + "message": ( + f"run progress: {run.get('received', 0)}/" + f"{run.get('expected', 0)}" + ), + } + ) diff --git a/orchestrator/main.py b/orchestrator/main.py index 9bb6cac..9be20b2 100644 --- a/orchestrator/main.py +++ b/orchestrator/main.py @@ -1,22 +1,183 @@ import json +import os +import sys import threading import time +from uuid import uuid4 +from diagnosis import DiagnosisEngine +from events import emit_event, emit_run_progress from redis_client import get_redis_client from result_listener import listen_for_results from task_builder import build_task -def send_task(redis_client, task): +def send_task(redis_client, task, runs=None, run_id=None): + if run_id: + task["run_id"] = run_id + if runs is not None and run_id in runs: + runs[run_id]["expected"] += 1 + emit_run_progress(run_id, runs[run_id]) + redis_client.lpush("tasks", json.dumps(task)) print(f"sent {task['action']} to {task['target']} ({task['task_id']})") + emit_event( + { + "type": "task", + "message": f"sent {task['action']} to {task['target']}", + "run_id": task.get("run_id"), + "node": task.get("target"), + } + ) -def dispatch(redis_client, command, ha_task_ids): +def stop_run(runs, run_id): + run = runs.get(run_id) + if not run or not run.get("active"): + return False + + run["active"] = False + run["status"] = "stopped" + emit_event( + { + "type": "run_status", + "message": "run status: stopped", + "run_id": run_id, + "node": None, + "status": "stopped", + } + ) + runs.pop(run_id, None) + return True + + +def apply_run_action(redis_client, run_actions, run_id, command): + context = run_actions.get(run_id, {}) + if command == "ignore": + emit_event( + { + "type": "action", + "message": "proposal ignored", + "run_id": run_id, + "node": None, + } + ) + return True + + if command == "check_network": + emit_event( + { + "type": "action", + "message": "confirmed action: check network connectivity", + "run_id": run_id, + "node": "vps", + } + ) + send_task( + redis_client, + build_task("vps", "exec", {"cmd": "ping -c 1 1.1.1.1"}), + run_id=run_id, + ) + return True + + if command == "restart_ha": + container_name = context.get("ha_container_name") or "homeassistant" + emit_event( + { + "type": "action", + "message": f"confirmed action: restart {container_name}", + "run_id": run_id, + "node": "piha", + } + ) + send_task( + redis_client, + build_task("piha", "exec", {"cmd": f"docker restart {container_name}"}), + run_id=run_id, + ) + return True + + emit_event( + { + "type": "action", + "message": f"unknown proposal command: {command}", + "run_id": run_id, + "node": None, + } + ) + return False + + +def emit_auto_config(auto_config, message=None): + emit_event( + { + "type": "auto_config", + "message": message or "auto config updated", + "run_id": None, + "node": None, + "auto_mode": auto_config.get("auto_mode"), + "action_thresholds": auto_config.get("action_thresholds", {}), + "default_threshold": auto_config.get("default_threshold"), + "allowed_auto_actions": auto_config.get("allowed_auto_actions", []), + "max_retries_per_action": auto_config.get("max_retries_per_action", {}), + "retry_window_seconds": auto_config.get("retry_window_seconds"), + } + ) + + +def set_auto_config(auto_config, config): + if "auto_mode" in config: + auto_config["auto_mode"] = bool(config["auto_mode"]) + if isinstance(config.get("action_thresholds"), dict): + auto_config["action_thresholds"] = { + str(command): float(threshold) + for command, threshold in config["action_thresholds"].items() + } + if "default_threshold" in config: + auto_config["default_threshold"] = float(config["default_threshold"]) + if isinstance(config.get("allowed_auto_actions"), list): + auto_config["allowed_auto_actions"] = [ + str(command) + for command in config["allowed_auto_actions"] + ] + + emit_auto_config(auto_config) + + +def set_auto_mode(auto_config, enabled): + set_auto_config(auto_config, {"auto_mode": enabled}) + + +def dispatch(redis_client, command, ha_task_ids, runs): if command == "ha": + run_id = str(uuid4()) + runs[run_id] = { + "expected": 0, + "received": 0, + "engine": DiagnosisEngine(), + "active": True, + "status": "running", + } + emit_event( + { + "type": "run_status", + "message": "run status: running", + "run_id": run_id, + "node": None, + "status": "running", + } + ) + emit_run_progress(run_id, runs[run_id]) + task = build_task("piha", "docker_ps", {}) - ha_task_ids.add(task["task_id"]) - send_task(redis_client, task) + ha_task_ids[task["task_id"]] = run_id + send_task(redis_client, task, runs, run_id) + elif command == "stop": + for run_id, run in list(runs.items()): + if not run.get("active"): + continue + + stop_run(runs, run_id) elif command == "all": send_task(redis_client, build_task("piha", "docker_ps", {})) send_task(redis_client, build_task("vps", "docker_ps", {})) @@ -24,33 +185,223 @@ def dispatch(redis_client, command, ha_task_ids): send_task(redis_client, build_task("piha", "docker_ps", {})) +def process_input(command, redis_client, ha_task_ids, runs): + if not command: + return + + print(f"[input] {command}") + emit_event( + { + "type": "log", + "message": f"user input: {command}", + "run_id": None, + "node": None, + } + ) + dispatch(redis_client, command, ha_task_ids, runs) + + +def listen_for_input_tasks(redis_client, ha_task_ids, runs, run_actions, auto_config): + while True: + _, raw_task = redis_client.brpop("tasks") + try: + task = json.loads(raw_task) + except json.JSONDecodeError: + continue + + if task.get("target") != "orchestrator": + redis_client.lpush("tasks", raw_task) + time.sleep(1) + continue + + action = task.get("action") + if action == "stop_run": + params = task.get("params") or {} + stop_run(runs, str(params.get("run_id", "")).strip()) + continue + + if action == "run_action": + params = task.get("params") or {} + apply_run_action( + redis_client, + run_actions, + str(params.get("run_id", "")).strip(), + str(params.get("command", "")).strip(), + ) + continue + + if action == "set_auto_mode": + params = task.get("params") or {} + set_auto_mode(auto_config, bool(params.get("auto_mode"))) + continue + + if action == "set_auto_config": + params = task.get("params") or {} + set_auto_config(auto_config, params.get("config") or {}) + continue + + if action != "input": + continue + + params = task.get("params") or {} + process_input(str(params.get("command", "")).strip(), redis_client, ha_task_ids, runs) + + +def services_list(services): + return [ + { + "name": name, + "status": state["status"], + "last_check": state["last_check"], + "node": state.get("node"), + "history": state["history"], + } + for name, state in sorted(services.items()) + ] + + +def update_service_health(services, event): + service_name = event.get("service") + status = event.get("status") + node = event.get("node") + timestamp = event.get("timestamp", time.time()) + if not service_name or status not in ("ok", "error"): + return + + state = services.setdefault( + service_name, + { + "status": status, + "last_check": timestamp, + "node": node, + "history": [], + }, + ) + state["status"] = status + state["last_check"] = timestamp + state["node"] = node + state["history"].append({"status": status, "timestamp": timestamp, "node": node}) + state["history"] = state["history"][-20:] + + emit_event( + { + "type": "services_state", + "message": "services health updated", + "run_id": None, + "node": None, + "services": services_list(services), + } + ) + + +def listen_for_external_events(redis_client, services): + while True: + try: + _, raw_event = redis_client.brpop("events") + event = json.loads(raw_event) + except json.JSONDecodeError: + emit_event( + { + "type": "log", + "message": f"invalid external event: {raw_event}", + "run_id": None, + "node": None, + } + ) + continue + + emit_event(event) + if event.get("type") == "health": + update_service_health(services, event) + + def main(): + if not os.path.exists("/tmp/agent-events.log"): + open("/tmp/agent-events.log", "w").close() + redis_client = get_redis_client() - ha_task_ids = set() + ha_task_ids = {} + runs = {} + last_actions = {} + run_actions = {} + services = {} + auto_config = { + "auto_mode": True, + "action_thresholds": { + "restart_ha": 0.8, + "check_network": 0.9, + }, + "default_threshold": 0.9, + "allowed_auto_actions": ["restart_ha"], + "max_retries_per_action": { + "restart_ha": 3, + }, + "retry_window_seconds": 300, + "action_history": {}, + } listener = threading.Thread( target=listen_for_results, - args=(redis_client, ha_task_ids), + args=(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config), daemon=True, ) listener.start() + input_listener = threading.Thread( + target=listen_for_input_tasks, + args=(redis_client, ha_task_ids, runs, run_actions, auto_config), + daemon=True, + ) + input_listener.start() + + event_listener = threading.Thread( + target=listen_for_external_events, + args=(redis_client, services), + daemon=True, + ) + event_listener.start() + print("[orchestrator] ready") + emit_event( + { + "type": "log", + "message": "orchestrator ready", + "run_id": None, + "node": None, + } + ) + emit_auto_config(auto_config, "auto mode: on") while True: try: command = input("> ").strip() except EOFError: + if not sys.stdin.isatty(): + print("[orchestrator] stdin closed, exiting") + emit_event( + { + "type": "log", + "message": "stdin closed, exiting", + "run_id": None, + "node": None, + } + ) + return print("[orchestrator] stdin closed, waiting...") + emit_event( + { + "type": "log", + "message": "stdin closed, waiting", + "run_id": None, + "node": None, + } + ) time.sleep(2) continue except KeyboardInterrupt: print() continue - if not command: - continue - - dispatch(redis_client, command, ha_task_ids) + process_input(command, redis_client, ha_task_ids, runs) if __name__ == "__main__": diff --git a/orchestrator/result_listener.py b/orchestrator/result_listener.py index 89de08b..ed2188d 100644 --- a/orchestrator/result_listener.py +++ b/orchestrator/result_listener.py @@ -1,13 +1,32 @@ import json import re +import threading +import time +from uuid import uuid4 +from diagnosis import DiagnosisEngine +from events import emit_event, emit_run_progress from task_builder import build_task -def send_task(redis_client, target, action, params): +def send_task(redis_client, target, action, params, runs=None, run_id=None): task = build_task(target, action, params) + if run_id: + task["run_id"] = run_id + if runs is not None and run_id in runs: + runs[run_id]["expected"] += 1 + emit_run_progress(run_id, runs[run_id]) + redis_client.lpush("tasks", json.dumps(task)) print(f"sent {task['action']} to {task['target']} ({task['task_id']})") + emit_event( + { + "type": "task", + "message": f"sent {task['action']} to {task['target']}", + "run_id": task.get("run_id"), + "node": task.get("target"), + } + ) return task @@ -49,18 +68,358 @@ def parse_host_port(url): return host, port -def interpret_lms_connectivity(result): +def normalize_http_code(result): code = str(result).strip() - if code == "200": - return "service OK" - if code == "000": - return "no connection" - return "service reachable but error" + if code.isdigit(): + return code + return None -def listen_for_results(redis_client, ha_task_ids): - ha_log_task_ids = set() - lms_connectivity_task_ids = set() +def add_http_signal(engine, key, result): + code = normalize_http_code(result) + if code is not None: + engine.add(key, code) + + +def add_proxy_signal(engine, result): + error = str(result.get("error") or "") + if "exit code 6" in error: + engine.add("ha_proxy", "dns_error") + return + + add_http_signal(engine, "ha_proxy", result.get("result")) + + +def action_in_cooldown(last_actions, node, action_type, run_id=None): + key = f"{node}:{action_type}" + now = time.time() + if key in last_actions and now - last_actions[key] < 60: + print("[action] skipped (cooldown)") + emit_event( + { + "type": "action", + "message": "[action] skipped (cooldown)", + "run_id": run_id, + "node": node, + } + ) + return True + + last_actions[key] = now + return False + + +def sorted_proposal_options(result): + options = result.get("options", []) + action_options = [ + option for option in options + if not option.get("is_ignore") + ] + ignore_options = [ + option for option in options + if option.get("is_ignore") + ] + return sorted( + action_options, + key=lambda option: option.get("confidence", 0), + reverse=True, + ) + ignore_options + + +def emit_proposal_event(run_id, result): + emit_event( + { + "type": "proposal", + "run_id": run_id, + "node": None, + "message": result.get("message"), + "confidence": result.get("confidence"), + "options": sorted_proposal_options(result), + } + ) + + +def retry_limit_reached(command, run_id, auto_config): + action_history = auto_config.setdefault("action_history", {}) + history = action_history.setdefault(command, []) + now = time.time() + retry_window = auto_config.get("retry_window_seconds", 300) + cutoff = now - retry_window + history[:] = [timestamp for timestamp in history if timestamp >= cutoff] + + retry_limit = auto_config.get("max_retries_per_action", {}).get(command) + if retry_limit is None: + return False + + if len(history) >= retry_limit: + emit_event( + { + "type": "auto_action", + "run_id": run_id, + "node": None, + "message": f"[auto] blocked: {command} retry limit reached", + } + ) + return True + + history.append(now) + return False + + +def emit_learning(message, run_id=None): + emit_event( + { + "type": "learning", + "message": message, + "run_id": run_id, + "node": None, + } + ) + + +def start_ha_check_run(redis_client, runs, ha_task_ids, learning_action=None): + run_id = str(uuid4()) + runs[run_id] = { + "expected": 0, + "received": 0, + "engine": DiagnosisEngine(), + "active": True, + "status": "running", + "learning_action": learning_action, + } + emit_event( + { + "type": "run_status", + "message": "run status: running", + "run_id": run_id, + "node": None, + "status": "running", + } + ) + emit_run_progress(run_id, runs[run_id]) + + task = send_task(redis_client, "piha", "docker_ps", {}, runs, run_id) + ha_task_ids[task["task_id"]] = run_id + return run_id + + +def schedule_action_outcome_check(redis_client, runs, ha_task_ids, action): + emit_learning(f"[learning] checking outcome for {action}") + + def delayed_check(): + time.sleep(30) + start_ha_check_run( + redis_client, + runs, + ha_task_ids, + learning_action=action, + ) + + thread = threading.Thread(target=delayed_check, daemon=True) + thread.start() + + +def record_action_outcome(run, run_id, action_stats): + action = run.get("learning_action") + if not action: + return + + stats = action_stats.setdefault(action, {"success": 0, "failure": 0}) + if run["engine"].signals.get("ha_logs_client_connector_error"): + stats["failure"] += 1 + emit_learning(f"[learning] {action} failed", run_id) + else: + stats["success"] += 1 + emit_learning(f"[learning] {action} success", run_id) + + total = stats["success"] + stats["failure"] + success_rate = stats["success"] / total if total else 0 + emit_learning(f"[learning] {action} success_rate: {success_rate:.0%}", run_id) + + +def auto_execute_action( + redis_client, + engine, + option, + run_id, + last_actions, + auto_config, + runs, + ha_task_ids, +): + command = option.get("command") + confidence = option.get("confidence") + + if command == "restart_ha": + if action_in_cooldown(last_actions, "piha", "restart_ha", run_id): + return False + if retry_limit_reached(command, run_id, auto_config): + return False + + container_name = engine.signals.get("ha_container_name") or "homeassistant" + emit_event( + { + "type": "auto_action", + "run_id": run_id, + "node": "piha", + "message": f"Auto-executed: {command}", + "confidence": confidence, + } + ) + send_task( + redis_client, + "piha", + "exec", + {"cmd": f"docker restart {container_name}"}, + run_id=run_id, + ) + schedule_action_outcome_check(redis_client, runs, ha_task_ids, command) + return True + + return False + + +def maybe_auto_execute_proposal( + redis_client, + run, + run_id, + result, + last_actions, + auto_config, + runs, + ha_task_ids, +): + if run.get("learning_action"): + return + if not auto_config.get("auto_mode"): + return + + options = [ + option for option in sorted_proposal_options(result) + if not option.get("is_ignore") and isinstance(option.get("confidence"), (int, float)) + ] + if not options: + return + + best = options[0] + allowed_actions = set(auto_config.get("allowed_auto_actions", [])) + command = best.get("command") + if command not in allowed_actions: + return + + threshold = auto_config.get("action_thresholds", {}).get( + command, + auto_config.get("default_threshold", 0.9), + ) + if best.get("confidence", 0) < threshold: + return + + auto_execute_action( + redis_client, + run["engine"], + best, + run_id, + last_actions, + auto_config, + runs, + ha_task_ids, + ) + + +def emit_evaluation_event(run_id, result): + if isinstance(result, dict) and result.get("type") == "proposal": + emit_proposal_event(run_id, result) + return + + print(result) + emit_event( + { + "type": "diagnosis", + "message": result, + "run_id": run_id, + "node": None, + } + ) + + +def has_error(results): + return any( + isinstance(result, str) and result.startswith("[error]") + for result in results + ) + + +def store_action_context(run_actions, run_id, engine): + run_actions[run_id] = { + "ha_container_name": engine.signals.get("ha_container_name") or "homeassistant", + } + + +def evaluate_run_if_complete( + redis_client, + runs, + run_id, + last_actions, + run_actions, + auto_config, + ha_task_ids, + action_stats, +): + run = runs.get(run_id) + if not run or run["received"] != run["expected"]: + return + + results = run["engine"].evaluate() + for result in results: + emit_evaluation_event(run_id, result) + if isinstance(result, dict) and result.get("type") == "proposal": + maybe_auto_execute_proposal( + redis_client, + run, + run_id, + result, + last_actions, + auto_config, + runs, + ha_task_ids, + ) + status = "error" if has_error(results) else "done" + run["active"] = False + run["status"] = status + store_action_context(run_actions, run_id, run["engine"]) + emit_event( + { + "type": "run_status", + "message": f"run status: {status}", + "run_id": run_id, + "node": None, + "status": status, + } + ) + record_action_outcome(run, run_id, action_stats) + runs.pop(run_id, None) + + +def tracked_run_id_for_task(task_id, ha_task_ids, ha_log_task_ids, ha_check_tasks): + if task_id in ha_task_ids: + return ha_task_ids[task_id] + if task_id in ha_log_task_ids: + return ha_log_task_ids[task_id] + if task_id in ha_check_tasks: + return ha_check_tasks[task_id][0] + return None + + +def listen_for_results(redis_client, ha_task_ids, runs, last_actions, run_actions, auto_config): + ha_log_task_ids = {} + ha_check_tasks = {} + ha_check_results = {} + action_stats = { + "restart_ha": { + "success": 0, + "failure": 0, + } + } while True: _, raw_result = redis_client.brpop("results") @@ -68,6 +427,14 @@ def listen_for_results(redis_client, ha_task_ids): result = json.loads(raw_result) except json.JSONDecodeError: print(f"\n[result:error] invalid json: {raw_result}") + emit_event( + { + "type": "result", + "message": f"invalid json: {raw_result}", + "run_id": None, + "node": None, + } + ) continue print("\n--- result ---") @@ -79,24 +446,63 @@ def listen_for_results(redis_client, ha_task_ids): print("--------------") task_id = result.get("task_id") + run_id = result.get("run_id") or tracked_run_id_for_task( + task_id, + ha_task_ids, + ha_log_task_ids, + ha_check_tasks, + ) + emit_event( + { + "type": "result", + "message": f"result received: {task_id} status={result.get('status')}", + "run_id": run_id, + "node": result.get("node"), + } + ) + run = runs.get(run_id) if run_id else None + if not run: + continue - if task_id in lms_connectivity_task_ids: - lms_connectivity_task_ids.remove(task_id) - diagnosis = interpret_lms_connectivity(result.get("result")) - print(f"[diagnosis] LMS connectivity: {diagnosis}") + run["received"] += 1 + emit_run_progress(run_id, run) + + if task_id in ha_check_tasks: + tracked_run_id, label = ha_check_tasks.pop(task_id) + if tracked_run_id != run_id: + continue + checks = ha_check_results.setdefault(run_id, {}) + checks[label] = normalize_http_code(result.get("result")) + if label == "ha_local_check": + add_http_signal(run["engine"], "ha_local", result.get("result")) + add_http_signal(run["engine"], "lms", result.get("result")) + elif label == "ha_proxy_check": + add_proxy_signal(run["engine"], result) + + if "ha_local_check" in checks and "ha_proxy_check" in checks: + ha_check_results.pop(run_id, None) + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue if task_id in ha_log_task_ids: - ha_log_task_ids.remove(task_id) - if result.get("status") != "ok": + tracked_run_id = ha_log_task_ids.pop(task_id) + if tracked_run_id != run_id: continue + if result.get("status") != "ok": + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) + continue + + if "ClientConnectorError" in str(result.get("result")): + run["engine"].add("ha_logs_client_connector_error", True) url = extract_first_http_url(result.get("result")) if not url: + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue host, port = parse_host_port(url) if not host or not port: + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue check_task = send_task( @@ -104,24 +510,58 @@ def listen_for_results(redis_client, ha_task_ids): "piha", "exec", {"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"}, + runs, + run_id, ) - lms_connectivity_task_ids.add(check_task["task_id"]) + ha_check_tasks[check_task["task_id"]] = (run_id, "ha_local_check") + + proxy_task = send_task( + redis_client, + "vps", + "exec", + {"cmd": f"curl -s -o /dev/null -w '%{{http_code}}' {url}"}, + runs, + run_id, + ) + ha_check_tasks[proxy_task["task_id"]] = (run_id, "ha_proxy_check") + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue if task_id in ha_task_ids: - ha_task_ids.remove(task_id) + tracked_run_id = ha_task_ids.pop(task_id) + if tracked_run_id != run_id: + continue if result.get("status") != "ok": + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue container_name = detect_homeassistant(result.get("result")) if not container_name: + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) continue print(f"[orchestrator] detected HA container: {container_name}") + emit_event( + { + "type": "log", + "message": f"detected HA container: {container_name}", + "run_id": run_id, + "node": result.get("node"), + } + ) + run["engine"].add("ha_container", "running") + run["engine"].add("ha_container_name", container_name) + logs_task = send_task( redis_client, "piha", "docker_logs", {"container": container_name}, + runs, + run_id, ) - ha_log_task_ids.add(logs_task["task_id"]) + ha_log_task_ids[logs_task["task_id"]] = run_id + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) + continue + + evaluate_run_if_complete(redis_client, runs, run_id, last_actions, run_actions, auto_config, ha_task_ids, action_stats) diff --git a/webui/Dockerfile b/webui/Dockerfile new file mode 100644 index 0000000..771fcda --- /dev/null +++ b/webui/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.11-slim + +WORKDIR /app +COPY web.py index.html ./ + +EXPOSE 8080 +CMD ["python", "web.py"] diff --git a/webui/index.html b/webui/index.html new file mode 100644 index 0000000..150d978 --- /dev/null +++ b/webui/index.html @@ -0,0 +1,560 @@ + + + + + + Agent System + + + +
+

Agent System

+
+
Services
+
+
+
+
+ + + +
+
+
Auto Mode Config
+
+ + + + + + + + + + + +
+
+
+ + + + diff --git a/webui/web.py b/webui/web.py new file mode 100644 index 0000000..91ac710 --- /dev/null +++ b/webui/web.py @@ -0,0 +1,248 @@ +import json +import os +import socket +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path + + +EVENT_LOG = Path("/tmp/agent-events.log") +STATIC_DIR = Path(__file__).parent +REDIS_HOST = os.getenv("REDIS_HOST", "redis") +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) +DEFAULT_CONFIG = { + "auto_mode": True, + "action_thresholds": { + "restart_ha": 0.8, + "check_network": 0.9, + }, + "default_threshold": 0.9, + "allowed_auto_actions": ["restart_ha"], +} + + +def tail_lines(path, limit): + if not path.exists(): + path.touch() + + with path.open("r", encoding="utf-8", errors="replace") as handle: + lines = handle.readlines() + print(f"Read {len(lines)} lines", flush=True) + return [line.rstrip("\n") for line in lines[-limit:]] + + +def redis_command(*parts): + payload = f"*{len(parts)}\r\n".encode("utf-8") + for part in parts: + data = str(part).encode("utf-8") + payload += f"${len(data)}\r\n".encode("utf-8") + data + b"\r\n" + + with socket.create_connection((REDIS_HOST, REDIS_PORT), timeout=3) as client: + client.sendall(payload) + return client.recv(4096) + + +def send_command(command): + task = { + "target": "orchestrator", + "action": "input", + "params": { + "command": command, + }, + } + redis_command("LPUSH", "tasks", json.dumps(task)) + + +def send_stop_run(run_id): + task = { + "target": "orchestrator", + "action": "stop_run", + "params": { + "run_id": run_id, + }, + } + redis_command("LPUSH", "tasks", json.dumps(task)) + + +def send_run_action(run_id, command): + task = { + "target": "orchestrator", + "action": "run_action", + "params": { + "run_id": run_id, + "command": command, + }, + } + redis_command("LPUSH", "tasks", json.dumps(task)) + + +def send_auto_mode(auto_mode): + task = { + "target": "orchestrator", + "action": "set_auto_mode", + "params": { + "auto_mode": auto_mode, + }, + } + redis_command("LPUSH", "tasks", json.dumps(task)) + + +def send_auto_config(config): + task = { + "target": "orchestrator", + "action": "set_auto_config", + "params": { + "config": config, + }, + } + redis_command("LPUSH", "tasks", json.dumps(task)) + + +def send_event(event): + redis_command("LPUSH", "events", json.dumps(event)) + + +def current_config(): + config = json.loads(json.dumps(DEFAULT_CONFIG)) + for line in tail_lines(EVENT_LOG, 500): + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + if event.get("type") != "auto_config": + continue + + for key in config: + if key in event: + config[key] = event[key] + return config + + +def current_services(): + services = [] + for line in tail_lines(EVENT_LOG, 1000): + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + if event.get("type") == "services_state": + services = event.get("services", []) + return services + + +def send_json(status, payload, handler): + body = (json.dumps(payload) + "\n").encode("utf-8") + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +class Handler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/config": + send_json(200, current_config(), self) + return + + if self.path == "/services": + send_json(200, current_services(), self) + return + + if self.path == "/logs": + print("LOGS endpoint called", flush=True) + body = ("\n".join(tail_lines(EVENT_LOG, 200)) + "\n").encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + if self.path in ("/", "/index.html"): + body = (STATIC_DIR / "index.html").read_bytes() + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + self.send_error(404) + + def do_POST(self): + if self.path not in ( + "/command", + "/stop", + "/action", + "/auto-mode", + "/config", + "/events", + ): + self.send_error(404) + return + + length = int(self.headers.get("Content-Length", "0")) + raw_body = self.rfile.read(length).decode("utf-8") + try: + payload = json.loads(raw_body) + command = str(payload.get("command", "")).strip() + except json.JSONDecodeError: + payload = {} + command = raw_body.strip() + + if self.path == "/events": + if not isinstance(payload, dict): + self.send_error(400, "event object is required") + return + send_event(payload) + send_json(200, {"status": "sent"}, self) + return + + if self.path == "/stop": + run_id = str(payload.get("run_id", "")).strip() + if not run_id: + self.send_error(400, "run_id is required") + return + + send_stop_run(run_id) + send_json(200, {"status": "sent"}, self) + return + + if self.path == "/action": + run_id = str(payload.get("run_id", "")).strip() + action_command = str(payload.get("command", "")).strip() + if not run_id: + self.send_error(400, "run_id is required") + return + if not action_command: + self.send_error(400, "command is required") + return + + send_run_action(run_id, action_command) + send_json(200, {"status": "sent"}, self) + return + + if self.path == "/auto-mode": + send_auto_mode(bool(payload.get("auto_mode"))) + send_json(200, {"status": "sent"}, self) + return + + if self.path == "/config": + send_auto_config(payload) + send_json(200, {"status": "sent"}, self) + return + + if not command: + self.send_error(400, "command is required") + return + + send_command(command) + send_json(200, {"status": "sent"}, self) + + def log_message(self, format, *args): + return + + +if __name__ == "__main__": + server = ThreadingHTTPServer(("0.0.0.0", 8080), Handler) + server.serve_forever()