diff --git a/services/agent-system/action-model.md b/services/agent-system/action-model.md new file mode 100644 index 0000000..0930b34 --- /dev/null +++ b/services/agent-system/action-model.md @@ -0,0 +1,45 @@ +### Action Approval Data Model + +Actions are JSON files stored in `/opt/homelab/actions/{status}/{action_id}.json`. + +#### Statuses +- `pending`: Waiting for operator approval. +- `approved`: Approved by operator, ready for execution. +- `rejected`: Rejected by operator, will not be executed. +- `running`: Currently being executed by an agent. +- `completed`: Successfully executed. +- `failed`: Execution failed. + +#### Schema +```json +{ + "action_id": "string", + "service": "string", + "node": "string", + "type": "deploy_service | restart_service | rollback | scale", + "risk": "nominal | guarded | critical", + "status": "pending | approved | rejected | ...", + "created_at": "number (Unix epoch seconds)", + "updated_at": "number (Unix epoch seconds)", + "details": { + "image": "string", + "reason": "string", + "diff": "string" + }, + "transition_history": [ + { + "from": "string | null", + "to": "string", + "timestamp": "number (Unix epoch seconds)", + "by": "string (system | operator-tg-12345 | webui)" + } + ] +} +``` + +#### Workflow +1. A system component (e.g. `runtime-materializer` or a future analyzer) creates a file in `actions/pending/`. +2. `telegram-bot` detects the file, sends a message to allowed users. +3. Operator clicks "Approve" or "Reject". +4. `telegram-bot` moves the file to `actions/approved/` or `actions/rejected/` atomically, appending a transition to `transition_history`. +5. The responsible agent (e.g. `stability-agent` on the target node) picks up the `approved` action, moves it to `running`, executes it, and finally moves it to `completed` or `failed`. diff --git a/services/agent-system/deploy.sh b/services/agent-system/deploy.sh index 3d6c015..7175f37 100755 --- a/services/agent-system/deploy.sh +++ b/services/agent-system/deploy.sh @@ -8,7 +8,11 @@ echo ">>> Building and starting Agent System services..." 
docker compose up -d --build echo ">>> Services status:" -docker ps --filter "name=agent-system" +docker ps --filter "name=agent-system" --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" + +if [ -z "$TELEGRAM_BOT_TOKEN" ]; then + echo "NOTE: TELEGRAM_BOT_TOKEN is not set. Telegram approval bot will be disabled." +fi echo ">>> Verifying API endpoints..." sleep 5 # Give it a moment to start diff --git a/services/agent-system/docker-compose.yml b/services/agent-system/docker-compose.yml index c2e13e4..7df87db 100644 --- a/services/agent-system/docker-compose.yml +++ b/services/agent-system/docker-compose.yml @@ -31,3 +31,14 @@ services: depends_on: - redis restart: unless-stopped + + telegram-bot: + build: ./telegram-bot + container_name: agent-system-telegram-bot + environment: + TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN} + TELEGRAM_ALLOWED_USER_IDS: ${TELEGRAM_ALLOWED_USER_IDS} + ACTIONS_ROOT: /opt/homelab/actions + volumes: + - /opt/homelab:/opt/homelab + restart: unless-stopped diff --git a/services/agent-system/env.example b/services/agent-system/env.example new file mode 100644 index 0000000..0a066ce --- /dev/null +++ b/services/agent-system/env.example @@ -0,0 +1,14 @@ +# Telegram Bot Configuration +# Get token from @BotFather +TELEGRAM_BOT_TOKEN=123456789:ABCdefGHIjklMNOpqrsTUVwxyz +# Comma-separated list of Telegram User IDs +TELEGRAM_ALLOWED_USER_IDS=12345678,87654321 + +# Runtime Materializer Configuration +REDIS_HOST=100.108.208.3 +REDIS_PORT=6379 + +# Paths +HOMELAB_ROOT=/opt/homelab +ACTIONS_ROOT=/opt/homelab/actions +WORLD_DIR=/opt/homelab/world diff --git a/services/agent-system/scripts/create-test-action.sh b/services/agent-system/scripts/create-test-action.sh new file mode 100755 index 0000000..3feac78 --- /dev/null +++ b/services/agent-system/scripts/create-test-action.sh @@ -0,0 +1,39 @@ +#!/bin/bash +# Script to create a test pending action for Telegram bot verification. 
+
+ACTIONS_PENDING_DIR=${ACTIONS_ROOT:-/opt/homelab/actions}/pending
+mkdir -p "$ACTIONS_PENDING_DIR"
+
+ACTION_ID="test-$(date +%s)"
+FILE_PATH="$ACTIONS_PENDING_DIR/$ACTION_ID.json"
+
+TIMESTAMP=$(date +%s)
+
+cat <<EOF > "$FILE_PATH"
+{
+  "action_id": "$ACTION_ID",
+  "service": "frigate",
+  "node": "chelsty",
+  "type": "deploy_service",
+  "risk": "guarded",
+  "status": "pending",
+  "created_at": $TIMESTAMP,
+  "updated_at": $TIMESTAMP,
+  "details": {
+    "image": "blakeblackshear/frigate:0.13.0",
+    "reason": "Security update for Frigate",
+    "diff": "image: blakeblackshear/frigate:0.12.0 -> 0.13.0"
+  },
+  "transition_history": [
+    {
+      "from": null,
+      "to": "pending",
+      "timestamp": $TIMESTAMP,
+      "by": "system-test"
+    }
+  ]
+}
+EOF
+
+echo "Test action created: $FILE_PATH"
+echo "If the telegram-bot is running and configured, you should receive a notification."
diff --git a/services/agent-system/telegram-bot/Dockerfile b/services/agent-system/telegram-bot/Dockerfile
new file mode 100644
index 0000000..e6ffa0b
--- /dev/null
+++ b/services/agent-system/telegram-bot/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY bot.py .
+
+CMD ["python", "bot.py"]
diff --git a/services/agent-system/telegram-bot/bot.py b/services/agent-system/telegram-bot/bot.py
new file mode 100644
index 0000000..d84c135
--- /dev/null
+++ b/services/agent-system/telegram-bot/bot.py
@@ -0,0 +1,302 @@
+"""Telegram approval bot: announces pending actions and records operator decisions."""
+
+import asyncio
+import json
+import logging
+import os
+import signal
+import sys
+import time
+from pathlib import Path
+
+from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
+from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, CallbackQueryHandler
+from telegram.helpers import escape_markdown
+
+# Setup logging
+logging.basicConfig(
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    level=logging.INFO
+)
+logger = logging.getLogger(__name__)
+
+
+def _parse_allowed_ids(raw):
+    """Parse comma-separated Telegram user IDs, skipping entries that are not integers."""
+    ids = []
+    for item in raw.split(","):
+        item = item.strip()
+        if not item:
+            continue
+        try:
+            ids.append(int(item))
+        except ValueError:
+            # A typo in the env var must not crash the bot at import time.
+            logger.warning("Ignoring invalid TELEGRAM_ALLOWED_USER_IDS entry: %r", item)
+    return ids
+
+
+# Configuration
+TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
+ALLOWED_IDS = _parse_allowed_ids(os.getenv("TELEGRAM_ALLOWED_USER_IDS", ""))
+ACTIONS_ROOT = Path(os.getenv("ACTIONS_ROOT", "/opt/homelab/actions"))
+
+# Callback verb -> status directory. Verbs not listed here are ignored.
+CALLBACK_TARGETS = {"approve": "approved", "reject": "rejected"}
+# Telegram limits inline-button callback_data to 64 bytes.
+MAX_CALLBACK_DATA_BYTES = 64
+
+
+class ApprovalBot:
+    """Watches the pending directory, notifies operators and applies their decision."""
+
+    def __init__(self):
+        self.pending_dir = ACTIONS_ROOT / "pending"
+        self.approved_dir = ACTIONS_ROOT / "approved"
+        self.rejected_dir = ACTIONS_ROOT / "rejected"
+        # Track which action IDs we have already notified in this session to avoid spam
+        self.notified_actions = set()
+
+    async def check_pending_actions(self, context: ContextTypes.DEFAULT_TYPE):
+        """Job that periodically checks for new pending action files."""
+        if not self.pending_dir.exists():
+            return
+
+        try:
+            seen_ids = set()
+            for action_file in self.pending_dir.glob("*.json"):
+                action_id = action_file.stem
+                seen_ids.add(action_id)
+                if action_id in self.notified_actions:
+                    continue
+
+                try:
+                    data = json.loads(action_file.read_text(encoding="utf-8"))
+                    # Only notify if it's truly pending
+                    if data.get("status") == "pending":
+                        await self.notify_users(context, action_id, data)
+                        self.notified_actions.add(action_id)
+                except Exception as e:
+                    logger.error(f"Error processing action file {action_file}: {e}")
+
+            # Forget IDs whose files have left pending/ so the set cannot grow without bound.
+            self.notified_actions.intersection_update(seen_ids)
+        except Exception as e:
+            logger.error(f"Error scanning pending directory: {e}")
+
+    async def notify_users(self, context: ContextTypes.DEFAULT_TYPE, action_id: str, data: dict):
+        """Sends an approval request message to all allowed users."""
+        # "approve:" is the longer of the two prefixes; if it fits, "reject:" fits too.
+        if len(f"approve:{action_id}".encode("utf-8")) > MAX_CALLBACK_DATA_BYTES:
+            logger.error(f"Action ID {action_id!r} does not fit into Telegram callback data; not notifying")
+            return
+
+        message = (
+            f"⚠️ *Pending Action*\n"
+            f"ID: `{action_id}`\n"
+            f"Type: `{data.get('type', 'unknown')}`\n"
+            f"Service: `{data.get('service', 'unknown')}`\n"
+            f"Node: `{data.get('node', 'unknown')}`\n"
+            f"Risk: *{data.get('risk', 'unknown')}*\n"
+        )
+
+        if "details" in data:
+            details_str = json.dumps(data['details'], indent=2)
+            if len(details_str) > 1000:
+                details_str = details_str[:1000] + "..."
+            message += f"\nDetails:\n```json\n{details_str}\n```"
+
+        keyboard = [
+            [
+                InlineKeyboardButton("✅ Approve", callback_data=f"approve:{action_id}"),
+                InlineKeyboardButton("❌ Reject", callback_data=f"reject:{action_id}"),
+            ]
+        ]
+        reply_markup = InlineKeyboardMarkup(keyboard)
+
+        for user_id in ALLOWED_IDS:
+            try:
+                await context.bot.send_message(
+                    chat_id=user_id,
+                    text=message,
+                    parse_mode="Markdown",
+                    reply_markup=reply_markup
+                )
+                logger.info(f"Notified user {user_id} about action {action_id}")
+            except Exception as e:
+                logger.error(f"Failed to notify user {user_id}: {e}")
+
+    async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+        """Handles button clicks for Approve/Reject."""
+        query = update.callback_query
+        user_id = query.from_user.id
+
+        if user_id not in ALLOWED_IDS:
+            await query.answer("Unauthorized", show_alert=True)
+            return
+
+        await query.answer()
+
+        action, sep, action_id = (query.data or "").partition(":")
+        target_status = CALLBACK_TARGETS.get(action)
+        if not sep or target_status is None:
+            # Never treat an unrecognised verb as a rejection.
+            logger.warning(f"Ignoring unexpected callback data: {query.data!r}")
+            return
+
+        # The ID becomes a file name, so it must not be able to leave the status directories.
+        if action_id in ("", ".", "..") or action_id != Path(action_id).name:
+            logger.warning(f"Ignoring callback with invalid action ID: {action_id!r}")
+            return
+
+        success, msg = self.move_action(action_id, target_status, user_id, query.from_user.username or str(user_id))
+
+        if success:
+            status_text = "✅ Approved" if target_status == "approved" else "❌ Rejected"
+            original_text = getattr(query.message, "text", None) or ""
+            try:
+                # message.text has its Markdown markup stripped, so re-parsing it as Markdown
+                # fails on values such as "deploy_service". Reuse the parsed entities instead;
+                # appending at the end keeps their offsets valid.
+                await query.edit_message_text(
+                    text=f"{original_text}\n\n{status_text} by {query.from_user.first_name}",
+                    entities=getattr(query.message, "entities", None),
+                )
+            except Exception as e:
+                # The decision is already on disk; a failed edit is only cosmetic.
+                logger.error(f"Failed to update message for action {action_id}: {e}")
+            # Remove from notified list as it's no longer pending
+            self.notified_actions.discard(action_id)
+        else:
+            await query.message.reply_text(f"Failed to process action {action_id}: {msg}")
+
+    def move_action(self, action_id, target_status, user_id, username):
+        """Moves action file and updates its status and history.
+
+        The updated JSON is written to a temporary file next to the destination and
+        renamed into place, so a reader of the approved/rejected directory never sees
+        a partially written file. The pending copy is removed only afterwards.
+
+        Returns a ``(success, message)`` tuple; errors are logged and reported in
+        ``message`` instead of being raised.
+        """
+        source_path = self.pending_dir / f"{action_id}.json"
+        if not source_path.exists():
+            return False, "Action file no longer exists in pending."
+
+        target_dir = self.approved_dir if target_status == "approved" else self.rejected_dir
+        target_path = target_dir / f"{action_id}.json"
+        # The ".tmp" suffix keeps the partial file out of "*.json" scans.
+        tmp_path = target_dir / f".{action_id}.json.tmp"
+
+        try:
+            target_dir.mkdir(parents=True, exist_ok=True)
+            data = json.loads(source_path.read_text(encoding="utf-8"))
+            current_status = data.get("status", "pending")
+            # One integer timestamp for both fields, matching the epoch seconds written by create-test-action.sh.
+            now = int(time.time())
+
+            # Update data
+            data["status"] = target_status
+            data["updated_at"] = now
+
+            history = data.get("transition_history", [])
+            history.append({
+                "from": current_status,
+                "to": target_status,
+                "timestamp": now,
+                "by": f"tg:{username}"
+            })
+            data["transition_history"] = history
+
+            tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
+            os.replace(tmp_path, target_path)
+            source_path.unlink(missing_ok=True)
+            logger.info(f"Action {action_id} moved from {current_status} to {target_status} by {username}")
+            return True, "Success"
+        except Exception as e:
+            logger.error(f"Error moving action file: {e}")
+            # Do not leave a stray temp file behind after a failed write or rename.
+            try:
+                tmp_path.unlink(missing_ok=True)
+            except OSError:
+                pass
+            return False, str(e)
+
+
+async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
+    """Simple start command to help users find their ID."""
+    user = update.effective_user
+    # Escape the name and quote the variable name: stray "_" or "*" make Telegram reject Markdown.
+    message = (
+        f"Hello {escape_markdown(user.first_name)}!\n"
+        f"Your Telegram User ID is: `{user.id}`\n\n"
+    )
+    if user.id in ALLOWED_IDS:
+        message += "✅ You are authorized to approve actions."
+    else:
+        message += "❌ You are NOT authorized. Add your ID to `TELEGRAM_ALLOWED_USER_IDS`."
+
+    await update.effective_message.reply_text(message, parse_mode="Markdown")
+
+
+async def run_bot():
+    """Runs the bot until SIGINT/SIGTERM is received; returns immediately without a token."""
+    if not TOKEN:
+        print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.")
+        # Keep process alive to not crash compose if not desired, but here we just exit
+        # Requirement says: "do not fail if Telegram token is absent, but telegram-bot should be disabled or exit cleanly"
+        # NOTE(review): docker-compose.yml uses `restart: unless-stopped`, so Docker is expected to restart the container after this exit.
+        return
+
+    bot_logic = ApprovalBot()
+
+    application = ApplicationBuilder().token(TOKEN).build()
+
+    application.add_handler(CommandHandler("start", start_command))
+    application.add_handler(CallbackQueryHandler(bot_logic.handle_callback))
+
+    # Schedule the pending actions check
+    job_queue = application.job_queue
+    if job_queue is None:
+        # job_queue is None unless the "job-queue" extra of python-telegram-bot is installed.
+        raise RuntimeError("JobQueue is not available; install python-telegram-bot[job-queue]")
+    job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5)
+
+    # Stop on SIGTERM as well (sent by `docker stop`), not only on Ctrl-C.
+    stop_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        try:
+            loop.add_signal_handler(sig, stop_event.set)
+        except NotImplementedError:
+            # Event loops without signal support fall back to KeyboardInterrupt handling.
+            pass
+
+    logger.info("Starting Telegram Approval Bot...")
+    await application.initialize()
+    try:
+        await application.start()
+        await application.updater.start_polling()
+
+        # Run until the application is stopped
+        await stop_event.wait()
+        logger.info("Stopping bot...")
+    finally:
+        # Polling must be stopped before the application can be stopped and shut down.
+        if application.updater.running:
+            await application.updater.stop()
+        if application.running:
+            await application.stop()
+        await application.shutdown()
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(run_bot())
+    except KeyboardInterrupt:
+        pass
+    except Exception as e:
+        logger.error(f"Fatal error: {e}")
+        # Exit non-zero so a crash is distinguishable from a clean shutdown.
+        sys.exit(1)
diff --git a/services/agent-system/telegram-bot/requirements.txt b/services/agent-system/telegram-bot/requirements.txt
new file mode 100644
index 0000000..e960954
--- /dev/null
+++ b/services/agent-system/telegram-bot/requirements.txt
@@ -0,0 +1 @@
+python-telegram-bot[job-queue]==21.1.1