"""Telegram approval bot for the homelab control plane.

The bot does two things:

* It periodically scans ``ACTIONS_ROOT/pending`` for JSON action files and
  sends an Approve/Reject prompt to every allowed Telegram user.
* It exposes read-only commands (/status, /summary, /nodes, ...) that render
  data fetched from the Control Plane HTTP API.

Configuration is read from environment variables at import time.
"""

import os
import json
import time
import asyncio
import logging
import urllib.request
import urllib.error
from pathlib import Path

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder,
    ContextTypes,
    CommandHandler,
    CallbackQueryHandler,
    MessageHandler,
    filters,
)

# Setup logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)


def _parse_allowed_ids(raw):
    """Parse a comma-separated list of Telegram user IDs.

    Blank entries are ignored. Non-numeric entries are skipped with a warning
    instead of raising, so one typo in the environment variable does not
    prevent the module from importing.
    """
    ids = []
    for chunk in raw.split(","):
        chunk = chunk.strip()
        if not chunk:
            continue
        try:
            ids.append(int(chunk))
        except ValueError:
            logger.warning("Ignoring invalid entry in TELEGRAM_ALLOWED_USER_IDS: %r", chunk)
    return ids


# Configuration
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
ALLOWED_IDS = _parse_allowed_ids(os.getenv("TELEGRAM_ALLOWED_USER_IDS", ""))
ACTIONS_ROOT = Path(os.getenv("ACTIONS_ROOT", "/opt/homelab/actions"))
CONTROL_PLANE_URL = os.getenv("CONTROL_PLANE_URL", "http://webui:8080")
ENABLE_LLM_FALLBACK = os.getenv("ENABLE_LLM_FALLBACK", "false").lower() == "true"
OPENCLAW_BASE_URL = os.getenv("OPENCLAW_BASE_URL")

# Callback verb (the part before ":" in callback_data) -> resulting action status.
_CALLBACK_STATUSES = {"approve": "approved", "reject": "rejected"}

# Node health value -> icon used by /nodes; anything else is shown as "❌".
_NODE_HEALTH_ICONS = {"nominal": "✅", "degraded": "⚠️"}

# Characters that start an entity in Telegram's legacy "Markdown" parse mode.
_MD_SPECIAL = str.maketrans({char: f"\\{char}" for char in "_*`["})


def _md_escape(value):
    """Escape legacy-Markdown control characters in free text.

    Only meant for text placed OUTSIDE of an entity (not inside ``*bold*`` or
    backtick code spans), which is where Telegram honours backslash escapes.
    """
    return str(value).translate(_MD_SPECIAL)


def _is_safe_action_id(action_id):
    """Return True if ``action_id`` is a plain file-name component.

    Rejects empty values, ``.``/``..``, NUL bytes and anything containing a
    path separator, so an ID can never point outside the actions directories.
    """
    if not action_id or action_id in {".", ".."}:
        return False
    if "\x00" in action_id or "\\" in action_id:
        return False
    return Path(action_id).name == action_id


async def fetch_api(path):
    """Helper to fetch JSON from the Control Plane API.

    Args:
        path: API path, with or without a leading slash.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails, times
        out, returns a non-200 status, or the body is not valid JSON.
    """
    url = f"{CONTROL_PLANE_URL.rstrip('/')}/{path.lstrip('/')}"
    try:
        def do_request():
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=5) as response:
                if response.status != 200:
                    return None
                return json.loads(response.read().decode())

        # urllib is blocking; run it in a worker thread to keep the loop free.
        return await asyncio.to_thread(do_request)
    except Exception as e:
        logger.error("Error fetching %s: %s", url, e)
        return None


async def post_api(path, data):
    """Helper to POST JSON to the Control Plane API.

    Args:
        path: API path, with or without a leading slash.
        data: JSON-serialisable request body.

    Returns:
        ``True`` only when the API answers with HTTP 200; ``False`` on any
        other status or on any error (including serialisation errors).
    """
    url = f"{CONTROL_PLANE_URL.rstrip('/')}/{path.lstrip('/')}"
    try:
        body = json.dumps(data).encode("utf-8")

        def do_request():
            req = urllib.request.Request(url, data=body, method="POST")
            req.add_header("Content-Type", "application/json")
            with urllib.request.urlopen(req, timeout=5) as response:
                return response.status == 200

        return await asyncio.to_thread(do_request)
    except Exception as e:
        logger.error("Error posting to %s: %s", url, e)
        return False


