From 5754994f8e3fa0198b64549ed58c7a6311a34cf8 Mon Sep 17 00:00:00 2001 From: oskar Date: Sun, 17 May 2026 23:42:52 +0200 Subject: [PATCH] Refactor Telegram bot to use control plane API --- services/agent-system/README.md | 18 ++ services/agent-system/deploy.sh | 4 +- services/agent-system/docker-compose.yml | 5 +- services/agent-system/env.example | 5 + services/agent-system/telegram-bot/bot.py | 258 ++++++++++++++++++++-- 5 files changed, 271 insertions(+), 19 deletions(-) diff --git a/services/agent-system/README.md b/services/agent-system/README.md index c811dd6..d433c86 100644 --- a/services/agent-system/README.md +++ b/services/agent-system/README.md @@ -5,6 +5,24 @@ Central runtime materializer and Operator Control Plane UI. - **Redis**: Central state store (on PIHA). - **Runtime Materializer**: Converts Redis state to JSON files in `/opt/homelab/world`. - **Web UI**: Exposes API endpoints and serving the Operator UI. +- **Telegram Bot**: Provides operator commands and action approvals via Telegram. + +#### Configuration +Environment variables should be set in `.env` (see `env.example`). +Key variables for the Telegram Bot: +- `TELEGRAM_BOT_TOKEN`: Your bot token from @BotFather. +- `TELEGRAM_ALLOWED_USER_IDS`: Comma-separated list of authorized Telegram User IDs. +- `CONTROL_PLANE_URL`: URL to the `agent-system-webui` (default: `http://webui:8080`). + +#### Telegram Commands +- `/status`: Check bot and API connectivity. +- `/summary`: System health overview. +- `/nodes`: List homelab nodes and their status. +- `/services`: Summary of services across nodes. +- `/unhealthy`: List all unhealthy components. +- `/incidents`: View active incidents. +- `/actions`: Summary of operator actions. +- `/help`: List all commands. #### Deployment (on PIHA) ```bash diff --git a/services/agent-system/deploy.sh b/services/agent-system/deploy.sh index 7175f37..8c80d94 100755 --- a/services/agent-system/deploy.sh +++ b/services/agent-system/deploy.sh @@ -11,7 +11,9 @@ echo ">>> Services status:" docker ps --filter "name=agent-system" --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" if [ -z "$TELEGRAM_BOT_TOKEN" ]; then - echo "NOTE: TELEGRAM_BOT_TOKEN is not set. Telegram approval bot will be disabled." + echo ">>> Telegram bot status: DISABLED (token missing)" +else + echo ">>> Telegram bot status: ENABLED" fi echo ">>> Verifying API endpoints..." diff --git a/services/agent-system/docker-compose.yml b/services/agent-system/docker-compose.yml index 7df87db..f8ef1c0 100644 --- a/services/agent-system/docker-compose.yml +++ b/services/agent-system/docker-compose.yml @@ -38,7 +38,10 @@ services: environment: TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN} TELEGRAM_ALLOWED_USER_IDS: ${TELEGRAM_ALLOWED_USER_IDS} + CONTROL_PLANE_URL: ${CONTROL_PLANE_URL:-http://webui:8080} + ENABLE_LLM_FALLBACK: ${ENABLE_LLM_FALLBACK:-false} + OPENCLAW_BASE_URL: ${OPENCLAW_BASE_URL} ACTIONS_ROOT: /opt/homelab/actions volumes: - /opt/homelab:/opt/homelab - restart: unless-stopped + restart: on-failure diff --git a/services/agent-system/env.example b/services/agent-system/env.example index 0a066ce..b142cd8 100644 --- a/services/agent-system/env.example +++ b/services/agent-system/env.example @@ -3,6 +3,11 @@ TELEGRAM_BOT_TOKEN=123456789:ABCdefGHIjklMNOpqrsTUVwxyz # Comma-separated list of Telegram User IDs TELEGRAM_ALLOWED_USER_IDS=12345678,87654321 +# Local control-plane API (default is internal compose address) +CONTROL_PLANE_URL=http://webui:8080 +# Optional LLM fallback logic +ENABLE_LLM_FALLBACK=false +OPENCLAW_BASE_URL=http://openclaw.internal # Runtime Materializer Configuration REDIS_HOST=100.108.208.3 diff --git a/services/agent-system/telegram-bot/bot.py b/services/agent-system/telegram-bot/bot.py index d84c135..8f19b43 100644 --- a/services/agent-system/telegram-bot/bot.py +++ b/services/agent-system/telegram-bot/bot.py @@ -3,9 +3,11 @@ import json import time import asyncio import logging +import urllib.request +import urllib.error from pathlib import Path from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup -from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, CallbackQueryHandler +from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, CallbackQueryHandler, MessageHandler, filters # Setup logging logging.basicConfig( @@ -18,6 +20,39 @@ logger = logging.getLogger(__name__) TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") ALLOWED_IDS = [int(i.strip()) for i in os.getenv("TELEGRAM_ALLOWED_USER_IDS", "").split(",") if i.strip()] ACTIONS_ROOT = Path(os.getenv("ACTIONS_ROOT", "/opt/homelab/actions")) +CONTROL_PLANE_URL = os.getenv("CONTROL_PLANE_URL", "http://webui:8080") +ENABLE_LLM_FALLBACK = os.getenv("ENABLE_LLM_FALLBACK", "false").lower() == "true" +OPENCLAW_BASE_URL = os.getenv("OPENCLAW_BASE_URL") + +async def fetch_api(path): + """Helper to fetch JSON from the Control Plane API.""" + url = f"{CONTROL_PLANE_URL.rstrip('/')}/{path.lstrip('/')}" + try: + def do_request(): + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=5) as response: + if response.status != 200: + return None + return json.loads(response.read().decode()) + return await asyncio.to_thread(do_request) + except Exception as e: + logger.error(f"Error fetching {url}: {e}") + return None + +async def post_api(path, data): + """Helper to POST JSON to the Control Plane API.""" + url = f"{CONTROL_PLANE_URL.rstrip('/')}/{path.lstrip('/')}" + try: + body = json.dumps(data).encode("utf-8") + def do_request(): + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/json") + with urllib.request.urlopen(req, timeout=5) as response: + return response.status == 200 + return await asyncio.to_thread(do_request) + except Exception as e: + logger.error(f"Error posting to {url}: {e}") + return False class ApprovalBot: def __init__(self): @@ -59,7 +94,7 @@ class ApprovalBot: f"Node: `{data.get('node', 'unknown')}`\n" f"Risk: *{data.get('risk', 'unknown')}*\n" ) - + if "details" in data: details_str = json.dumps(data['details'], indent=2) if len(details_str) > 1000: @@ -96,16 +131,22 @@ class ApprovalBot: return await query.answer() - + cb_data = query.data if ":" not in cb_data: return - + action, action_id = cb_data.split(":", 1) target_status = "approved" if action == "approve" else "rejected" - success, msg = self.move_action(action_id, target_status, user_id, query.from_user.username or str(user_id)) - + # Use API for mutation if available, fallback to local disk move + success = await post_api("/action/mutate", {"id": action_id, "status": target_status}) + msg = "Success" if success else "API call failed" + + if not success: + # Fallback to direct disk manipulation (original behavior) + success, msg = self.move_action(action_id, target_status, user_id, query.from_user.username or str(user_id)) + if success: status_text = "āœ… Approved" if target_status == "approved" else "āŒ Rejected" await query.edit_message_text( @@ -131,11 +172,11 @@ class ApprovalBot: try: data = json.loads(source_path.read_text()) current_status = data.get("status", "pending") - + # Update data data["status"] = target_status data["updated_at"] = time.time() - + history = data.get("transition_history", []) history.append({ "from": current_status, @@ -158,16 +199,189 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Simple start command to help users find their ID.""" user = update.effective_user message = ( - f"Hello {user.first_name}!\n" + f"Hello {user.first_name}! šŸ¤–\n" f"Your Telegram User ID is: `{user.id}`\n\n" ) if user.id in ALLOWED_IDS: - message += "āœ… You are authorized to approve actions." + message += "āœ… You are authorized to manage the homelab.\n\n" + message += "Use /help to see available commands." else: - message += "āŒ You are NOT authorized. Add your ID to TELEGRAM_ALLOWED_USER_IDS." - + message += "āŒ You are NOT authorized. Add your ID to `TELEGRAM_ALLOWED_USER_IDS`." + await update.message.reply_text(message, parse_mode="Markdown") +async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + res = await fetch_api("/summary") + status = "āœ… Online" if res else "āŒ Unreachable" + message = ( + f"šŸ¤– *Telegram Bot Status*\n" + f"Control Plane API: {status}\n" + f"Target URL: `{CONTROL_PLANE_URL}`\n" + ) + await update.message.reply_text(message, parse_mode="Markdown") + +async def summary_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + data = await fetch_api("/summary") + if not data: + await update.message.reply_text("āŒ Failed to fetch summary from Control Plane.") + return + + msg = "šŸ“Š *System Summary*\n" + msg += f"Status: `{data.get('status', 'unknown')}`\n" + msg += f"Nodes: {data.get('node_count', 0)}\n" + msg += f"Services: {data.get('service_count', 0)}\n" + msg += f"Active Incidents: {data.get('active_incidents_count', 0)}\n" + if data.get('stale'): + msg += "\nāš ļø *Warning: Data is stale!*" + + await update.message.reply_text(msg, parse_mode="Markdown") + +async def nodes_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + nodes = await fetch_api("/nodes") + if nodes is None: + await update.message.reply_text("āŒ Failed to fetch nodes.") + return + + if not nodes: + await update.message.reply_text("No nodes discovered in the fleet.") + return + + msg = "šŸ–„ļø *Nodes Status*\n" + for node in nodes: + health_icon = "āœ…" if node.get('health') == 'nominal' else "āš ļø" if node.get('health') == 'degraded' else "āŒ" + msg += f"{health_icon} *{node.get('hostname')}*: `{node.get('status', 'unknown')}`\n" + msg += f" Last seen: {node.get('last_seen', 'N/A')}\n" + + await update.message.reply_text(msg, parse_mode="Markdown") + +async def services_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + services = await fetch_api("/services") + if services is None: + await update.message.reply_text("āŒ Failed to fetch services.") + return + + # Summarize by node + nodes = {} + for s in services: + node = s.get("node", "unknown") + if node not in nodes: nodes[node] = [] + nodes[node].append(s) + + msg = "āš™ļø *Services Summary*\n" + if not nodes: + msg += "No services discovered." + else: + for node, svc_list in sorted(nodes.items()): + nominal = len([s for s in svc_list if s.get("health") == "nominal"]) + msg += f"• *{node}*: {nominal}/{len(svc_list)} nominal\n" + + msg += "\nUse /unhealthy to see issues." + await update.message.reply_text(msg, parse_mode="Markdown") + +async def unhealthy_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + services = await fetch_api("/services") + nodes = await fetch_api("/nodes") + + msg = "āš ļø *Unhealthy Components*\n" + found = False + + if services: + for s in services: + health = s.get("health", "").lower() + if health != "nominal": + msg += f"• Service *{s.get('name')}* on *{s.get('node')}*: `{health}`\n" + found = True + + if nodes: + for n in nodes: + checks = n.get("checks", {}) + if isinstance(checks, str): + try: checks = json.loads(checks) + except: checks = {} + + docker = checks.get("docker", {}) + if docker.get("status") == "ok": + for c in docker.get("containers", []): + if c.get("state") != "running": + msg += f"• Container *{c.get('name')}* on *{n.get('hostname')}*: `{c.get('state')}`\n" + found = True + + if not found: + msg += "All systems nominal. āœ…" + + await update.message.reply_text(msg, parse_mode="Markdown") + +async def incidents_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + incidents = await fetch_api("/incidents") + if incidents is None: + await update.message.reply_text("āŒ Failed to fetch incidents.") + return + + active = [i for i in incidents if i.get("status") not in ("resolved", "closed")] + if not active: + await update.message.reply_text("No active incidents. āœ…") + return + + msg = "🚨 *Active Incidents*\n" + for inc in active: + severity = inc.get('severity', 'info').upper() + msg += f"• [{severity}] *{inc.get('type')}*: {inc.get('message')}\n" + + await update.message.reply_text(msg, parse_mode="Markdown") + +async def actions_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + if update.effective_user.id not in ALLOWED_IDS: return + actions = await fetch_api("/actions") + if actions is None: + await update.message.reply_text("āŒ Actions endpoint unavailable.") + return + + msg = "⚔ *Actions Summary*\n" + total = 0 + for status, act_list in actions.items(): + if act_list: + msg += f"• {status.capitalize()}: {len(act_list)}\n" + total += len(act_list) + + if total == 0: + msg = "No actions recorded." + + await update.message.reply_text(msg, parse_mode="Markdown") + +async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): + msg = ( + "šŸ“– *Supported Commands*\n\n" + "/status - Check bot and API connectivity\n" + "/summary - System health overview\n" + "/nodes - List homelab nodes and their status\n" + "/services - Summary of services across nodes\n" + "/unhealthy - List all unhealthy components\n" + "/incidents - View active incidents\n" + "/actions - Summary of operator actions\n" + "/help - Show this help message\n\n" + "Free text will be handled by the guidance system." + ) + await update.message.reply_text(msg, parse_mode="Markdown") + +async def handle_fallback(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handles non-command messages.""" + if update.effective_user.id not in ALLOWED_IDS: return + + if ENABLE_LLM_FALLBACK and OPENCLAW_BASE_URL: + # Placeholder for OpenClaw LLM fallback + # In a real scenario, this would call the LLM API + logger.info(f"LLM fallback requested for: {update.message.text}") + + await update.message.reply_text( + "Use /summary, /nodes, /services, /unhealthy, /incidents, /actions." + ) + async def run_bot(): if not TOKEN: print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.") @@ -176,21 +390,31 @@ async def run_bot(): return bot_logic = ApprovalBot() - + application = ApplicationBuilder().token(TOKEN).build() - + application.add_handler(CommandHandler("start", start_command)) + application.add_handler(CommandHandler("status", status_command)) + application.add_handler(CommandHandler("summary", summary_command)) + application.add_handler(CommandHandler("nodes", nodes_command)) + application.add_handler(CommandHandler("services", services_command)) + application.add_handler(CommandHandler("unhealthy", unhealthy_command)) + application.add_handler(CommandHandler("incidents", incidents_command)) + application.add_handler(CommandHandler("actions", actions_command)) + application.add_handler(CommandHandler("help", help_command)) + + application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_fallback)) application.add_handler(CallbackQueryHandler(bot_logic.handle_callback)) - + # Schedule the pending actions check job_queue = application.job_queue job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5) - + logger.info("Starting Telegram Approval Bot...") await application.initialize() await application.start() await application.updater.start_polling() - + # Run until the application is stopped stop_event = asyncio.Event() try: