- read risk_level with risk fallback (was: risk only → "unknown" for all actions written by supervisor which uses risk_level key) - include description field in alert format (was: alert_only payloads' substance was invisible — description carried the full message) - extract _format_pending_action() pure helper to enable unit testing without a live Telegram connection - 8 tests: risk_level present, risk fallback, both absent, description shown/absent, truncation, full HA alert_only shape, no-description no-crash - flagged during Phase 5 review of ha-diag-agent supervisor routing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
455 lines
18 KiB
Python
455 lines
18 KiB
Python
import asyncio
import json
import logging
import os
import signal
import time
import urllib.error
import urllib.request
from pathlib import Path

from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.error import BadRequest
from telegram.ext import (
    ApplicationBuilder,
    CallbackQueryHandler,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
|
|
|
|
# Setup logging: timestamped records at INFO level and above on the root logger.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module logger; records propagate to the root handler configured above.
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration, read once from the environment at import time.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")

# Comma-separated Telegram user IDs; blank entries are skipped and a
# non-integer entry raises ValueError at import.
ALLOWED_IDS = [
    int(raw_id)
    for raw_id in map(str.strip, os.environ.get("TELEGRAM_ALLOWED_USER_IDS", "").split(","))
    if raw_id
]

# Root holding the pending/approved/rejected action directories.
ACTIONS_ROOT = Path(os.environ.get("ACTIONS_ROOT", "/opt/homelab/actions"))

CONTROL_PLANE_URL = os.environ.get("CONTROL_PLANE_URL", "http://webui:8080")

# Only the literal string "true" (case-insensitive) enables the fallback.
ENABLE_LLM_FALLBACK = os.environ.get("ENABLE_LLM_FALLBACK", "false").lower() == "true"

OPENCLAW_BASE_URL = os.environ.get("OPENCLAW_BASE_URL")
|
|
|
|
async def fetch_api(path):
    """GET ``path`` from the Control Plane API and return the decoded JSON.

    The blocking urllib call runs in a worker thread via ``asyncio.to_thread``.

    Returns:
        The parsed JSON body for an HTTP 200 response; None for any other
        status or when any exception is raised while requesting or decoding
        (the exception is logged).
    """
    url = "/".join((CONTROL_PLANE_URL.rstrip("/"), path.lstrip("/")))

    def _get_json():
        with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp:
            return json.loads(resp.read().decode()) if resp.status == 200 else None

    try:
        return await asyncio.to_thread(_get_json)
    except Exception as e:
        logger.error(f"Error fetching {url}: {e}")
        return None
|
|
|
|
async def post_api(path, data):
    """POST ``data`` as a JSON body to ``path`` on the Control Plane API.

    The blocking urllib call runs in a worker thread via ``asyncio.to_thread``.

    Returns:
        True only when the server answers HTTP 200. Any other status, or any
        exception (serialization, network, HTTP error), yields False; the
        exception is logged.
    """
    url = "/".join((CONTROL_PLANE_URL.rstrip("/"), path.lstrip("/")))
    try:
        payload = json.dumps(data).encode("utf-8")

        def _send():
            request = urllib.request.Request(
                url,
                data=payload,
                method="POST",
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(request, timeout=5) as resp:
                return resp.status == 200

        return await asyncio.to_thread(_send)
    except Exception as e:
        logger.error(f"Error posting to {url}: {e}")
        return False
|
|
|
|
def _format_pending_action(action_id: str, data: dict) -> str:
|
|
"""Build the Telegram Markdown message for a pending action notification.
|
|
|
|
Extracted so it can be unit-tested without a live Telegram connection.
|
|
"""
|
|
# Supervisor writes risk_level; action-model.md legacy schema used risk.
|
|
risk = data.get("risk_level") or data.get("risk", "unknown")
|
|
message = (
|
|
f"⚠️ *Pending Action*\n"
|
|
f"ID: `{action_id}`\n"
|
|
f"Type: `{data.get('type', 'unknown')}`\n"
|
|
f"Service: `{data.get('service', 'unknown')}`\n"
|
|
f"Node: `{data.get('node', 'unknown')}`\n"
|
|
f"Risk: *{risk}*\n"
|
|
)
|
|
# description carries the human-readable substance of the action (required for
|
|
# alert_only actions where it is the entire operator-visible message).
|
|
description = data.get("description", "")
|
|
if description:
|
|
truncated = description[:300] + ("..." if len(description) > 300 else "")
|
|
message += f"Description: `{truncated}`\n"
|
|
# Legacy details block (old action-model.md schema) — kept for backwards compat.
|
|
if "details" in data:
|
|
details_str = json.dumps(data["details"], indent=2)
|
|
if len(details_str) > 1000:
|
|
details_str = details_str[:1000] + "..."
|
|
message += f"\nDetails:\n```json\n{details_str}\n```"
|
|
return message
|
|
|
|
|
|
class ApprovalBot:
    """Telegram approval flow for file-based pending actions.

    Pending actions are JSON files in ``ACTIONS_ROOT/pending``.
    ``check_pending_actions`` announces each one (once per process) to every
    user in ALLOWED_IDS with Approve/Reject buttons. ``handle_callback``
    records the decision through the Control Plane API and falls back to
    ``move_action`` (moving the file into ``approved``/``rejected``) when the
    API call fails.
    """

    def __init__(self):
        self.pending_dir = ACTIONS_ROOT / "pending"
        self.approved_dir = ACTIONS_ROOT / "approved"
        self.rejected_dir = ACTIONS_ROOT / "rejected"
        # Action IDs already announced in this process, so the periodic scan
        # does not send the same approval request again on every run.
        self.notified_actions = set()

    async def check_pending_actions(self, context: ContextTypes.DEFAULT_TYPE):
        """Job callback: announce pending action files not yet notified.

        A file is announced only if its JSON ``status`` is ``"pending"``.
        Per-file errors are logged and skipped. IDs whose files have left the
        pending directory (e.g. resolved via the API rather than Telegram) are
        dropped from ``notified_actions`` so the set does not grow forever.
        """
        if not self.pending_dir.exists():
            return

        try:
            present = set()
            for action_file in self.pending_dir.glob("*.json"):
                action_id = action_file.stem
                present.add(action_id)
                if action_id in self.notified_actions:
                    continue

                try:
                    data = json.loads(action_file.read_text(encoding="utf-8"))
                    # Only notify if it's truly pending
                    if data.get("status") == "pending":
                        await self.notify_users(context, action_id, data)
                        self.notified_actions.add(action_id)
                except Exception as e:
                    logger.error(f"Error processing action file {action_file}: {e}")

            # Only reached after a complete scan, so a failed glob never
            # causes live entries to be forgotten.
            self.notified_actions &= present
        except Exception as e:
            logger.error(f"Error scanning pending directory: {e}")

    async def notify_users(self, context: ContextTypes.DEFAULT_TYPE, action_id: str, data: dict):
        """Send the approval request with Approve/Reject buttons to every allowed user.

        If Telegram rejects the request (BadRequest, e.g. text that is not valid
        legacy Markdown), it is re-sent without a parse mode so the operator
        still receives it. Per-user failures are logged, never raised.
        """
        message = _format_pending_action(action_id, data)
        reply_markup = InlineKeyboardMarkup([[
            InlineKeyboardButton("✅ Approve", callback_data=f"approve:{action_id}"),
            InlineKeyboardButton("❌ Reject", callback_data=f"reject:{action_id}"),
        ]])

        for user_id in ALLOWED_IDS:
            try:
                try:
                    await context.bot.send_message(
                        chat_id=user_id,
                        text=message,
                        parse_mode="Markdown",
                        reply_markup=reply_markup,
                    )
                except BadRequest as e:
                    logger.warning(f"Markdown send to user {user_id} rejected ({e}); resending as plain text")
                    await context.bot.send_message(
                        chat_id=user_id,
                        text=message,
                        reply_markup=reply_markup,
                    )
                logger.info(f"Notified user {user_id} about action {action_id}")
            except Exception as e:
                logger.error(f"Failed to notify user {user_id}: {e}")

    async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle an Approve/Reject button press.

        Unauthorized users get an alert and nothing changes. Callback data must
        be ``approve:<id>`` or ``reject:<id>``; anything else is ignored. On
        success the message is edited to show who decided. On failure, a reply
        explains why.
        """
        query = update.callback_query
        user = query.from_user

        if user.id not in ALLOWED_IDS:
            await query.answer("Unauthorized", show_alert=True)
            return

        await query.answer()

        verb, sep, action_id = (query.data or "").partition(":")
        # Accept only the two verbs our own buttons emit, so malformed or
        # unexpected callback data is not treated as a rejection.
        if not sep or verb not in ("approve", "reject"):
            return
        target_status = "approved" if verb == "approve" else "rejected"

        # Prefer the Control Plane API; fall back to moving the file on local disk.
        success = await post_api("/action/mutate", {"id": action_id, "status": target_status})
        msg = "Success" if success else "API call failed"
        if not success:
            success, msg = self.move_action(action_id, target_status, user.id, user.username or str(user.id))

        if not success:
            await query.message.reply_text(f"Failed to process action {action_id}: {msg}")
            return

        # No longer pending, so stop tracking it.
        self.notified_actions.discard(action_id)

        status_text = "✅ Approved" if target_status == "approved" else "❌ Rejected"
        # query.message.text is the rendered text with the Markdown markup
        # already removed (formatting lives in .entities). Re-parsing it as
        # Markdown fails on any leftover '_', '*', '`' or '[', including in
        # the user's name. Reuse the original entities instead; text is only
        # appended, so their offsets stay valid.
        await query.edit_message_text(
            text=f"{query.message.text}\n\n{status_text} by {user.first_name}",
            entities=query.message.entities,
        )

    def move_action(self, action_id, target_status, user_id, username):
        """Move a pending action file to approved/rejected and record the transition.

        Local-disk fallback for when the Control Plane API call fails.

        Args:
            action_id: File stem of the action in the pending directory.
            target_status: ``"approved"``; any other value files it under rejected.
            user_id: Telegram user ID (accepted for interface compatibility; unused).
            username: Recorded as ``tg:<username>`` in ``transition_history``.

        Returns:
            ``(success, message)``; message is ``"Success"`` or the failure reason.
        """
        # action_id comes from callback data; refuse anything that is not a
        # plain file name so it cannot address files outside these directories.
        if not action_id or Path(action_id).name != action_id:
            return False, "Invalid action id."

        source_path = self.pending_dir / f"{action_id}.json"
        if not source_path.exists():
            return False, "Action file no longer exists in pending."

        target_dir = self.approved_dir if target_status == "approved" else self.rejected_dir
        target_path = target_dir / f"{action_id}.json"
        # Hidden, non-.json temp name so "*.json" scans never pick up a partial file.
        tmp_path = target_dir / f".{action_id}.json.tmp"

        try:
            target_dir.mkdir(parents=True, exist_ok=True)
            data = json.loads(source_path.read_text(encoding="utf-8"))
            current_status = data.get("status", "pending")

            now = time.time()
            data["status"] = target_status
            data["updated_at"] = now

            history = data.get("transition_history", [])
            history.append({
                "from": current_status,
                "to": target_status,
                "timestamp": now,
                "by": f"tg:{username}",
            })
            data["transition_history"] = history

            # Not one atomic move. The new copy is written to a temp file and
            # renamed into place, so readers never see a half-written file; the
            # pending copy is then deleted. A crash in between leaves both.
            tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
            os.replace(tmp_path, target_path)
            source_path.unlink()
            logger.info(f"Action {action_id} moved from {current_status} to {target_status} by {username}")
            return True, "Success"
        except Exception as e:
            logger.error(f"Error moving action file: {e}")
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass
            return False, str(e)
|
|
|
|
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet the sender and show their Telegram user ID and authorization status.

    Intentionally not restricted to ALLOWED_IDS, so an unlisted user can learn
    the ID to add to TELEGRAM_ALLOWED_USER_IDS.
    """
    user = update.effective_user
    # first_name is user-controlled and placed outside any Markdown entity. An
    # unescaped '_', '*', '`' or '[' would make Telegram reject the whole
    # reply, so backslash-escape them as legacy Markdown requires.
    md_escape = str.maketrans({c: "\\" + c for c in "_*`["})
    safe_name = (user.first_name or "").translate(md_escape)
    message = (
        f"Hello {safe_name}! 🤖\n"
        f"Your Telegram User ID is: `{user.id}`\n\n"
    )
    if user.id in ALLOWED_IDS:
        message += "✅ You are authorized to manage the homelab.\n\n"
        message += "Use /help to see available commands."
    else:
        message += "❌ You are NOT authorized. Add your ID to `TELEGRAM_ALLOWED_USER_IDS`."

    await update.message.reply_text(message, parse_mode="Markdown")
|
|
|
|
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Report whether the Control Plane API answers and which URL the bot uses.

    The API counts as online when ``/summary`` returns a truthy JSON body.
    Messages from users outside ALLOWED_IDS are ignored.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    summary = await fetch_api("/summary")
    api_state = "✅ Online" if summary else "❌ Unreachable"
    await update.message.reply_text(
        "🤖 *Telegram Bot Status*\n"
        f"Control Plane API: {api_state}\n"
        f"Target URL: `{CONTROL_PLANE_URL}`\n",
        parse_mode="Markdown",
    )
|
|
|
|
async def summary_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the Control Plane ``/summary`` overview.

    Shows status and node/service/active-incident counts, plus a warning
    when the summary is flagged ``stale``. Ignores users outside ALLOWED_IDS.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    summary = await fetch_api("/summary")
    if not summary:
        await update.message.reply_text("❌ Failed to fetch summary from Control Plane.")
        return

    lines = [
        "📊 *System Summary*",
        f"Status: `{summary.get('status', 'unknown')}`",
        f"Nodes: {summary.get('node_count', 0)}",
        f"Services: {summary.get('service_count', 0)}",
        f"Active Incidents: {summary.get('active_incidents_count', 0)}",
    ]
    text = "\n".join(lines) + "\n"
    if summary.get("stale"):
        text += "\n⚠️ *Warning: Data is stale!*"

    await update.message.reply_text(text, parse_mode="Markdown")
|
|
|
|
async def nodes_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List each node from ``/nodes`` with a health icon, status and last-seen time.

    Ignores users outside ALLOWED_IDS. Distinguishes an API failure (None)
    from an empty fleet.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    fleet = await fetch_api("/nodes")
    if fleet is None:
        await update.message.reply_text("❌ Failed to fetch nodes.")
        return
    if not fleet:
        await update.message.reply_text("No nodes discovered in the fleet.")
        return

    def icon_for(health):
        # nominal -> check, degraded -> warning, anything else -> cross.
        if health == "nominal":
            return "✅"
        if health == "degraded":
            return "⚠️"
        return "❌"

    parts = ["🖥️ *Nodes Status*\n"]
    for entry in fleet:
        parts.append(
            f"{icon_for(entry.get('health'))} *{entry.get('hostname')}*: "
            f"`{entry.get('status', 'unknown')}`\n"
        )
        parts.append(f" Last seen: {entry.get('last_seen', 'N/A')}\n")

    await update.message.reply_text("".join(parts), parse_mode="Markdown")
|
|
|
|
async def services_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with, per node, how many of its services report ``nominal`` health.

    Ignores users outside ALLOWED_IDS. Replies with an error if ``/services``
    cannot be fetched.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    services = await fetch_api("/services")
    if services is None:
        await update.message.reply_text("❌ Failed to fetch services.")
        return

    # Group by node. Missing or null node values go into "unknown", and keys
    # are coerced to str so the sort below cannot fail on mixed key types
    # (e.g. a None node next to named ones).
    by_node = {}
    for svc in services:
        by_node.setdefault(str(svc.get("node") or "unknown"), []).append(svc)

    msg = "⚙️ *Services Summary*\n"
    if not by_node:
        msg += "No services discovered."
    else:
        for node, svc_list in sorted(by_node.items()):
            nominal = sum(1 for s in svc_list if s.get("health") == "nominal")
            msg += f"• *{node}*: {nominal}/{len(svc_list)} nominal\n"

    msg += "\nUse /unhealthy to see issues."
    await update.message.reply_text(msg, parse_mode="Markdown")
|
|
|
|
async def unhealthy_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List non-nominal services and non-running containers.

    Services come from ``/services``; containers come from the ``docker``
    entry of each node's ``checks`` in ``/nodes``, and only when that check's
    status is ``ok``. If an endpoint cannot be fetched, the reply says so
    instead of claiming all systems are nominal. Ignores users outside
    ALLOWED_IDS.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    # Independent requests, so fetch them concurrently.
    services, nodes = await asyncio.gather(fetch_api("/services"), fetch_api("/nodes"))
    if services is None and nodes is None:
        await update.message.reply_text("❌ Failed to fetch services and nodes from Control Plane.")
        return

    msg = "⚠️ *Unhealthy Components*\n"
    found = False

    for s in services or []:
        # A null/missing health would crash .lower(); show it as "unknown".
        health = str(s.get("health") or "unknown").lower()
        if health != "nominal":
            msg += f"• Service *{s.get('name')}* on *{s.get('node')}*: `{health}`\n"
            found = True

    for n in nodes or []:
        checks = n.get("checks") or {}
        if isinstance(checks, str):
            # checks may arrive JSON-encoded as a string.
            try:
                checks = json.loads(checks)
            except ValueError:
                checks = {}
        docker = checks.get("docker") if isinstance(checks, dict) else None
        if not isinstance(docker, dict) or docker.get("status") != "ok":
            continue
        for c in docker.get("containers") or []:
            if c.get("state") != "running":
                msg += f"• Container *{c.get('name')}* on *{n.get('hostname')}*: `{c.get('state')}`\n"
                found = True

    if services is None or nodes is None:
        missing = "services" if services is None else "nodes"
        msg += f"\n⚠️ Could not fetch {missing}; results are incomplete."
    elif not found:
        msg += "All systems nominal. ✅"

    await update.message.reply_text(msg, parse_mode="Markdown")
|
|
|
|
async def incidents_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List incidents whose status is neither ``resolved`` nor ``closed``.

    Ignores users outside ALLOWED_IDS. Replies with an error if
    ``/incidents`` cannot be fetched.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    incidents = await fetch_api("/incidents")
    if incidents is None:
        await update.message.reply_text("❌ Failed to fetch incidents.")
        return

    active = [i for i in incidents if i.get("status") not in ("resolved", "closed")]
    if not active:
        await update.message.reply_text("No active incidents. ✅")
        return

    # Severity and message are outside any Markdown entity, so '_', '*', '`'
    # and '[' must be backslash-escaped. Otherwise free-form incident text
    # makes Telegram reject the whole reply. The literal '[' before the
    # severity is escaped for the same reason.
    md_escape = str.maketrans({c: "\\" + c for c in "_*`["})
    msg = "🚨 *Active Incidents*\n"
    for inc in active:
        # A null severity would crash .upper(); treat it like a missing one.
        severity = str(inc.get("severity") or "info").upper().translate(md_escape)
        # Inside the bold entity no escaping is possible; drop stray '*'.
        inc_type = str(inc.get("type")).replace("*", "")
        text = str(inc.get("message")).translate(md_escape)
        msg += f"• \\[{severity}] *{inc_type}*: {text}\n"

    await update.message.reply_text(msg, parse_mode="Markdown")
|
|
|
|
async def actions_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the number of actions per status from ``/actions``.

    The response is treated as a mapping of status name to a list of actions
    (inferred from the ``.items()``/``len`` usage; confirm against the API).
    Empty statuses are omitted. Ignores users outside ALLOWED_IDS.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    actions = await fetch_api("/actions")
    # None (fetch failed) or an unexpected shape would otherwise crash .items().
    if not isinstance(actions, dict):
        await update.message.reply_text("❌ Actions endpoint unavailable.")
        return

    # Status names are rendered outside any Markdown entity; escape legacy
    # Markdown metacharacters (a '_' in a status name would otherwise open an
    # unterminated italic and Telegram would reject the reply).
    md_escape = str.maketrans({c: "\\" + c for c in "_*`["})
    msg = "⚡ *Actions Summary*\n"
    total = 0
    for status, act_list in actions.items():
        if act_list:
            msg += f"• {str(status).capitalize().translate(md_escape)}: {len(act_list)}\n"
            total += len(act_list)

    if total == 0:
        msg = "No actions recorded."

    await update.message.reply_text(msg, parse_mode="Markdown")
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the list of supported slash commands (available to any user)."""
    commands = (
        ("status", "Check bot and API connectivity"),
        ("summary", "System health overview"),
        ("nodes", "List homelab nodes and their status"),
        ("services", "Summary of services across nodes"),
        ("unhealthy", "List all unhealthy components"),
        ("incidents", "View active incidents"),
        ("actions", "Summary of operator actions"),
        ("help", "Show this help message"),
    )
    listing = "\n".join(f"/{name} - {summary}" for name, summary in commands)
    text = (
        "📖 *Supported Commands*\n\n"
        + listing
        + "\n\n"
        + "Free text will be handled by the guidance system."
    )
    await update.message.reply_text(text, parse_mode="Markdown")
|
|
|
|
async def handle_fallback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer non-command text from allowed users with a list of commands.

    When the LLM fallback is enabled and OPENCLAW_BASE_URL is set, the
    request is only logged; the OpenClaw call itself is not implemented yet.
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return

    llm_configured = ENABLE_LLM_FALLBACK and OPENCLAW_BASE_URL
    if llm_configured:
        # Placeholder for the OpenClaw LLM fallback.
        logger.info("LLM fallback requested for: %s", update.message.text)

    await update.message.reply_text(
        "Use /summary, /nodes, /services, /unhealthy, /incidents, /actions."
    )
|
|
|
|
async def run_bot():
    """Build the Telegram application, register handlers, and poll until stopped.

    Returns right away (after printing a notice) when TELEGRAM_BOT_TOKEN is
    unset, so the process exits cleanly instead of crashing. Otherwise it
    polls until SIGTERM, KeyboardInterrupt/SystemExit, or task cancellation.
    It then stops the updater, stops the application and shuts it down, in
    the order python-telegram-bot requires.
    """
    if not TOKEN:
        print("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram bot will not start.")
        # Requirement says: "do not fail if Telegram token is absent, but
        # telegram-bot should be disabled or exit cleanly" -- so just return.
        return

    bot_logic = ApprovalBot()
    application = ApplicationBuilder().token(TOKEN).build()

    for name, callback in (
        ("start", start_command),
        ("status", status_command),
        ("summary", summary_command),
        ("nodes", nodes_command),
        ("services", services_command),
        ("unhealthy", unhealthy_command),
        ("incidents", incidents_command),
        ("actions", actions_command),
        ("help", help_command),
    ):
        application.add_handler(CommandHandler(name, callback))

    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_fallback))
    application.add_handler(CallbackQueryHandler(bot_logic.handle_callback))

    # Schedule the pending actions check
    job_queue = application.job_queue
    if job_queue:
        job_queue.run_repeating(bot_logic.check_pending_actions, interval=10, first=5)
    else:
        logger.warning("JobQueue is not available. Periodic pending actions check will be skipped.")

    # Without a handler, SIGTERM (e.g. `docker stop`) kills the process before
    # the shutdown sequence below runs. Turn it into a normal stop instead.
    stop_event = asyncio.Event()
    try:
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop_event.set)
    except (NotImplementedError, RuntimeError):
        # Not supported on this platform/loop (e.g. Windows); keep the default.
        pass

    logger.info("Starting Telegram Approval Bot...")
    await application.initialize()
    try:
        await application.start()
        await application.updater.start_polling()
        # Run until the application is stopped
        await stop_event.wait()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        logger.info("Stopping bot...")
        # The updater must be stopped explicitly: Application.stop() leaves it
        # running, and Application.shutdown() refuses while it is running.
        if application.updater is not None and application.updater.running:
            await application.updater.stop()
        if application.running:
            await application.stop()
        await application.shutdown()
|
|
|
|
if __name__ == "__main__":
    try:
        asyncio.run(run_bot())
    except KeyboardInterrupt:
        # Ctrl+C is a normal way to stop the bot; exit quietly.
        pass
    except Exception as e:
        # Keep the traceback and exit non-zero. Otherwise a crash looks like
        # a clean exit to whatever supervises the process (e.g. a container
        # restart policy).
        logger.exception("Fatal error: %s", e)
        raise SystemExit(1)
|