class ApprovalBot:
    """Announces pending action files and records Approve/Reject decisions."""

    def __init__(self):
        self.pending_dir = ACTIONS_ROOT / "pending"
        self.approved_dir = ACTIONS_ROOT / "approved"
        self.rejected_dir = ACTIONS_ROOT / "rejected"
        # Track which action IDs we have already notified in this session to avoid spam
        self.notified_actions = set()

    async def check_pending_actions(self, context: ContextTypes.DEFAULT_TYPE):
        """Job that periodically checks for new pending action files.

        Every ``*.json`` file in the pending directory whose ``status`` field
        is ``"pending"`` is announced once per session. IDs whose file has
        left the pending directory are forgotten, so the tracking set does
        not grow without bound and a re-queued ID is announced again.
        """
        if not self.pending_dir.exists():
            return

        seen = set()
        try:
            for action_file in self.pending_dir.glob("*.json"):
                action_id = action_file.stem
                seen.add(action_id)
                if action_id in self.notified_actions:
                    continue

                try:
                    data = json.loads(action_file.read_text(encoding="utf-8"))
                    # Only notify if it's truly pending
                    if data.get("status") == "pending":
                        await self.notify_users(context, action_id, data)
                        self.notified_actions.add(action_id)
                except Exception as e:
                    logger.error("Error processing action file %s: %s", action_file, e)
        except Exception as e:
            logger.error("Error scanning pending directory: %s", e)
            # The scan was incomplete, so `seen` cannot be trusted for pruning.
            return

        # Drop IDs that were resolved elsewhere (e.g. directly through the API).
        self.notified_actions.intersection_update(seen)

    async def notify_users(self, context: ContextTypes.DEFAULT_TYPE, action_id: str, data: dict):
        """Sends an approval request message to all allowed users.

        Args:
            context: Callback context providing the bot instance.
            action_id: Identifier of the action (the file stem).
            data: Parsed action file; ``type``, ``service``, ``node``,
                ``risk`` and the optional ``details`` keys are rendered.

        A failure to reach one user is logged and does not stop delivery to
        the remaining users.
        """
        message = (
            f"⚠️ *Pending Action*\n"
            f"ID: `{action_id}`\n"
            f"Type: `{data.get('type', 'unknown')}`\n"
            f"Service: `{data.get('service', 'unknown')}`\n"
            f"Node: `{data.get('node', 'unknown')}`\n"
            f"Risk: *{data.get('risk', 'unknown')}*\n"
        )

        if "details" in data:
            details_str = json.dumps(data['details'], indent=2)
            # Keep the message comfortably below Telegram's message size limit.
            if len(details_str) > 1000:
                details_str = details_str[:1000] + "..."
            message += f"\nDetails:\n```json\n{details_str}\n```"

        keyboard = [
            [
                InlineKeyboardButton("✅ Approve", callback_data=f"approve:{action_id}"),
                InlineKeyboardButton("❌ Reject", callback_data=f"reject:{action_id}"),
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)

        for user_id in ALLOWED_IDS:
            try:
                await context.bot.send_message(
                    chat_id=user_id,
                    text=message,
                    parse_mode="Markdown",
                    reply_markup=reply_markup
                )
                logger.info("Notified user %s about action %s", user_id, action_id)
            except Exception as e:
                logger.error("Failed to notify user %s: %s", user_id, e)

    async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handles button clicks for Approve/Reject.

        The decision is first sent to the Control Plane API; if that fails the
        action file is moved on local disk instead. Callback data that is not
        of the form ``approve:<id>`` / ``reject:<id>`` is ignored.
        """
        query = update.callback_query
        user_id = query.from_user.id

        if user_id not in ALLOWED_IDS:
            await query.answer("Unauthorized", show_alert=True)
            return

        await query.answer()

        cb_data = query.data
        if not cb_data or ":" not in cb_data:
            return

        action, action_id = cb_data.split(":", 1)
        target_status = _CALLBACK_STATUSES.get(action)
        if target_status is None:
            # Never interpret an unrecognised verb as a rejection.
            logger.warning("Ignoring callback with unknown action %r", action)
            return

        # Use API for mutation if available, fallback to local disk move
        success = await post_api("/action/mutate", {"id": action_id, "status": target_status})
        msg = "Success" if success else "API call failed"

        if not success:
            # Fallback to direct disk manipulation (original behavior)
            success, msg = self.move_action(
                action_id, target_status, user_id,
                query.from_user.username or str(user_id)
            )

        if not success:
            await query.message.reply_text(f"Failed to process action {action_id}: {msg}")
            return

        # Remove from notified list as it's no longer pending
        self.notified_actions.discard(action_id)

        status_text = "✅ Approved" if target_status == "approved" else "❌ Rejected"
        original_text = getattr(query.message, "text", None) or ""
        try:
            # `message.text` is plain text with the formatting already stripped.
            # Re-parsing it as Markdown can fail on characters such as "_" in
            # an action ID, so the edit is sent without a parse mode.
            await query.edit_message_text(
                text=original_text + f"\n\n{status_text} by {query.from_user.first_name}"
            )
        except Exception as e:
            # The decision is already recorded; only the message update failed.
            logger.error("Failed to update message for action %s: %s", action_id, e)

    def move_action(self, action_id, target_status, user_id, username):
        """Moves action file and updates its status and history.

        Args:
            action_id: Identifier of the action (file stem in ``pending``).
            target_status: ``"approved"`` selects the approved directory; any
                other value selects the rejected directory.
            user_id: Telegram user ID of the operator (currently unused).
            username: Operator name recorded in the transition history.

        Returns:
            ``(True, "Success")`` on success, otherwise ``(False, reason)``.
        """
        if not _is_safe_action_id(action_id):
            return False, "Invalid action ID."

        source_path = self.pending_dir / f"{action_id}.json"
        if not source_path.exists():
            return False, "Action file no longer exists in pending."

        target_dir = self.approved_dir if target_status == "approved" else self.rejected_dir
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir / f"{action_id}.json"
        # The ".tmp" suffix keeps the partial file invisible to "*.json" globs.
        temp_path = target_dir / f"{action_id}.json.tmp"

        try:
            data = json.loads(source_path.read_text(encoding="utf-8"))
            current_status = data.get("status", "pending")
            now = time.time()

            # Update data
            data["status"] = target_status
            data["updated_at"] = now
            history = data.get("transition_history", [])
            history.append({
                "from": current_status,
                "to": target_status,
                "timestamp": now,
                "by": f"tg:{username}"
            })
            data["transition_history"] = history

            # Write to a temp file and rename it into place, so readers never
            # observe a half-written target; only then delete the source.
            temp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
            os.replace(temp_path, target_path)
            source_path.unlink()

            logger.info(
                "Action %s moved from %s to %s by %s",
                action_id, current_status, target_status, username
            )
            return True, "Success"
        except Exception as e:
            logger.error("Error moving action file: %s", e)
            try:
                # Best-effort cleanup of a leftover temp file.
                temp_path.unlink()
            except OSError:
                pass
            return False, str(e)


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Simple start command to help users find their ID."""
    user = update.effective_user
    # first_name is user-controlled; escape it so it cannot break Markdown parsing.
    message = (
        f"Hello {_md_escape(user.first_name)}! 🤖\n"
        f"Your Telegram User ID is: `{user.id}`\n\n"
    )
    if user.id in ALLOWED_IDS:
        message += "✅ You are authorized to manage the homelab.\n\n"
        message += "Use /help to see available commands."
    else:
        message += "❌ You are NOT authorized. Add your ID to `TELEGRAM_ALLOWED_USER_IDS`."

    await update.message.reply_text(message, parse_mode="Markdown")


async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reports whether the Control Plane API is reachable."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    res = await fetch_api("/summary")
    status = "✅ Online" if res else "❌ Unreachable"

    message = (
        f"🤖 *Telegram Bot Status*\n"
        f"Control Plane API: {status}\n"
        f"Target URL: `{CONTROL_PLANE_URL}`\n"
    )
    await update.message.reply_text(message, parse_mode="Markdown")


async def summary_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Shows the system summary returned by the ``/summary`` endpoint."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    data = await fetch_api("/summary")
    if not data:
        await update.message.reply_text("❌ Failed to fetch summary from Control Plane.")
        return

    msg = "📊 *System Summary*\n"
    msg += f"Status: `{data.get('status', 'unknown')}`\n"
    msg += f"Nodes: {data.get('node_count', 0)}\n"
    msg += f"Services: {data.get('service_count', 0)}\n"
    msg += f"Active Incidents: {data.get('active_incidents_count', 0)}\n"

    if data.get('stale'):
        msg += "\n⚠️ *Warning: Data is stale!*"

    await update.message.reply_text(msg, parse_mode="Markdown")


async def nodes_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Lists every node with its health icon, status and last-seen value."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    nodes = await fetch_api("/nodes")
    if nodes is None:
        await update.message.reply_text("❌ Failed to fetch nodes.")
        return

    if not nodes:
        await update.message.reply_text("No nodes discovered in the fleet.")
        return

    msg = "🖥️ *Nodes Status*\n"
    for node in nodes:
        health_icon = _NODE_HEALTH_ICONS.get(node.get('health'), "❌")
        msg += f"{health_icon} *{node.get('hostname')}*: `{node.get('status', 'unknown')}`\n"
        msg += f"   Last seen: {node.get('last_seen', 'N/A')}\n"

    await update.message.reply_text(msg, parse_mode="Markdown")


async def services_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Summarises, per node, how many services report ``nominal`` health."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    services = await fetch_api("/services")
    if services is None:
        await update.message.reply_text("❌ Failed to fetch services.")
        return

    # Summarize by node
    nodes = {}
    for s in services:
        nodes.setdefault(s.get("node", "unknown"), []).append(s)

    msg = "⚙️ *Services Summary*\n"
    if not nodes:
        msg += "No services discovered."
    else:
        for node, svc_list in sorted(nodes.items()):
            nominal = sum(1 for s in svc_list if s.get("health") == "nominal")
            msg += f"• *{node}*: {nominal}/{len(svc_list)} nominal\n"

    msg += "\nUse /unhealthy to see issues."
    await update.message.reply_text(msg, parse_mode="Markdown")


async def unhealthy_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Lists services that are not nominal and containers that are not running.

    "All systems nominal" is only reported when both data sources could be
    fetched. A failed fetch is reported explicitly, so an unreachable Control
    Plane is never presented as a healthy system.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    services = await fetch_api("/services")
    nodes = await fetch_api("/nodes")

    if services is None and nodes is None:
        await update.message.reply_text("❌ Failed to fetch health data from Control Plane.")
        return

    unavailable = []
    if services is None:
        unavailable.append("services")
    if nodes is None:
        unavailable.append("nodes")

    issues = []

    for s in services or []:
        health = str(s.get("health") or "").lower()
        if health != "nominal":
            issues.append(
                f"• Service *{s.get('name')}* on *{s.get('node')}*: `{health or 'unknown'}`\n"
            )

    for n in nodes or []:
        checks = n.get("checks", {})
        if isinstance(checks, str):
            # Some nodes report their checks as a JSON-encoded string.
            try:
                checks = json.loads(checks)
            except (json.JSONDecodeError, TypeError):
                checks = {}
        if not isinstance(checks, dict):
            checks = {}

        docker = checks.get("docker")
        if not isinstance(docker, dict):
            docker = {}

        # Container states are only inspected when the docker check itself succeeded.
        if docker.get("status") == "ok":
            for c in docker.get("containers") or []:
                if c.get("state") != "running":
                    issues.append(
                        f"• Container *{c.get('name')}* on *{n.get('hostname')}*: `{c.get('state')}`\n"
                    )

    msg = "⚠️ *Unhealthy Components*\n"
    if issues:
        msg += "".join(issues)
    elif not unavailable:
        msg += "All systems nominal. ✅"
    else:
        msg += "No issues found in the data that could be fetched.\n"

    if unavailable:
        msg += f"\n⚠️ Could not fetch: {', '.join(unavailable)}"

    await update.message.reply_text(msg, parse_mode="Markdown")


async def incidents_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Lists incidents whose status is neither ``resolved`` nor ``closed``."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    incidents = await fetch_api("/incidents")
    if incidents is None:
        await update.message.reply_text("❌ Failed to fetch incidents.")
        return

    active = [i for i in incidents if i.get("status") not in ("resolved", "closed")]
    if not active:
        await update.message.reply_text("No active incidents. ✅")
        return

    msg = "🚨 *Active Incidents*\n"
    for inc in active:
        severity = str(inc.get('severity') or 'info').upper()
        # The incident message is free text placed outside any entity, so it
        # is escaped to keep stray "_" or "*" from breaking Markdown parsing.
        msg += f"• [{severity}] *{inc.get('type')}*: {_md_escape(inc.get('message'))}\n"

    await update.message.reply_text(msg, parse_mode="Markdown")


async def actions_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Shows how many actions exist per status."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    actions = await fetch_api("/actions")
    if actions is None:
        await update.message.reply_text("❌ Actions endpoint unavailable.")
        return

    msg = "⚡ *Actions Summary*\n"
    total = 0
    for status, act_list in actions.items():
        if act_list:
            # Status keys such as "in_progress" contain Markdown control characters.
            msg += f"• {_md_escape(str(status).capitalize())}: {len(act_list)}\n"
            total += len(act_list)

    if total == 0:
        msg = "No actions recorded."

    await update.message.reply_text(msg, parse_mode="Markdown")


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Lists the supported commands."""
    msg = (
        "📖 *Supported Commands*\n\n"
        "/status - Check bot and API connectivity\n"
        "/summary - System health overview\n"
        "/nodes - List homelab nodes and their status\n"
        "/services - Summary of services across nodes\n"
        "/unhealthy - List all unhealthy components\n"
        "/incidents - View active incidents\n"
        "/actions - Summary of operator actions\n"
        "/help - Show this help message\n\n"
        "Free text will be handled by the guidance system."
    )
    await update.message.reply_text(msg, parse_mode="Markdown")


async def handle_fallback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handles non-command messages."""
    if update.effective_user.id not in ALLOWED_IDS:
        return

    if ENABLE_LLM_FALLBACK and OPENCLAW_BASE_URL:
        # Placeholder for OpenClaw LLM fallback
        # In a real scenario, this would call the LLM API
        logger.info("LLM fallback requested for: %s", update.message.text)

    await update.message.reply_text(
        "Use /summary, /nodes, /services, /unhealthy, /incidents, /actions."
    )


async def run_bot():
    """Builds the application, registers handlers and polls until stopped.

    Returns immediately (without raising) when ``TELEGRAM_BOT_TOKEN`` is not
    set, so that a missing token disables the bot instead of crashing it.
    """
    if not TOKEN:
        print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.")
        # Keep process alive to not crash compose if not desired, but here we just exit
        # Requirement says: "do not fail if Telegram token is absent, but telegram-bot should be disabled or exit cleanly"
        return

    bot_logic = ApprovalBot()

    application = ApplicationBuilder().token(TOKEN).build()

    application.add_handler(CommandHandler("start", start_command))
    application.add_handler(CommandHandler("status", status_command))
    application.add_handler(CommandHandler("summary", summary_command))
    application.add_handler(CommandHandler("nodes", nodes_command))
    application.add_handler(CommandHandler("services", services_command))
    application.add_handler(CommandHandler("unhealthy", unhealthy_command))
    application.add_handler(CommandHandler("incidents", incidents_command))
    application.add_handler(CommandHandler("actions", actions_command))
    application.add_handler(CommandHandler("help", help_command))

    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_fallback))
    application.add_handler(CallbackQueryHandler(bot_logic.handle_callback))

    # Schedule the pending actions check
    job_queue = application.job_queue
    if job_queue is None:
        # The job queue is an optional extra of python-telegram-bot.
        logger.warning(
            "JobQueue is unavailable (install python-telegram-bot[job-queue]); "
            "pending-action notifications are disabled."
        )
    else:
        job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5)

    logger.info("Starting Telegram Approval Bot...")

    await application.initialize()
    try:
        await application.start()
        await application.updater.start_polling()

        # Run until the application is stopped
        stop_event = asyncio.Event()
        await stop_event.wait()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Stopping bot...")
    finally:
        # Tear down in reverse order of start-up: the updater must stop
        # fetching updates before the application is stopped.
        if application.updater is not None and application.updater.running:
            await application.updater.stop()
        if application.running:
            await application.stop()
        await application.shutdown()


if __name__ == "__main__":
    try:
        asyncio.run(run_bot())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error("Fatal error: %s", e)