homelab-codex-ws/services/agent-system/telegram-bot/bot.py

211 lines
8.1 KiB
Python
Raw Normal View History

import os
import json
import time
import asyncio
import logging
from pathlib import Path
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, CallbackQueryHandler
# Setup logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Configuration
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
ALLOWED_IDS = [int(i.strip()) for i in os.getenv("TELEGRAM_ALLOWED_USER_IDS", "").split(",") if i.strip()]
ACTIONS_ROOT = Path(os.getenv("ACTIONS_ROOT", "/opt/homelab/actions"))
class ApprovalBot:
def __init__(self):
self.pending_dir = ACTIONS_ROOT / "pending"
self.approved_dir = ACTIONS_ROOT / "approved"
self.rejected_dir = ACTIONS_ROOT / "rejected"
# Track which action IDs we have already notified in this session to avoid spam
self.notified_actions = set()
async def check_pending_actions(self, context: ContextTypes.DEFAULT_TYPE):
"""Job that periodically checks for new pending action files."""
if not self.pending_dir.exists():
return
try:
for action_file in self.pending_dir.glob("*.json"):
action_id = action_file.stem
if action_id in self.notified_actions:
continue
try:
data = json.loads(action_file.read_text())
# Only notify if it's truly pending
if data.get("status") == "pending":
await self.notify_users(context, action_id, data)
self.notified_actions.add(action_id)
except Exception as e:
logger.error(f"Error processing action file {action_file}: {e}")
except Exception as e:
logger.error(f"Error scanning pending directory: {e}")
async def notify_users(self, context: ContextTypes.DEFAULT_TYPE, action_id: str, data: dict):
"""Sends an approval request message to all allowed users."""
message = (
f"⚠️ *Pending Action*\n"
f"ID: `{action_id}`\n"
f"Type: `{data.get('type', 'unknown')}`\n"
f"Service: `{data.get('service', 'unknown')}`\n"
f"Node: `{data.get('node', 'unknown')}`\n"
f"Risk: *{data.get('risk', 'unknown')}*\n"
)
if "details" in data:
details_str = json.dumps(data['details'], indent=2)
if len(details_str) > 1000:
details_str = details_str[:1000] + "..."
message += f"\nDetails:\n```json\n{details_str}\n```"
keyboard = [
[
InlineKeyboardButton("✅ Approve", callback_data=f"approve:{action_id}"),
InlineKeyboardButton("❌ Reject", callback_data=f"reject:{action_id}"),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
for user_id in ALLOWED_IDS:
try:
await context.bot.send_message(
chat_id=user_id,
text=message,
parse_mode="Markdown",
reply_markup=reply_markup
)
logger.info(f"Notified user {user_id} about action {action_id}")
except Exception as e:
logger.error(f"Failed to notify user {user_id}: {e}")
async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handles button clicks for Approve/Reject."""
query = update.callback_query
user_id = query.from_user.id
if user_id not in ALLOWED_IDS:
await query.answer("Unauthorized", show_alert=True)
return
await query.answer()
cb_data = query.data
if ":" not in cb_data:
return
action, action_id = cb_data.split(":", 1)
target_status = "approved" if action == "approve" else "rejected"
success, msg = self.move_action(action_id, target_status, user_id, query.from_user.username or str(user_id))
if success:
status_text = "✅ Approved" if target_status == "approved" else "❌ Rejected"
await query.edit_message_text(
text=query.message.text + f"\n\n{status_text} by {query.from_user.first_name}",
parse_mode="Markdown"
)
# Remove from notified list as it's no longer pending
if action_id in self.notified_actions:
self.notified_actions.remove(action_id)
else:
await query.message.reply_text(f"Failed to process action {action_id}: {msg}")
def move_action(self, action_id, target_status, user_id, username):
"""Moves action file and updates its status and history."""
source_path = self.pending_dir / f"{action_id}.json"
if not source_path.exists():
return False, "Action file no longer exists in pending."
target_dir = self.approved_dir if target_status == "approved" else self.rejected_dir
target_dir.mkdir(parents=True, exist_ok=True)
target_path = target_dir / f"{action_id}.json"
try:
data = json.loads(source_path.read_text())
current_status = data.get("status", "pending")
# Update data
data["status"] = target_status
data["updated_at"] = time.time()
history = data.get("transition_history", [])
history.append({
"from": current_status,
"to": target_status,
"timestamp": time.time(),
"by": f"tg:{username}"
})
data["transition_history"] = history
# Atomic move: write to new location, then delete old
target_path.write_text(json.dumps(data, indent=2))
source_path.unlink()
logger.info(f"Action {action_id} moved from {current_status} to {target_status} by {username}")
return True, "Success"
except Exception as e:
logger.error(f"Error moving action file: {e}")
return False, str(e)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Simple start command to help users find their ID."""
user = update.effective_user
message = (
f"Hello {user.first_name}!\n"
f"Your Telegram User ID is: `{user.id}`\n\n"
)
if user.id in ALLOWED_IDS:
message += "✅ You are authorized to approve actions."
else:
message += "❌ You are NOT authorized. Add your ID to TELEGRAM_ALLOWED_USER_IDS."
await update.message.reply_text(message, parse_mode="Markdown")
async def run_bot():
if not TOKEN:
print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.")
# Keep process alive to not crash compose if not desired, but here we just exit
# Requirement says: "do not fail if Telegram token is absent, but telegram-bot should be disabled or exit cleanly"
return
bot_logic = ApprovalBot()
application = ApplicationBuilder().token(TOKEN).build()
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CallbackQueryHandler(bot_logic.handle_callback))
# Schedule the pending actions check
job_queue = application.job_queue
job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5)
logger.info("Starting Telegram Approval Bot...")
await application.initialize()
await application.start()
await application.updater.start_polling()
# Run until the application is stopped
stop_event = asyncio.Event()
try:
await stop_event.wait()
except (KeyboardInterrupt, SystemExit):
logger.info("Stopping bot...")
finally:
await application.stop()
await application.shutdown()
if __name__ == "__main__":
try:
asyncio.run(run_bot())
except KeyboardInterrupt:
pass
except Exception as e:
logger.error(f"Fatal error: {e}")