import os import json import time import asyncio import logging from pathlib import Path from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, CallbackQueryHandler # Setup logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Configuration TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") ALLOWED_IDS = [int(i.strip()) for i in os.getenv("TELEGRAM_ALLOWED_USER_IDS", "").split(",") if i.strip()] ACTIONS_ROOT = Path(os.getenv("ACTIONS_ROOT", "/opt/homelab/actions")) class ApprovalBot: def __init__(self): self.pending_dir = ACTIONS_ROOT / "pending" self.approved_dir = ACTIONS_ROOT / "approved" self.rejected_dir = ACTIONS_ROOT / "rejected" # Track which action IDs we have already notified in this session to avoid spam self.notified_actions = set() async def check_pending_actions(self, context: ContextTypes.DEFAULT_TYPE): """Job that periodically checks for new pending action files.""" if not self.pending_dir.exists(): return try: for action_file in self.pending_dir.glob("*.json"): action_id = action_file.stem if action_id in self.notified_actions: continue try: data = json.loads(action_file.read_text()) # Only notify if it's truly pending if data.get("status") == "pending": await self.notify_users(context, action_id, data) self.notified_actions.add(action_id) except Exception as e: logger.error(f"Error processing action file {action_file}: {e}") except Exception as e: logger.error(f"Error scanning pending directory: {e}") async def notify_users(self, context: ContextTypes.DEFAULT_TYPE, action_id: str, data: dict): """Sends an approval request message to all allowed users.""" message = ( f"⚠️ *Pending Action*\n" f"ID: `{action_id}`\n" f"Type: `{data.get('type', 'unknown')}`\n" f"Service: `{data.get('service', 'unknown')}`\n" f"Node: `{data.get('node', 'unknown')}`\n" f"Risk: *{data.get('risk', 'unknown')}*\n" ) if "details" in data: details_str = json.dumps(data['details'], indent=2) if len(details_str) > 1000: details_str = details_str[:1000] + "..." message += f"\nDetails:\n```json\n{details_str}\n```" keyboard = [ [ InlineKeyboardButton("✅ Approve", callback_data=f"approve:{action_id}"), InlineKeyboardButton("❌ Reject", callback_data=f"reject:{action_id}"), ] ] reply_markup = InlineKeyboardMarkup(keyboard) for user_id in ALLOWED_IDS: try: await context.bot.send_message( chat_id=user_id, text=message, parse_mode="Markdown", reply_markup=reply_markup ) logger.info(f"Notified user {user_id} about action {action_id}") except Exception as e: logger.error(f"Failed to notify user {user_id}: {e}") async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handles button clicks for Approve/Reject.""" query = update.callback_query user_id = query.from_user.id if user_id not in ALLOWED_IDS: await query.answer("Unauthorized", show_alert=True) return await query.answer() cb_data = query.data if ":" not in cb_data: return action, action_id = cb_data.split(":", 1) target_status = "approved" if action == "approve" else "rejected" success, msg = self.move_action(action_id, target_status, user_id, query.from_user.username or str(user_id)) if success: status_text = "✅ Approved" if target_status == "approved" else "❌ Rejected" await query.edit_message_text( text=query.message.text + f"\n\n{status_text} by {query.from_user.first_name}", parse_mode="Markdown" ) # Remove from notified list as it's no longer pending if action_id in self.notified_actions: self.notified_actions.remove(action_id) else: await query.message.reply_text(f"Failed to process action {action_id}: {msg}") def move_action(self, action_id, target_status, user_id, username): """Moves action file and updates its status and history.""" source_path = self.pending_dir / f"{action_id}.json" if not source_path.exists(): return False, "Action file no longer exists in pending." target_dir = self.approved_dir if target_status == "approved" else self.rejected_dir target_dir.mkdir(parents=True, exist_ok=True) target_path = target_dir / f"{action_id}.json" try: data = json.loads(source_path.read_text()) current_status = data.get("status", "pending") # Update data data["status"] = target_status data["updated_at"] = time.time() history = data.get("transition_history", []) history.append({ "from": current_status, "to": target_status, "timestamp": time.time(), "by": f"tg:{username}" }) data["transition_history"] = history # Atomic move: write to new location, then delete old target_path.write_text(json.dumps(data, indent=2)) source_path.unlink() logger.info(f"Action {action_id} moved from {current_status} to {target_status} by {username}") return True, "Success" except Exception as e: logger.error(f"Error moving action file: {e}") return False, str(e) async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Simple start command to help users find their ID.""" user = update.effective_user message = ( f"Hello {user.first_name}!\n" f"Your Telegram User ID is: `{user.id}`\n\n" ) if user.id in ALLOWED_IDS: message += "✅ You are authorized to approve actions." else: message += "❌ You are NOT authorized. Add your ID to TELEGRAM_ALLOWED_USER_IDS." await update.message.reply_text(message, parse_mode="Markdown") async def run_bot(): if not TOKEN: print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.") # Keep process alive to not crash compose if not desired, but here we just exit # Requirement says: "do not fail if Telegram token is absent, but telegram-bot should be disabled or exit cleanly" return bot_logic = ApprovalBot() application = ApplicationBuilder().token(TOKEN).build() application.add_handler(CommandHandler("start", start_command)) application.add_handler(CallbackQueryHandler(bot_logic.handle_callback)) # Schedule the pending actions check job_queue = application.job_queue job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5) logger.info("Starting Telegram Approval Bot...") await application.initialize() await application.start() await application.updater.start_polling() # Run until the application is stopped stop_event = asyncio.Event() try: await stop_event.wait() except (KeyboardInterrupt, SystemExit): logger.info("Stopping bot...") finally: await application.stop() await application.shutdown() if __name__ == "__main__": try: asyncio.run(run_bot()) except KeyboardInterrupt: pass except Exception as e: logger.error(f"Fatal error: {e}")