diff --git a/gotelegram-bot/bot.py b/gotelegram-bot/bot.py index aaf4ac5..ae2a9ad 100644 --- a/gotelegram-bot/bot.py +++ b/gotelegram-bot/bot.py @@ -1,1397 +1,1402 @@ -#!/usr/bin/env python3 -""" -GoTelegram v2.2 Bot - MTProxy Management for Linux -Manages telemt engine via Telegram interface with full CLI feature parity -Uses python-telegram-bot v21+ -""" - -import asyncio -import html -import json -import logging -import os -import re -import subprocess -import toml -from datetime import datetime -from pathlib import Path -from typing import Tuple, Optional, List, Dict, Any - -from dotenv import load_dotenv -from telegram import ( - Update, - InlineKeyboardButton, - InlineKeyboardMarkup, - InputFile, -) -from telegram.ext import ( - Application, - CommandHandler, - CallbackQueryHandler, - ContextTypes, - filters, -) -from telegram.error import TelegramError, BadRequest - -# Load environment variables -load_dotenv() - -# Logging configuration -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - level=logging.INFO, -) -logger = logging.getLogger(__name__) - -# ============================================================================ -# CONFIGURATION -# ============================================================================ - -GOTELEGRAM_VERSION = "2.2.0" -GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json" -TELEMT_CONFIG = "/etc/telemt/config.toml" -TELEMT_SERVICE = "telemt" -WEBSITE_ROOT = "/var/www/gotelegram-site" -BACKUP_DIR = "/opt/gotelegram/backups" -TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json" - -PROMO_LINK = "https://vk.cc/ct29NQ" -TIP_LINK = "https://pay.cloudtips.ru/p/7410814f" - -BOT_TOKEN = os.getenv("BOT_TOKEN") -ALLOWED_IDS_STR = os.getenv("ALLOWED_IDS", "") -ALLOWED_IDS: set = set() -for _id_str in ALLOWED_IDS_STR.split(","): - _id_str = _id_str.strip() - if _id_str: - try: - ALLOWED_IDS.add(int(_id_str)) - except ValueError: - logging.warning(f"Invalid ALLOWED_IDS entry: {_id_str}") - -QUICK_DOMAINS = [ - "google.com", - "microsoft.com", - "cloudflare.com", - "apple.com", - "amazon.com", - "github.com", - "stackoverflow.com", - "medium.com", - "wikipedia.org", - "coursera.org", - "udemy.com", - "habr.com", - "stepik.org", - "duolingo.com", - "khanacademy.org", - "bbc.com", - "reuters.com", - "nytimes.com", - "ted.com", - "zoom.us", -] - -# ============================================================================ -# UTILITY FUNCTIONS -# ============================================================================ - - -async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]: - """Execute shell command asynchronously. - - Args: - *args: Command and arguments - timeout: Timeout in seconds - - Returns: - Tuple of (return_code, stdout, stderr) - """ - try: - process = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for( - process.communicate(), timeout=timeout - ) - return ( - process.returncode, - stdout.decode("utf-8", errors="replace"), - stderr.decode("utf-8", errors="replace"), - ) - except asyncio.TimeoutError: - try: - process.kill() - await process.wait() - except Exception: - pass - return (-1, "", f"Command timeout after {timeout}s") - except Exception as e: - return (-1, "", str(e)) - - -def load_json(path: str) -> Optional[Dict]: - """Load JSON file.""" - try: - with open(path, "r") as f: - return json.load(f) - except Exception as e: - logger.warning(f"Failed to load {path}: {e}") - return None - - -def load_toml(path: str) -> Optional[Dict]: - """Load TOML file.""" - try: - with open(path, "r") as f: - return toml.load(f) - except Exception as e: - logger.warning(f"Failed to load {path}: {e}") - return None - - -def save_json(path: str, data: Dict) -> bool: - """Save JSON file.""" - try: - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w") as f: - json.dump(data, f, indent=2) - return True - except Exception as e: - logger.error(f"Failed to save {path}: {e}") - return False - - -async def safe_edit_message(query, text: str, reply_markup=None, parse_mode=None) -> bool: - """Safely edit message, handling cases where message was deleted or not modified.""" - try: - await query.edit_message_text( - text, reply_markup=reply_markup, parse_mode=parse_mode - ) - return True - except BadRequest as e: - err_msg = str(e).lower() - if "message is not modified" in err_msg: - return True # No change needed, not an error - if "message to edit not found" in err_msg or "message can't be edited" in err_msg: - logger.warning(f"Cannot edit message: {e}") - return False - raise # Re-raise unexpected BadRequest - - -async def check_service_status(service: str) -> bool: - """Check if systemd service is running.""" - code, _, _ = await sh("systemctl", "is-active", service) - return code == 0 - - -async def get_telemt_version() -> str: - """Get telemt version.""" - code, stdout, _ = await sh("telemt", "-v") - if code == 0: - return stdout.strip().split()[-1] if stdout else "unknown" - return "unknown" - - -def is_docker_running() -> bool: - """Check if Docker daemon is running.""" - try: - subprocess.run( - ["docker", "ps"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - timeout=5, - ) - return True - except Exception: - return False - - -async def check_old_container() -> Optional[str]: - """Check for old mtg Docker container (v1 migration).""" - if not is_docker_running(): - return None - code, stdout, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}") - if code == 0 and "mtg" in stdout: - return "mtg" - return None - - -# ============================================================================ -# ACCESS CONTROL -# ============================================================================ - - -def is_user_allowed(user_id: int) -> bool: - """Check if user ID is in ALLOWED_IDS.""" - if not ALLOWED_IDS: - return True - return user_id in ALLOWED_IDS - - -async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: - """Check authorization and send error if not allowed.""" - if not is_user_allowed(update.effective_user.id): - await update.message.reply_text( - f"Access denied. Your ID: {update.effective_user.id}" - ) - logger.warning( - f"Unauthorized access attempt from user {update.effective_user.id}" - ) - return False - return True - - -# ============================================================================ -# MAIN MENU -# ============================================================================ - - -def get_main_menu() -> InlineKeyboardMarkup: - """Generate main menu keyboard.""" - buttons = [ - [ - InlineKeyboardButton("βοΈ Install", callback_data="menu_install"), - InlineKeyboardButton("π Status", callback_data="menu_status"), - ], - [ - InlineKeyboardButton("π Link", callback_data="menu_link"), - InlineKeyboardButton("π€ Share", callback_data="menu_share"), - ], - [ - InlineKeyboardButton("π Restart", callback_data="menu_restart"), - InlineKeyboardButton("π Logs", callback_data="menu_logs"), - ], - [ - InlineKeyboardButton("β‘ Change Mode/Template", callback_data="menu_change"), - InlineKeyboardButton("πΎ Backup", callback_data="menu_backup"), - ], - [ - InlineKeyboardButton("β©οΈ Restore", callback_data="menu_restore"), - InlineKeyboardButton("π‘ Update telemt", callback_data="menu_update"), - ], - [ - InlineKeyboardButton("π Website/SSL", callback_data="menu_website"), - InlineKeyboardButton("π Promo", callback_data="menu_promo"), - ], - [ - InlineKeyboardButton("ποΈ Remove", callback_data="menu_remove"), - InlineKeyboardButton("βΉοΈ Credits", callback_data="menu_credits"), - ], - [InlineKeyboardButton("β Close", callback_data="close_menu")], - ] - return InlineKeyboardMarkup(buttons) - - -# ============================================================================ -# COMMANDS -# ============================================================================ - - -async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Start command - show main menu.""" - if not await require_auth(update, context): - return - - welcome = ( - f"GoTelegram v{GOTELEGRAM_VERSION}\n\n" - "π€ MTProxy Management Bot\n" - "Powered by telemt engine\n\n" - "Select an action from the menu below:" - ) - await update.message.reply_text( - welcome, reply_markup=get_main_menu(), parse_mode="HTML" - ) - - -async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Help command - show available commands.""" - if not await require_auth(update, context): - return - - help_text = ( - "GoTelegram Bot Commands\n\n" - "/start - Show main menu\n" - "/help - Show this help message\n" - "/status - Quick status check\n" - "/logs - Show recent logs\n\n" - "Use the inline menu for all other operations." - ) - await update.message.reply_text(help_text, parse_mode="HTML") - - -async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Quick status check.""" - if not await require_auth(update, context): - return - - await update.message.reply_text("β³ Checking status...", parse_mode="HTML") - status_text = await get_status_text() - await update.message.reply_text(status_text, parse_mode="HTML") - - -async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Show recent logs.""" - if not await require_auth(update, context): - return - - code, stdout, stderr = await sh( - "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager" - ) - if code == 0: - log_text = stdout[-1500:] if len(stdout) > 1500 else stdout - await update.message.reply_text( - f"
{html.escape(log_text)}",
- parse_mode="HTML",
- )
- else:
- await update.message.reply_text("Failed to retrieve logs")
-
-
-# ============================================================================
-# STATUS
-# ============================================================================
-
-
-async def get_status_text() -> str:
- """Generate status report."""
- lines = ["π Current Status\n"]
-
- # Service status
- is_running = await check_service_status(TELEMT_SERVICE)
- lines.append(f"Service: {'β
Running' if is_running else 'β Stopped'}")
-
- # Telemt version
- version = await get_telemt_version()
- lines.append(f"Telemt: v{version}")
-
- # Config status
- config = load_json(GOTELEGRAM_CONFIG)
- if config:
- lines.append(f"Mode: {html.escape(str(config.get('mode', 'unknown')))}")
- if "template" in config:
- lines.append(f"Template: {html.escape(str(config['template']))}")
- if "domain" in config:
- lines.append(f"Domain: {html.escape(str(config['domain']))}")
- if "port" in config:
- lines.append(f"Port: {html.escape(str(config['port']))}")
-
- # Telemt config
- telemt_cfg = load_toml(TELEMT_CONFIG)
- if telemt_cfg:
- cfg = telemt_cfg.get("config", {})
- if "listen_port" in cfg:
- lines.append(f"Listen Port: {cfg['listen_port']}")
-
- # Backups
- backup_count = 0
- try:
- if os.path.exists(BACKUP_DIR):
- backup_count = len([f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")])
- except Exception:
- pass
- lines.append(f"Backups: {backup_count}")
-
- # Old container check
- old_container = await check_old_container()
- if old_container:
- lines.append(f"\nβ οΈ Found old container: {html.escape(old_container)}")
- lines.append("Run 'Install' to migrate")
-
- return "\n".join(lines)
-
-
-async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Status callback."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Checking status...")
-
- status_text = await get_status_text()
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,
- status_text, reply_markup=keyboard, parse_mode="HTML"
- )
-
-
-# ============================================================================
-# INSTALL
-# ============================================================================
-
-
-def get_install_mode_menu() -> InlineKeyboardMarkup:
- """Install mode selection menu."""
- buttons = [
- [InlineKeyboardButton("β‘ Quick Mode", callback_data="install_mode_quick")],
- [InlineKeyboardButton("π Stealth Mode", callback_data="install_mode_stealth")],
- [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
- ]
- return InlineKeyboardMarkup(buttons)
-
-
-async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Install menu callback."""
- query = update.callback_query
- await query.answer()
-
- # Check for old container
- old_container = await check_old_container()
- if old_container:
- text = (
- f"β οΈ Migration from v1 detected\n\n"
- f"Found Docker container: {html.escape(old_container)}\n\n"
- f"Would you like to:\n"
- f"1. Migrate from v1 (recommended)\n"
- f"2. Fresh install (will remove old container)\n\n"
- f"Select below or choose install mode"
- )
- buttons = [
- [InlineKeyboardButton("π Migrate from v1", callback_data="install_migrate")],
- [InlineKeyboardButton("β¨ Fresh Install", callback_data="install_mode_quick")],
- [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
- ]
- keyboard = InlineKeyboardMarkup(buttons)
- else:
- text = "Select installation mode:"
- keyboard = get_install_mode_menu()
-
- await safe_edit_message(query,
- text, reply_markup=keyboard, parse_mode="HTML"
- )
-
-
-async def cb_install_mode_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Quick mode domain selection."""
- query = update.callback_query
- await query.answer()
-
- # Show domains with pagination (4 per row, 2 rows)
- buttons = []
- for i in range(0, len(QUICK_DOMAINS), 2):
- row = []
- for j in range(2):
- if i + j < len(QUICK_DOMAINS):
- domain = QUICK_DOMAINS[i + j]
- row.append(
- InlineKeyboardButton(
- domain, callback_data=f"quick_dom_{i+j}"
- )
- )
- buttons.append(row)
-
- buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_install")])
-
- text = "Select a domain for quick mode:"
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard)
-
-
-async def cb_quick_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Quick domain selection callback."""
- query = update.callback_query
- data = query.data
- try:
- domain_idx = int(data.split("_")[-1])
- domain = QUICK_DOMAINS[domain_idx]
- except (ValueError, IndexError):
- await query.answer("Invalid domain selection")
- return
-
- await query.answer()
- await safe_edit_message(query,f"β³ Installing with domain: {domain}...")
-
- # Simulate installation (in real scenario, call install script)
- config = {
- "mode": "quick",
- "domain": domain,
- "port": 443,
- "installed_at": datetime.now().isoformat(),
- }
-
- if save_json(GOTELEGRAM_CONFIG, config):
- text = (
- f"β
Quick mode installed!\n\n"
- f"Domain: {domain}\n"
- f"Mode: Quick\n\n"
- f"Service starting... Check status in 10 seconds."
- )
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,
- text, reply_markup=keyboard, parse_mode="HTML"
- )
- else:
- await safe_edit_message(query,
- "β Failed to save configuration",
- reply_markup=InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
- ),
- )
-
-
-async def cb_install_mode_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Stealth mode - show template categories."""
- query = update.callback_query
- await query.answer()
-
- catalog = load_json(TEMPLATES_CATALOG)
- if not catalog or "categories" not in catalog:
- await safe_edit_message(query,
- "β Template catalog not found",
- reply_markup=InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
- ),
- )
- return
-
- buttons = []
- for cat in catalog.get("categories", []):
- buttons.append(
- [
- InlineKeyboardButton(
- f"π {cat['name']}", callback_data=f"stealth_cat_{cat['id']}"
- )
- ]
- )
- buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_install")])
-
- text = "Stealth Mode - Select Template Category:"
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard)
-
-
-async def cb_stealth_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Show templates in category."""
- query = update.callback_query
- data = query.data
- cat_id = data.removeprefix("stealth_cat_")
-
- await query.answer()
-
- catalog = load_json(TEMPLATES_CATALOG)
- if not catalog:
- await safe_edit_message(query,"β Template catalog not found")
- return
-
- # Find category and templates
- category = None
- templates = []
- for cat in catalog.get("categories", []):
- if cat["id"] == cat_id:
- category = cat
- templates = cat.get("templates", [])
- break
-
- if not category:
- await safe_edit_message(query,"β Category not found")
- return
-
- buttons = []
- for tpl in templates:
- buttons.append(
- [
- InlineKeyboardButton(
- f"π¨ {tpl['name']}", callback_data=f"stealth_tpl_{tpl['id']}"
- )
- ]
- )
- buttons.append([InlineKeyboardButton("Β« Back", callback_data="install_mode_stealth")])
-
- text = f"Select template from {html.escape(category['name'])}:"
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_stealth_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Show template preview and confirm."""
- query = update.callback_query
- data = query.data
- tpl_id = data.removeprefix("stealth_tpl_")
-
- await query.answer()
-
- catalog = load_json(TEMPLATES_CATALOG)
- if not catalog:
- await safe_edit_message(query,"β Template catalog not found")
- return
-
- # Find template
- template = None
- for cat in catalog.get("categories", []):
- for tpl in cat.get("templates", []):
- if tpl["id"] == tpl_id:
- template = tpl
- break
- if template:
- break
-
- if not template:
- await safe_edit_message(query,"β Template not found")
- return
-
- tpl_name = html.escape(template.get('name', 'Unknown'))
- tpl_desc = html.escape(template.get('description', 'N/A'))
- text = (
- f"π¨ Template Preview\n\n"
- f"Name: {tpl_name}\n"
- f"Description: {tpl_desc}\n\n"
- )
- if "preview_url" in template:
- preview_url = html.escape(template['preview_url'], quote=True)
- text += f'View Live Preview\n\n'
-
- text += "Confirm installation?"
-
- buttons = [
- [
- InlineKeyboardButton(
- "β
Install", callback_data=f"stealth_confirm_{tpl_id}"
- )
- ],
- [InlineKeyboardButton("Β« Back", callback_data="install_mode_stealth")],
- ]
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_stealth_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Confirm and install stealth template."""
- query = update.callback_query
- data = query.data
- tpl_id = data.removeprefix("stealth_confirm_")
-
- await query.answer()
- await safe_edit_message(query,"β³ Installing template...")
-
- config = {
- "mode": "stealth",
- "template": tpl_id,
- "port": 443,
- "installed_at": datetime.now().isoformat(),
- }
-
- if save_json(GOTELEGRAM_CONFIG, config):
- text = (
- f"β
Stealth mode installed!\n\n"
- f"Template: {html.escape(tpl_id)}\n"
- f"Mode: Stealth\n\n"
- f"Service starting... Check status in 10 seconds."
- )
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,
- text, reply_markup=keyboard, parse_mode="HTML"
- )
- else:
- await safe_edit_message(query,
- "β Failed to save configuration",
- reply_markup=InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
- ),
- )
-
-
-# ============================================================================
-# PROXY LINK & SHARE
-# ============================================================================
-
-
-async def get_proxy_link() -> Optional[str]:
- """Generate proxy link from config."""
- config = load_json(GOTELEGRAM_CONFIG)
- if not config:
- return None
-
- # Get secret from telemt TOML config
- secret = config.get("secret", "")
- if not secret:
- telemt_cfg = load_toml(TELEMT_CONFIG)
- if telemt_cfg:
- users = telemt_cfg.get("users", [])
- if isinstance(users, list) and users:
- secret = users[0].get("secret", "")
- if not secret:
- return None
-
- # Get server IP
- code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org")
- server = stdout.strip() if code == 0 and stdout.strip() else "0.0.0.0"
-
- port = config.get("port", 443)
-
- return f"tg://proxy?server={server}&port={port}&secret={secret}"
-
-
-async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Generate and show proxy link."""
- query = update.callback_query
- await query.answer()
-
- link = await get_proxy_link()
- if not link:
- text = "β Proxy not installed yet. Run install first."
- else:
- text = (
- f"π Proxy Link\n\n"
- f"{html.escape(link)}\n\n"
- f"Open in Telegram to connect."
- )
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Share link as QR code."""
- query = update.callback_query
- await query.answer()
-
- link = await get_proxy_link()
- if not link:
- text = "β Proxy not installed yet."
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard)
- return
-
- # Try to generate QR code
- qr_file = None
- code, _, _ = await sh("which", "qrencode")
- if code == 0:
- qr_path = "/tmp/proxy_qr.png"
- code, _, _ = await sh("qrencode", "-o", qr_path, link)
- if code == 0 and os.path.exists(qr_path):
- qr_file = qr_path
-
- if qr_file:
- try:
- with open(qr_file, "rb") as f:
- await query.message.reply_photo(
- photo=f,
- caption=f"π€ Proxy QR Code\n\n{html.escape(link)}",
- parse_mode="HTML",
- )
- except Exception as e:
- logger.error(f"Failed to send QR code: {e}")
- await safe_edit_message(query,
- f"π Proxy Link\n\n{html.escape(link)}",
- parse_mode="HTML",
- )
- else:
- await safe_edit_message(query,
- f"π Proxy Link\n\n{html.escape(link)}",
- reply_markup=InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- ),
- parse_mode="HTML",
- )
-
-
-# ============================================================================
-# RESTART & LOGS
-# ============================================================================
-
-
-async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Restart service."""
- query = update.callback_query
- await query.answer()
-
- text = "β³ Restarting telemt service..."
- await safe_edit_message(query,text)
-
- code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE)
- if code == 0:
- text = "β
Service restarted successfully"
- else:
- text = f"β Failed to restart:\n{html.escape(stderr[:500])}"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,
- text, reply_markup=keyboard, parse_mode="HTML"
- )
-
-
-async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Show recent logs."""
- query = update.callback_query
- await query.answer()
-
- code, stdout, _ = await sh(
- "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager"
- )
-
- if code == 0:
- log_text = stdout[-1000:] if len(stdout) > 1000 else stdout
- text = f"π Recent Logs\n\n{html.escape(log_text)}"
- else:
- text = "β Failed to retrieve logs"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# BACKUP & RESTORE
-# ============================================================================
-
-
-async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Backup menu."""
- query = update.callback_query
- await query.answer()
-
- # List existing backups
- backups = []
- try:
- if os.path.exists(BACKUP_DIR):
- backups = sorted(
- [f for f in os.listdir(BACKUP_DIR)
- if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
- reverse=True,
- )
- except Exception:
- pass
-
- buttons = [[InlineKeyboardButton("πΎ Create Backup", callback_data="backup_create")]]
-
- if backups:
- buttons.append(
- [InlineKeyboardButton("π List Backups", callback_data="backup_list")]
- )
-
- buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_main")])
-
- text = f"πΎ Backup Management\n\nExisting backups: {len(backups)}"
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Create backup."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Creating backup...")
-
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- backup_file = os.path.join(BACKUP_DIR, f"backup_{timestamp}.tar.gz")
-
- os.makedirs(BACKUP_DIR, exist_ok=True)
- code, _, stderr = await sh(
- "tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG
- )
-
- if code == 0:
- text = f"β
Backup created:\n{html.escape(backup_file)}"
- else:
- text = f"β Backup failed:\n{html.escape(stderr[:500])}"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_backup")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """List backups."""
- query = update.callback_query
- await query.answer()
-
- backups = []
- try:
- if os.path.exists(BACKUP_DIR):
- backups = sorted(
- [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
- reverse=True,
- )
- except Exception:
- pass
-
- if not backups:
- text = "No backups found"
- else:
- text = "π Available Backups\n\n"
- for backup in backups[:10]:
- path = os.path.join(BACKUP_DIR, backup)
- size = os.path.getsize(path) / (1024 * 1024)
- text += f"{html.escape(backup)} ({size:.2f} MB)\n"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_backup")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Restore menu."""
- query = update.callback_query
- await query.answer()
-
- backups = []
- try:
- if os.path.exists(BACKUP_DIR):
- backups = sorted(
- [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
- reverse=True,
- )
- except Exception:
- pass
-
- if not backups:
- text = "β No backups available"
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- else:
- text = "Select backup to restore:"
- buttons = []
- for i, backup in enumerate(backups[:10]):
- buttons.append(
- [
- InlineKeyboardButton(
- backup, callback_data=f"restore_idx_{i}"
- )
- ]
- )
- buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_main")])
- keyboard = InlineKeyboardMarkup(buttons)
- # Store backup list in user_data for retrieval
- context.user_data["backup_list"] = backups[:10]
-
- await safe_edit_message(query,text, reply_markup=keyboard)
-
-
-async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Execute backup restoration."""
- query = update.callback_query
- data = query.data
-
- try:
- idx = int(data.removeprefix("restore_idx_"))
- except ValueError:
- await query.answer("Invalid backup selection")
- return
-
- backup_list = context.user_data.get("backup_list", [])
- if idx < 0 or idx >= len(backup_list):
- await query.answer("Backup not found")
- return
-
- backup_name = backup_list[idx]
- backup_path = os.path.join(BACKUP_DIR, backup_name)
-
- await query.answer()
- await safe_edit_message(query,f"β³ Restoring from {html.escape(backup_name)}...")
-
- if not os.path.exists(backup_path):
- text = "β Backup file not found"
- else:
- # Simple restore: extract tar to overwrite configs
- code, _, stderr = await sh(
- "tar", "-xzf", backup_path, "-C", "/", timeout=60
- )
- if code == 0:
- # Restart services
- await sh("systemctl", "restart", TELEMT_SERVICE)
- text = f"β
Restored from {html.escape(backup_name)}"
- else:
- text = f"β Restore failed:\n{html.escape(stderr[:500])}"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# UPDATE & MODE/TEMPLATE CHANGE
-# ============================================================================
-
-
-async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Update telemt by re-running the install script's update logic."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Checking for telemt updates...")
-
- # Get current version
- cur_code, cur_out, _ = await sh("telemt", "--version")
- current = cur_out.strip() if cur_code == 0 else "unknown"
-
- # Check latest release from GitHub
- code, stdout, stderr = await sh(
- "curl", "-s", "--max-time", "10",
- "https://api.github.com/repos/telemt/telemt/releases/latest",
- )
-
- if code != 0 or not stdout.strip():
- text = "β Failed to check for updates"
- else:
- try:
- release = json.loads(stdout)
- latest = release.get("tag_name", "unknown")
- if latest == current:
- text = f"β
telemt is already up to date ({html.escape(current)})"
- else:
- text = (
- f"βΉοΈ Update available: {html.escape(current)} β {html.escape(latest)}\n\n"
- f"Run the CLI installer to update:\n"
- f"sudo bash install.sh β menu item 10"
- )
- except json.JSONDecodeError:
- text = "β Failed to parse release info"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Change mode or template."""
- query = update.callback_query
- await query.answer()
-
- buttons = [
- [InlineKeyboardButton("β‘ Switch to Quick Mode", callback_data="change_quick")],
- [InlineKeyboardButton("π Switch to Stealth Mode", callback_data="change_stealth")],
- [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
- ]
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,
- "Change mode or template:", reply_markup=keyboard
- )
-
-
-async def cb_change_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Switch to quick mode β show domain selection."""
- query = update.callback_query
- await query.answer()
- # Reuse the quick mode domain selection flow
- await cb_install_mode_quick(update, context)
-
-
-async def cb_change_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Switch to stealth mode β show template categories."""
- query = update.callback_query
- await query.answer()
- # Reuse the stealth mode template selection flow
- await cb_install_mode_stealth(update, context)
-
-
-async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Migrate from v1 (mtg Docker) to v2 (telemt)."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Migrating from v1...")
-
- # Stop old mtg container
- code, _, stderr = await sh("docker", "stop", "mtproto-proxy", timeout=30)
- if code != 0:
- code, _, stderr = await sh("docker", "stop", "mtg", timeout=30)
-
- # Remove old container
- await sh("docker", "rm", "mtproto-proxy", timeout=15)
- await sh("docker", "rm", "mtg", timeout=15)
-
- text = (
- "β
v1 container stopped and removed\n\n"
- "Now select installation mode for v2:"
- )
- keyboard = get_install_mode_menu()
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# WEBSITE & SSL
-# ============================================================================
-
-
-async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Website and SSL management."""
- query = update.callback_query
- await query.answer()
-
- buttons = [
- [InlineKeyboardButton("π Renew SSL Certificate", callback_data="ssl_renew")],
- [InlineKeyboardButton("π SSL Status", callback_data="ssl_status")],
- [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
- ]
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,
- "Website & SSL Management:", reply_markup=keyboard
- )
-
-
-async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Renew SSL certificate."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Renewing SSL certificate...")
-
- code, stdout, stderr = await sh("certbot", "renew", timeout=120)
-
- if code == 0:
- text = "β
SSL certificate renewed successfully"
- else:
- text = f"β Renewal failed:\n{html.escape(stderr[:500])}"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_website")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Show SSL status."""
- query = update.callback_query
- await query.answer()
-
- code, stdout, _ = await sh("certbot", "certificates")
-
- if code == 0:
- text = f"π SSL Certificates\n\n{html.escape(stdout[:1000])}"
- else:
- text = "β Failed to get SSL status"
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_website")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# PROMO & CREDITS
-# ============================================================================
-
-
-async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Promo information."""
- query = update.callback_query
- await query.answer()
-
- text = (
- f"π GoTelegram Promo\n\n"
- f"Share the love! Invite friends to use GoTelegram.\n\n"
- f"Promo Link\n\n"
- f"Support development:\n"
- f"Send a Tip"
- )
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Credits and acknowledgements."""
- query = update.callback_query
- await query.answer()
-
- text = (
- f"βΉοΈ Credits & Acknowledgements\n\n"
- f"GoTelegram v{GOTELEGRAM_VERSION}\n\n"
- f"Built with love for the Telegram community\n\n"
- f"Special thanks to:\n\n"
- f"π telemt - MTProxy engine\n"
- f" High-performance proxy core\n\n"
- f"π¨ HTML5UP - Beautiful web templates\n"
- f" Responsive design & themes\n\n"
- f"π Learning Zone - Educational resources\n"
- f" Community learning support\n\n"
- f"π Start Bootstrap - Bootstrap templates\n"
- f" Professional design framework\n\n"
- f"π¬ Community - Your feedback & support\n\n"
- f"GoTelegram is open-source and community-driven"
- )
-
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# REMOVE
-# ============================================================================
-
-
-async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Remove installation."""
- query = update.callback_query
- await query.answer()
-
- text = (
- "β οΈ Remove GoTelegram\n\n"
- "This will completely remove the installation.\n"
- "Are you sure?"
- )
-
- buttons = [
- [InlineKeyboardButton("β Yes, Remove", callback_data="remove_confirm")],
- [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
- ]
- keyboard = InlineKeyboardMarkup(buttons)
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Confirm removal."""
- query = update.callback_query
- await query.answer()
-
- await safe_edit_message(query,"β³ Removing GoTelegram...")
-
- # Stop service
- await sh("systemctl", "stop", TELEMT_SERVICE)
-
- # Remove directories
- for path in ["/opt/gotelegram", WEBSITE_ROOT]:
- await sh("rm", "-rf", path)
-
- text = "β
GoTelegram removed successfully"
- keyboard = InlineKeyboardMarkup(
- [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
- )
- await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
-
-
-# ============================================================================
-# CALLBACK ROUTING
-# ============================================================================
-
-
-async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Route all callbacks."""
- query = update.callback_query
- data = query.data
-
- # Access control
- if not is_user_allowed(update.effective_user.id):
- await query.answer("Access denied")
- return
-
- # Main menu
- if data == "menu_main":
- await query.answer()
- buttons = get_main_menu()
- text = (
- f"GoTelegram v{GOTELEGRAM_VERSION}\n\n"
- "π€ MTProxy Management\n"
- "Select an action:"
- )
- await safe_edit_message(query,text, reply_markup=buttons, parse_mode="HTML")
- return
-
- if data == "close_menu":
- await query.answer()
- await query.delete_message()
- return
-
- # Dispatch to handlers
- handlers = {
- "menu_install": cb_menu_install,
- "menu_status": cb_menu_status,
- "menu_link": cb_menu_link,
- "menu_share": cb_menu_share,
- "menu_restart": cb_menu_restart,
- "menu_logs": cb_menu_logs,
- "menu_backup": cb_menu_backup,
- "menu_restore": cb_menu_restore,
- "menu_update": cb_menu_update,
- "menu_change": cb_menu_change,
- "menu_website": cb_menu_website,
- "menu_promo": cb_menu_promo,
- "menu_credits": cb_menu_credits,
- "menu_remove": cb_menu_remove,
- "install_mode_quick": cb_install_mode_quick,
- "install_mode_stealth": cb_install_mode_stealth,
- "backup_create": cb_backup_create,
- "backup_list": cb_backup_list,
- "ssl_renew": cb_ssl_renew,
- "ssl_status": cb_ssl_status,
- "remove_confirm": cb_remove_confirm,
- "change_quick": cb_change_quick,
- "change_stealth": cb_change_stealth,
- "install_migrate": cb_install_migrate,
- }
-
- # Pattern-based handlers
- if data.startswith("quick_dom_"):
- await cb_quick_domain(update, context)
- elif data.startswith("stealth_cat_"):
- await cb_stealth_category(update, context)
- elif data.startswith("stealth_tpl_"):
- await cb_stealth_template(update, context)
- elif data.startswith("stealth_confirm_"):
- await cb_stealth_confirm(update, context)
- elif data.startswith("restore_idx_"):
- await cb_restore_backup(update, context)
- elif data in handlers:
- await handlers[data](update, context)
- else:
- await query.answer("Unknown action")
-
-
-# ============================================================================
-# ERROR HANDLERS
-# ============================================================================
-
-
-async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- """Log errors caused by Updates."""
- logger.error(f"Exception while handling an update:", exc_info=context.error)
-
-
-# ============================================================================
-# MAIN APPLICATION
-# ============================================================================
-
-
-def main() -> None:
- """Start the bot."""
- if not BOT_TOKEN:
- logger.error("BOT_TOKEN not set in .env file")
- return
-
- # Create the Application
- application = Application.builder().token(BOT_TOKEN).build()
-
- # Command handlers
- application.add_handler(CommandHandler("start", cmd_start))
- application.add_handler(CommandHandler("help", cmd_help))
- application.add_handler(CommandHandler("status", cmd_status))
- application.add_handler(CommandHandler("logs", cmd_logs))
-
- # Callback query handler (buttons)
- application.add_handler(CallbackQueryHandler(handle_callback))
-
- # Error handler
- application.add_error_handler(error_handler)
-
- # Run the bot
- logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...")
- application.run_polling(allowed_updates=Update.ALL_TYPES)
-
-
-if __name__ == "__main__":
- main()
+#!/usr/bin/env python3
+"""
+GoTelegram v2.2 Bot - MTProxy Management for Linux
+Manages telemt engine via Telegram interface with full CLI feature parity
+Uses python-telegram-bot v21+
+"""
+
+import asyncio
+import html
+import json
+import logging
+import os
+import re
+import subprocess
+import toml
+from datetime import datetime
+from pathlib import Path
+from typing import Tuple, Optional, List, Dict, Any
+
+from dotenv import load_dotenv
+from telegram import (
+ Update,
+ InlineKeyboardButton,
+ InlineKeyboardMarkup,
+ InputFile,
+)
+from telegram.ext import (
+ Application,
+ CommandHandler,
+ CallbackQueryHandler,
+ ContextTypes,
+ filters,
+)
+from telegram.error import TelegramError, BadRequest
+
+# Load environment variables
+load_dotenv()
+
+# Logging configuration
+logging.basicConfig(
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ level=logging.INFO,
+)
+logger = logging.getLogger(__name__)
+
+# ============================================================================
+# CONFIGURATION
+# ============================================================================
+
+GOTELEGRAM_VERSION = "2.2.0"
+GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json"
+TELEMT_CONFIG = "/etc/telemt/config.toml"
+TELEMT_SERVICE = "telemt"
+WEBSITE_ROOT = "/var/www/gotelegram-site"
+BACKUP_DIR = "/opt/gotelegram/backups"
+TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json"
+
+PROMO_LINK = "https://vk.cc/ct29NQ"
+TIP_LINK = "https://pay.cloudtips.ru/p/7410814f"
+
+BOT_TOKEN = os.getenv("BOT_TOKEN")
+ALLOWED_IDS_STR = os.getenv("ALLOWED_IDS", "")
+ALLOWED_IDS: set = set()
+for _id_str in ALLOWED_IDS_STR.split(","):
+ _id_str = _id_str.strip()
+ if _id_str:
+ try:
+ ALLOWED_IDS.add(int(_id_str))
+ except ValueError:
+ logging.warning(f"Invalid ALLOWED_IDS entry: {_id_str}")
+
+QUICK_DOMAINS = [
+ "google.com",
+ "microsoft.com",
+ "cloudflare.com",
+ "apple.com",
+ "amazon.com",
+ "github.com",
+ "stackoverflow.com",
+ "medium.com",
+ "wikipedia.org",
+ "coursera.org",
+ "udemy.com",
+ "habr.com",
+ "stepik.org",
+ "duolingo.com",
+ "khanacademy.org",
+ "bbc.com",
+ "reuters.com",
+ "nytimes.com",
+ "ted.com",
+ "zoom.us",
+]
+
+# ============================================================================
+# UTILITY FUNCTIONS
+# ============================================================================
+
+
+async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]:
+ """Execute shell command asynchronously.
+
+ Args:
+ *args: Command and arguments
+ timeout: Timeout in seconds
+
+ Returns:
+ Tuple of (return_code, stdout, stderr)
+ """
+ try:
+ process = await asyncio.create_subprocess_exec(
+ *args,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ stdout, stderr = await asyncio.wait_for(
+ process.communicate(), timeout=timeout
+ )
+ return (
+ process.returncode,
+ stdout.decode("utf-8", errors="replace"),
+ stderr.decode("utf-8", errors="replace"),
+ )
+ except asyncio.TimeoutError:
+ try:
+ process.kill()
+ await process.wait()
+ except Exception:
+ pass
+ return (-1, "", f"Command timeout after {timeout}s")
+ except Exception as e:
+ return (-1, "", str(e))
+
+
+def load_json(path: str) -> Optional[Dict]:
+ """Load JSON file."""
+ try:
+ with open(path, "r") as f:
+ return json.load(f)
+ except Exception as e:
+ logger.warning(f"Failed to load {path}: {e}")
+ return None
+
+
+def load_toml(path: str) -> Optional[Dict]:
+ """Load TOML file."""
+ try:
+ with open(path, "r") as f:
+ return toml.load(f)
+ except Exception as e:
+ logger.warning(f"Failed to load {path}: {e}")
+ return None
+
+
+def save_json(path: str, data: Dict) -> bool:
+ """Save JSON file."""
+ try:
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with open(path, "w") as f:
+ json.dump(data, f, indent=2)
+ return True
+ except Exception as e:
+ logger.error(f"Failed to save {path}: {e}")
+ return False
+
+
+async def safe_edit_message(query, text: str, reply_markup=None, parse_mode=None) -> bool:
+ """Safely edit message, handling cases where message was deleted or not modified."""
+ try:
+ await query.edit_message_text(
+ text, reply_markup=reply_markup, parse_mode=parse_mode
+ )
+ return True
+ except BadRequest as e:
+ err_msg = str(e).lower()
+ if "message is not modified" in err_msg:
+ return True # No change needed, not an error
+ if "message to edit not found" in err_msg or "message can't be edited" in err_msg:
+ logger.warning(f"Cannot edit message: {e}")
+ return False
+ raise # Re-raise unexpected BadRequest
+
+
+async def check_service_status(service: str) -> bool:
+ """Check if systemd service is running."""
+ code, _, _ = await sh("systemctl", "is-active", service)
+ return code == 0
+
+
+async def get_telemt_version() -> str:
+ """Get telemt version."""
+ code, stdout, _ = await sh("telemt", "-v")
+ if code == 0:
+ return stdout.strip().split()[-1] if stdout else "unknown"
+ return "unknown"
+
+
+def is_docker_running() -> bool:
+ """Check if Docker daemon is running."""
+ try:
+ subprocess.run(
+ ["docker", "ps"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ timeout=5,
+ )
+ return True
+ except Exception:
+ return False
+
+
+async def check_old_container() -> Optional[str]:
+ """Check for old mtg Docker container (v1 migration)."""
+ if not is_docker_running():
+ return None
+ code, stdout, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}")
+ if code == 0 and "mtg" in stdout:
+ return "mtg"
+ return None
+
+
+# ============================================================================
+# ACCESS CONTROL
+# ============================================================================
+
+
+def is_user_allowed(user_id: int) -> bool:
+ """Check if user ID is in ALLOWED_IDS."""
+ if not ALLOWED_IDS:
+ return True
+ return user_id in ALLOWED_IDS
+
+
+async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
+ """Check authorization and send error if not allowed."""
+ if not is_user_allowed(update.effective_user.id):
+ await update.message.reply_text(
+ f"Access denied. Your ID: {update.effective_user.id}"
+ )
+ logger.warning(
+ f"Unauthorized access attempt from user {update.effective_user.id}"
+ )
+ return False
+ return True
+
+
+# ============================================================================
+# MAIN MENU
+# ============================================================================
+
+
+def get_main_menu() -> InlineKeyboardMarkup:
+ """Generate main menu keyboard."""
+ buttons = [
+ [
+ InlineKeyboardButton("βοΈ Install", callback_data="menu_install"),
+ InlineKeyboardButton("π Status", callback_data="menu_status"),
+ ],
+ [
+ InlineKeyboardButton("π Link", callback_data="menu_link"),
+ InlineKeyboardButton("π€ Share", callback_data="menu_share"),
+ ],
+ [
+ InlineKeyboardButton("π Restart", callback_data="menu_restart"),
+ InlineKeyboardButton("π Logs", callback_data="menu_logs"),
+ ],
+ [
+ InlineKeyboardButton("β‘ Change Mode/Template", callback_data="menu_change"),
+ InlineKeyboardButton("πΎ Backup", callback_data="menu_backup"),
+ ],
+ [
+ InlineKeyboardButton("β©οΈ Restore", callback_data="menu_restore"),
+ InlineKeyboardButton("π‘ Update telemt", callback_data="menu_update"),
+ ],
+ [
+ InlineKeyboardButton("π Website/SSL", callback_data="menu_website"),
+ InlineKeyboardButton("π Promo", callback_data="menu_promo"),
+ ],
+ [
+ InlineKeyboardButton("ποΈ Remove", callback_data="menu_remove"),
+ InlineKeyboardButton("βΉοΈ Credits", callback_data="menu_credits"),
+ ],
+ [InlineKeyboardButton("β Close", callback_data="close_menu")],
+ ]
+ return InlineKeyboardMarkup(buttons)
+
+
+# ============================================================================
+# COMMANDS
+# ============================================================================
+
+
+async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Start command - show main menu."""
+ if not await require_auth(update, context):
+ return
+
+ welcome = (
+ f"GoTelegram v{GOTELEGRAM_VERSION}\n\n"
+ "π€ MTProxy Management Bot\n"
+ "Powered by telemt engine\n\n"
+ "Select an action from the menu below:"
+ )
+ await update.message.reply_text(
+ welcome, reply_markup=get_main_menu(), parse_mode="HTML"
+ )
+
+
+async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Help command - show available commands."""
+ if not await require_auth(update, context):
+ return
+
+ help_text = (
+ "GoTelegram Bot Commands\n\n"
+ "/start - Show main menu\n"
+ "/help - Show this help message\n"
+ "/status - Quick status check\n"
+ "/logs - Show recent logs\n\n"
+ "Use the inline menu for all other operations."
+ )
+ await update.message.reply_text(help_text, parse_mode="HTML")
+
+
+async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Quick status check."""
+ if not await require_auth(update, context):
+ return
+
+ await update.message.reply_text("β³ Checking status...", parse_mode="HTML")
+ status_text = await get_status_text()
+ await update.message.reply_text(status_text, parse_mode="HTML")
+
+
+async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Show recent logs."""
+ if not await require_auth(update, context):
+ return
+
+ code, stdout, stderr = await sh(
+ "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager"
+ )
+ if code == 0:
+ log_text = stdout[-1500:] if len(stdout) > 1500 else stdout
+ await update.message.reply_text(
+ f"{html.escape(log_text)}",
+ parse_mode="HTML",
+ )
+ else:
+ await update.message.reply_text("Failed to retrieve logs")
+
+
+# ============================================================================
+# STATUS
+# ============================================================================
+
+
+async def get_status_text() -> str:
+ """Generate status report."""
+ lines = ["π Current Status\n"]
+
+ # Service status
+ is_running = await check_service_status(TELEMT_SERVICE)
+ lines.append(f"Service: {'β
Running' if is_running else 'β Stopped'}")
+
+ # Telemt version
+ version = await get_telemt_version()
+ lines.append(f"Telemt: v{version}")
+
+ # Config status
+ config = load_json(GOTELEGRAM_CONFIG)
+ if config:
+ lines.append(f"Mode: {html.escape(str(config.get('mode', 'unknown')))}")
+ if "template" in config:
+ lines.append(f"Template: {html.escape(str(config['template']))}")
+ if "domain" in config:
+ lines.append(f"Domain: {html.escape(str(config['domain']))}")
+ if "port" in config:
+ lines.append(f"Port: {html.escape(str(config['port']))}")
+
+ # Telemt config
+ telemt_cfg = load_toml(TELEMT_CONFIG)
+ if telemt_cfg:
+ cfg = telemt_cfg.get("config", {})
+ if "listen_port" in cfg:
+ lines.append(f"Listen Port: {cfg['listen_port']}")
+
+ # Backups
+ backup_count = 0
+ try:
+ if os.path.exists(BACKUP_DIR):
+ backup_count = len([f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")])
+ except Exception:
+ pass
+ lines.append(f"Backups: {backup_count}")
+
+ # Old container check
+ old_container = await check_old_container()
+ if old_container:
+ lines.append(f"\nβ οΈ Found old container: {html.escape(old_container)}")
+ lines.append("Run 'Install' to migrate")
+
+ return "\n".join(lines)
+
+
+async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Status callback."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Checking status...")
+
+ status_text = await get_status_text()
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,
+ status_text, reply_markup=keyboard, parse_mode="HTML"
+ )
+
+
+# ============================================================================
+# INSTALL
+# ============================================================================
+
+
+def get_install_mode_menu() -> InlineKeyboardMarkup:
+ """Install mode selection menu."""
+ buttons = [
+ [InlineKeyboardButton("β‘ Quick Mode", callback_data="install_mode_quick")],
+ [InlineKeyboardButton("π Stealth Mode", callback_data="install_mode_stealth")],
+ [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
+ ]
+ return InlineKeyboardMarkup(buttons)
+
+
+async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Install menu callback."""
+ query = update.callback_query
+ await query.answer()
+
+ # Check for old container
+ old_container = await check_old_container()
+ if old_container:
+ text = (
+ f"β οΈ Migration from v1 detected\n\n"
+ f"Found Docker container: {html.escape(old_container)}\n\n"
+ f"Would you like to:\n"
+ f"1. Migrate from v1 (recommended)\n"
+ f"2. Fresh install (will remove old container)\n\n"
+ f"Select below or choose install mode"
+ )
+ buttons = [
+ [InlineKeyboardButton("π Migrate from v1", callback_data="install_migrate")],
+ [InlineKeyboardButton("β¨ Fresh Install", callback_data="install_mode_quick")],
+ [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
+ ]
+ keyboard = InlineKeyboardMarkup(buttons)
+ else:
+ text = "Select installation mode:"
+ keyboard = get_install_mode_menu()
+
+ await safe_edit_message(query,
+ text, reply_markup=keyboard, parse_mode="HTML"
+ )
+
+
+async def cb_install_mode_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Quick mode domain selection."""
+ query = update.callback_query
+ await query.answer()
+
+ # Show domains with pagination (4 per row, 2 rows)
+ buttons = []
+ for i in range(0, len(QUICK_DOMAINS), 2):
+ row = []
+ for j in range(2):
+ if i + j < len(QUICK_DOMAINS):
+ domain = QUICK_DOMAINS[i + j]
+ row.append(
+ InlineKeyboardButton(
+ domain, callback_data=f"quick_dom_{i+j}"
+ )
+ )
+ buttons.append(row)
+
+ buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_install")])
+
+ text = "Select a domain for quick mode:"
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard)
+
+
+async def cb_quick_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Quick domain selection callback."""
+ query = update.callback_query
+ data = query.data
+ try:
+ domain_idx = int(data.split("_")[-1])
+ domain = QUICK_DOMAINS[domain_idx]
+ except (ValueError, IndexError):
+ await query.answer("Invalid domain selection")
+ return
+
+ await query.answer()
+ await safe_edit_message(query,f"β³ Installing with domain: {domain}...")
+
+ # Simulate installation (in real scenario, call install script)
+ config = {
+ "mode": "quick",
+ "domain": domain,
+ "port": 443,
+ "installed_at": datetime.now().isoformat(),
+ }
+
+ if save_json(GOTELEGRAM_CONFIG, config):
+ text = (
+ f"β
Quick mode installed!\n\n"
+ f"Domain: {domain}\n"
+ f"Mode: Quick\n\n"
+ f"Service starting... Check status in 10 seconds."
+ )
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,
+ text, reply_markup=keyboard, parse_mode="HTML"
+ )
+ else:
+ await safe_edit_message(query,
+ "β Failed to save configuration",
+ reply_markup=InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
+ ),
+ )
+
+
+async def cb_install_mode_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Stealth mode - show template categories."""
+ query = update.callback_query
+ await query.answer()
+
+ catalog = load_json(TEMPLATES_CATALOG)
+ if not catalog or "categories" not in catalog:
+ await safe_edit_message(query,
+ "β Template catalog not found",
+ reply_markup=InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
+ ),
+ )
+ return
+
+ buttons = []
+ for cat in catalog.get("categories", []):
+ buttons.append(
+ [
+ InlineKeyboardButton(
+ f"π {cat['name']}", callback_data=f"stealth_cat_{cat['id']}"
+ )
+ ]
+ )
+ buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_install")])
+
+ text = "Stealth Mode - Select Template Category:"
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard)
+
+
+async def cb_stealth_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Show templates in category."""
+ query = update.callback_query
+ data = query.data
+ cat_id = data.removeprefix("stealth_cat_")
+
+ await query.answer()
+
+ catalog = load_json(TEMPLATES_CATALOG)
+ if not catalog:
+ await safe_edit_message(query,"β Template catalog not found")
+ return
+
+ # Find category and templates
+ category = None
+ templates = []
+ for cat in catalog.get("categories", []):
+ if cat["id"] == cat_id:
+ category = cat
+ templates = cat.get("templates", [])
+ break
+
+ if not category:
+ await safe_edit_message(query,"β Category not found")
+ return
+
+ buttons = []
+ for tpl in templates:
+ buttons.append(
+ [
+ InlineKeyboardButton(
+ f"π¨ {tpl['name']}", callback_data=f"stealth_tpl_{tpl['id']}"
+ )
+ ]
+ )
+ buttons.append([InlineKeyboardButton("Β« Back", callback_data="install_mode_stealth")])
+
+ text = f"Select template from {html.escape(category['name'])}:"
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_stealth_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Show template preview and confirm."""
+ query = update.callback_query
+ data = query.data
+ tpl_id = data.removeprefix("stealth_tpl_")
+
+ await query.answer()
+
+ catalog = load_json(TEMPLATES_CATALOG)
+ if not catalog:
+ await safe_edit_message(query,"β Template catalog not found")
+ return
+
+ # Find template
+ template = None
+ for cat in catalog.get("categories", []):
+ for tpl in cat.get("templates", []):
+ if tpl["id"] == tpl_id:
+ template = tpl
+ break
+ if template:
+ break
+
+ if not template:
+ await safe_edit_message(query,"β Template not found")
+ return
+
+ tpl_name = html.escape(template.get('name', 'Unknown'))
+ tpl_desc = html.escape(template.get('description', 'N/A'))
+ text = (
+ f"π¨ Template Preview\n\n"
+ f"Name: {tpl_name}\n"
+ f"Description: {tpl_desc}\n\n"
+ )
+ if "preview_url" in template:
+ preview_url = html.escape(template['preview_url'], quote=True)
+ text += f'View Live Preview\n\n'
+
+ text += "Confirm installation?"
+
+ buttons = [
+ [
+ InlineKeyboardButton(
+ "β
Install", callback_data=f"stealth_confirm_{tpl_id}"
+ )
+ ],
+ [InlineKeyboardButton("Β« Back", callback_data="install_mode_stealth")],
+ ]
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_stealth_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Confirm and install stealth template."""
+ query = update.callback_query
+ data = query.data
+ tpl_id = data.removeprefix("stealth_confirm_")
+
+ await query.answer()
+ await safe_edit_message(query,"β³ Installing template...")
+
+ config = {
+ "mode": "stealth",
+ "template": tpl_id,
+ "port": 443,
+ "installed_at": datetime.now().isoformat(),
+ }
+
+ if save_json(GOTELEGRAM_CONFIG, config):
+ text = (
+ f"β
Stealth mode installed!\n\n"
+ f"Template: {html.escape(tpl_id)}\n"
+ f"Mode: Stealth\n\n"
+ f"Service starting... Check status in 10 seconds."
+ )
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,
+ text, reply_markup=keyboard, parse_mode="HTML"
+ )
+ else:
+ await safe_edit_message(query,
+ "β Failed to save configuration",
+ reply_markup=InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_install")]]
+ ),
+ )
+
+
+# ============================================================================
+# PROXY LINK & SHARE
+# ============================================================================
+
+
+async def get_proxy_link() -> Optional[str]:
+ """Generate proxy link from config."""
+ config = load_json(GOTELEGRAM_CONFIG)
+ if not config:
+ return None
+
+ # Get secret from telemt TOML config
+ secret = config.get("secret", "")
+ if not secret:
+ telemt_cfg = load_toml(TELEMT_CONFIG)
+ if telemt_cfg:
+ users = telemt_cfg.get("users", [])
+ if isinstance(users, list) and users:
+ secret = users[0].get("secret", "")
+ if not secret:
+ return None
+
+ # Get server IP
+ code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org")
+ server = stdout.strip() if code == 0 and stdout.strip() else "0.0.0.0"
+
+ port = config.get("port", 443)
+
+ return f"tg://proxy?server={server}&port={port}&secret={secret}"
+
+
+async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Generate and show proxy link."""
+ query = update.callback_query
+ await query.answer()
+
+ link = await get_proxy_link()
+ if not link:
+ text = "β Proxy not installed yet. Run install first."
+ else:
+ text = (
+ f"π Proxy Link\n\n"
+ f"{html.escape(link)}\n\n"
+ f"Open in Telegram to connect."
+ )
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Share link as QR code."""
+ query = update.callback_query
+ await query.answer()
+
+ link = await get_proxy_link()
+ if not link:
+ text = "β Proxy not installed yet."
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard)
+ return
+
+ # Try to generate QR code
+ qr_file = None
+ code, _, _ = await sh("which", "qrencode")
+ if code == 0:
+ qr_path = "/tmp/proxy_qr.png"
+ code, _, _ = await sh("qrencode", "-o", qr_path, link)
+ if code == 0 and os.path.exists(qr_path):
+ qr_file = qr_path
+
+ if qr_file:
+ try:
+ with open(qr_file, "rb") as f:
+ await query.message.reply_photo(
+ photo=f,
+ caption=f"π€ Proxy QR Code\n\n{html.escape(link)}",
+ parse_mode="HTML",
+ )
+ except Exception as e:
+ logger.error(f"Failed to send QR code: {e}")
+ await safe_edit_message(query,
+ f"π Proxy Link\n\n{html.escape(link)}",
+ parse_mode="HTML",
+ )
+ finally:
+ try:
+ os.remove(qr_file)
+ except OSError:
+ pass
+ else:
+ await safe_edit_message(query,
+ f"π Proxy Link\n\n{html.escape(link)}",
+ reply_markup=InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ ),
+ parse_mode="HTML",
+ )
+
+
+# ============================================================================
+# RESTART & LOGS
+# ============================================================================
+
+
+async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Restart service."""
+ query = update.callback_query
+ await query.answer()
+
+ text = "β³ Restarting telemt service..."
+ await safe_edit_message(query,text)
+
+ code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE)
+ if code == 0:
+ text = "β
Service restarted successfully"
+ else:
+ text = f"β Failed to restart:\n{html.escape(stderr[:500])}"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,
+ text, reply_markup=keyboard, parse_mode="HTML"
+ )
+
+
+async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Show recent logs."""
+ query = update.callback_query
+ await query.answer()
+
+ code, stdout, _ = await sh(
+ "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager"
+ )
+
+ if code == 0:
+ log_text = stdout[-1000:] if len(stdout) > 1000 else stdout
+ text = f"π Recent Logs\n\n{html.escape(log_text)}"
+ else:
+ text = "β Failed to retrieve logs"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# BACKUP & RESTORE
+# ============================================================================
+
+
+async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Backup menu."""
+ query = update.callback_query
+ await query.answer()
+
+ # List existing backups
+ backups = []
+ try:
+ if os.path.exists(BACKUP_DIR):
+ backups = sorted(
+ [f for f in os.listdir(BACKUP_DIR)
+ if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
+ reverse=True,
+ )
+ except Exception:
+ pass
+
+ buttons = [[InlineKeyboardButton("πΎ Create Backup", callback_data="backup_create")]]
+
+ if backups:
+ buttons.append(
+ [InlineKeyboardButton("π List Backups", callback_data="backup_list")]
+ )
+
+ buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_main")])
+
+ text = f"πΎ Backup Management\n\nExisting backups: {len(backups)}"
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Create backup."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Creating backup...")
+
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ backup_file = os.path.join(BACKUP_DIR, f"backup_{timestamp}.tar.gz")
+
+ os.makedirs(BACKUP_DIR, exist_ok=True)
+ code, _, stderr = await sh(
+ "tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG
+ )
+
+ if code == 0:
+ text = f"β
Backup created:\n{html.escape(backup_file)}"
+ else:
+ text = f"β Backup failed:\n{html.escape(stderr[:500])}"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_backup")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """List backups."""
+ query = update.callback_query
+ await query.answer()
+
+ backups = []
+ try:
+ if os.path.exists(BACKUP_DIR):
+ backups = sorted(
+ [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
+ reverse=True,
+ )
+ except Exception:
+ pass
+
+ if not backups:
+ text = "No backups found"
+ else:
+ text = "π Available Backups\n\n"
+ for backup in backups[:10]:
+ path = os.path.join(BACKUP_DIR, backup)
+ size = os.path.getsize(path) / (1024 * 1024)
+ text += f"{html.escape(backup)} ({size:.2f} MB)\n"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_backup")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Restore menu."""
+ query = update.callback_query
+ await query.answer()
+
+ backups = []
+ try:
+ if os.path.exists(BACKUP_DIR):
+ backups = sorted(
+ [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
+ reverse=True,
+ )
+ except Exception:
+ pass
+
+ if not backups:
+ text = "β No backups available"
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ else:
+ text = "Select backup to restore:"
+ buttons = []
+ for i, backup in enumerate(backups[:10]):
+ buttons.append(
+ [
+ InlineKeyboardButton(
+ backup, callback_data=f"restore_idx_{i}"
+ )
+ ]
+ )
+ buttons.append([InlineKeyboardButton("Β« Back", callback_data="menu_main")])
+ keyboard = InlineKeyboardMarkup(buttons)
+ # Store backup list in user_data for retrieval
+ context.user_data["backup_list"] = backups[:10]
+
+ await safe_edit_message(query,text, reply_markup=keyboard)
+
+
+async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Execute backup restoration."""
+ query = update.callback_query
+ data = query.data
+
+ try:
+ idx = int(data.removeprefix("restore_idx_"))
+ except ValueError:
+ await query.answer("Invalid backup selection")
+ return
+
+ backup_list = context.user_data.get("backup_list", [])
+ if idx < 0 or idx >= len(backup_list):
+ await query.answer("Backup not found")
+ return
+
+ backup_name = backup_list[idx]
+ backup_path = os.path.join(BACKUP_DIR, backup_name)
+
+ await query.answer()
+ await safe_edit_message(query,f"β³ Restoring from {html.escape(backup_name)}...")
+
+ if not os.path.exists(backup_path):
+ text = "β Backup file not found"
+ else:
+ # Simple restore: extract tar to overwrite configs
+ code, _, stderr = await sh(
+ "tar", "-xzf", backup_path, "-C", "/", timeout=60
+ )
+ if code == 0:
+ # Restart services
+ await sh("systemctl", "restart", TELEMT_SERVICE)
+ text = f"β
Restored from {html.escape(backup_name)}"
+ else:
+ text = f"β Restore failed:\n{html.escape(stderr[:500])}"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# UPDATE & MODE/TEMPLATE CHANGE
+# ============================================================================
+
+
+async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Update telemt by re-running the install script's update logic."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Checking for telemt updates...")
+
+ # Get current version
+ cur_code, cur_out, _ = await sh("telemt", "--version")
+ current = cur_out.strip() if cur_code == 0 else "unknown"
+
+ # Check latest release from GitHub
+ code, stdout, stderr = await sh(
+ "curl", "-s", "--max-time", "10",
+ "https://api.github.com/repos/telemt/telemt/releases/latest",
+ )
+
+ if code != 0 or not stdout.strip():
+ text = "β Failed to check for updates"
+ else:
+ try:
+ release = json.loads(stdout)
+ latest = release.get("tag_name", "unknown")
+ if latest == current:
+ text = f"β
telemt is already up to date ({html.escape(current)})"
+ else:
+ text = (
+ f"βΉοΈ Update available: {html.escape(current)} β {html.escape(latest)}\n\n"
+ f"Run the CLI installer to update:\n"
+ f"sudo bash install.sh β menu item 10"
+ )
+ except json.JSONDecodeError:
+ text = "β Failed to parse release info"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Change mode or template."""
+ query = update.callback_query
+ await query.answer()
+
+ buttons = [
+ [InlineKeyboardButton("β‘ Switch to Quick Mode", callback_data="change_quick")],
+ [InlineKeyboardButton("π Switch to Stealth Mode", callback_data="change_stealth")],
+ [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
+ ]
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,
+ "Change mode or template:", reply_markup=keyboard
+ )
+
+
+async def cb_change_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Switch to quick mode β show domain selection."""
+ query = update.callback_query
+ await query.answer()
+ # Reuse the quick mode domain selection flow
+ await cb_install_mode_quick(update, context)
+
+
+async def cb_change_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Switch to stealth mode β show template categories."""
+ query = update.callback_query
+ await query.answer()
+ # Reuse the stealth mode template selection flow
+ await cb_install_mode_stealth(update, context)
+
+
+async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Migrate from v1 (mtg Docker) to v2 (telemt)."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Migrating from v1...")
+
+ # Stop old mtg container
+ code, _, stderr = await sh("docker", "stop", "mtproto-proxy", timeout=30)
+ if code != 0:
+ code, _, stderr = await sh("docker", "stop", "mtg", timeout=30)
+
+ # Remove old container
+ await sh("docker", "rm", "mtproto-proxy", timeout=15)
+ await sh("docker", "rm", "mtg", timeout=15)
+
+ text = (
+ "β
v1 container stopped and removed\n\n"
+ "Now select installation mode for v2:"
+ )
+ keyboard = get_install_mode_menu()
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# WEBSITE & SSL
+# ============================================================================
+
+
+async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Website and SSL management."""
+ query = update.callback_query
+ await query.answer()
+
+ buttons = [
+ [InlineKeyboardButton("π Renew SSL Certificate", callback_data="ssl_renew")],
+ [InlineKeyboardButton("π SSL Status", callback_data="ssl_status")],
+ [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
+ ]
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,
+ "Website & SSL Management:", reply_markup=keyboard
+ )
+
+
+async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Renew SSL certificate."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Renewing SSL certificate...")
+
+ code, stdout, stderr = await sh("certbot", "renew", timeout=120)
+
+ if code == 0:
+ text = "β
SSL certificate renewed successfully"
+ else:
+ text = f"β Renewal failed:\n{html.escape(stderr[:500])}"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_website")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Show SSL status."""
+ query = update.callback_query
+ await query.answer()
+
+ code, stdout, _ = await sh("certbot", "certificates")
+
+ if code == 0:
+ text = f"π SSL Certificates\n\n{html.escape(stdout[:1000])}"
+ else:
+ text = "β Failed to get SSL status"
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_website")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# PROMO & CREDITS
+# ============================================================================
+
+
+async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Promo information."""
+ query = update.callback_query
+ await query.answer()
+
+ text = (
+ f"π GoTelegram Promo\n\n"
+ f"Share the love! Invite friends to use GoTelegram.\n\n"
+ f"Promo Link\n\n"
+ f"Support development:\n"
+ f"Send a Tip"
+ )
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Credits and acknowledgements."""
+ query = update.callback_query
+ await query.answer()
+
+ text = (
+ f"βΉοΈ Credits & Acknowledgements\n\n"
+ f"GoTelegram v{GOTELEGRAM_VERSION}\n\n"
+ f"Built with love for the Telegram community\n\n"
+ f"Special thanks to:\n\n"
+ f"π telemt - MTProxy engine\n"
+ f" High-performance proxy core\n\n"
+ f"π¨ HTML5UP - Beautiful web templates\n"
+ f" Responsive design & themes\n\n"
+ f"π Learning Zone - Educational resources\n"
+ f" Community learning support\n\n"
+ f"π Start Bootstrap - Bootstrap templates\n"
+ f" Professional design framework\n\n"
+ f"π¬ Community - Your feedback & support\n\n"
+ f"GoTelegram is open-source and community-driven"
+ )
+
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# REMOVE
+# ============================================================================
+
+
+async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Remove installation."""
+ query = update.callback_query
+ await query.answer()
+
+ text = (
+ "β οΈ Remove GoTelegram\n\n"
+ "This will completely remove the installation.\n"
+ "Are you sure?"
+ )
+
+ buttons = [
+ [InlineKeyboardButton("β Yes, Remove", callback_data="remove_confirm")],
+ [InlineKeyboardButton("Β« Back", callback_data="menu_main")],
+ ]
+ keyboard = InlineKeyboardMarkup(buttons)
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Confirm removal."""
+ query = update.callback_query
+ await query.answer()
+
+ await safe_edit_message(query,"β³ Removing GoTelegram...")
+
+ # Stop service
+ await sh("systemctl", "stop", TELEMT_SERVICE)
+
+ # Remove directories
+ for path in ["/opt/gotelegram", WEBSITE_ROOT]:
+ await sh("rm", "-rf", path)
+
+ text = "β
GoTelegram removed successfully"
+ keyboard = InlineKeyboardMarkup(
+ [[InlineKeyboardButton("Β« Back", callback_data="menu_main")]]
+ )
+ await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
+
+
+# ============================================================================
+# CALLBACK ROUTING
+# ============================================================================
+
+
+async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Route all callbacks."""
+ query = update.callback_query
+ data = query.data
+
+ # Access control
+ if not is_user_allowed(update.effective_user.id):
+ await query.answer("Access denied")
+ return
+
+ # Main menu
+ if data == "menu_main":
+ await query.answer()
+ buttons = get_main_menu()
+ text = (
+ f"GoTelegram v{GOTELEGRAM_VERSION}\n\n"
+ "π€ MTProxy Management\n"
+ "Select an action:"
+ )
+ await safe_edit_message(query,text, reply_markup=buttons, parse_mode="HTML")
+ return
+
+ if data == "close_menu":
+ await query.answer()
+ await query.delete_message()
+ return
+
+ # Dispatch to handlers
+ handlers = {
+ "menu_install": cb_menu_install,
+ "menu_status": cb_menu_status,
+ "menu_link": cb_menu_link,
+ "menu_share": cb_menu_share,
+ "menu_restart": cb_menu_restart,
+ "menu_logs": cb_menu_logs,
+ "menu_backup": cb_menu_backup,
+ "menu_restore": cb_menu_restore,
+ "menu_update": cb_menu_update,
+ "menu_change": cb_menu_change,
+ "menu_website": cb_menu_website,
+ "menu_promo": cb_menu_promo,
+ "menu_credits": cb_menu_credits,
+ "menu_remove": cb_menu_remove,
+ "install_mode_quick": cb_install_mode_quick,
+ "install_mode_stealth": cb_install_mode_stealth,
+ "backup_create": cb_backup_create,
+ "backup_list": cb_backup_list,
+ "ssl_renew": cb_ssl_renew,
+ "ssl_status": cb_ssl_status,
+ "remove_confirm": cb_remove_confirm,
+ "change_quick": cb_change_quick,
+ "change_stealth": cb_change_stealth,
+ "install_migrate": cb_install_migrate,
+ }
+
+ # Pattern-based handlers
+ if data.startswith("quick_dom_"):
+ await cb_quick_domain(update, context)
+ elif data.startswith("stealth_cat_"):
+ await cb_stealth_category(update, context)
+ elif data.startswith("stealth_tpl_"):
+ await cb_stealth_template(update, context)
+ elif data.startswith("stealth_confirm_"):
+ await cb_stealth_confirm(update, context)
+ elif data.startswith("restore_idx_"):
+ await cb_restore_backup(update, context)
+ elif data in handlers:
+ await handlers[data](update, context)
+ else:
+ await query.answer("Unknown action")
+
+
+# ============================================================================
+# ERROR HANDLERS
+# ============================================================================
+
+
+async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Log errors caused by Updates."""
+ logger.error(f"Exception while handling an update:", exc_info=context.error)
+
+
+# ============================================================================
+# MAIN APPLICATION
+# ============================================================================
+
+
+def main() -> None:
+ """Start the bot."""
+ if not BOT_TOKEN:
+ logger.error("BOT_TOKEN not set in .env file")
+ return
+
+ # Create the Application
+ application = Application.builder().token(BOT_TOKEN).build()
+
+ # Command handlers
+ application.add_handler(CommandHandler("start", cmd_start))
+ application.add_handler(CommandHandler("help", cmd_help))
+ application.add_handler(CommandHandler("status", cmd_status))
+ application.add_handler(CommandHandler("logs", cmd_logs))
+
+ # Callback query handler (buttons)
+ application.add_handler(CallbackQueryHandler(handle_callback))
+
+ # Error handler
+ application.add_error_handler(error_handler)
+
+ # Run the bot
+ logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...")
+ application.run_polling(allowed_updates=Update.ALL_TYPES)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/lib/common.sh b/lib/common.sh
index 2804498..7aae651 100644
--- a/lib/common.sh
+++ b/lib/common.sh
@@ -1,456 +1,456 @@
-#!/bin/bash
-# GoTelegram v2.2 β ΠΠ±ΡΠΈΠ΅ ΡΡΠΈΠ»ΠΈΡΡ
-# Π¦Π²Π΅ΡΠ°, Π»ΠΎΠ³ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅, ΡΠΏΠΈΠ½Π½Π΅Ρ, ΡΠΈΡΡΠ΅ΠΌΠ½ΡΠ΅ ΡΡΠ½ΠΊΡΠΈΠΈ, ΡΠΎΠ²ΠΌΠ΅ΡΡΠΈΠΌΠΎΡΡΡ Ρ v1
-
-# ββ ΠΠ΅ΡΡΠΈΡ ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-GOTELEGRAM_VERSION="2.2.0"
-GOTELEGRAM_NAME="GoTelegram"
-
-# ββ ΠΡΡΠΈ ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-GOTELEGRAM_DIR="/opt/gotelegram"
-GOTELEGRAM_CONFIG="$GOTELEGRAM_DIR/config.json"
-TELEMT_CONFIG="/etc/telemt/config.toml"
-TELEMT_BIN="/usr/local/bin/telemt"
-TELEMT_SERVICE="telemt"
-NGINX_SITE_CONF="/etc/nginx/sites-available/gotelegram"
-NGINX_SITE_LINK="/etc/nginx/sites-enabled/gotelegram"
-WEBSITE_ROOT="/var/www/gotelegram-site"
-BACKUP_DIR="$GOTELEGRAM_DIR/backups"
-LOG_FILE="/var/log/gotelegram.log"
-BOT_DIR="/opt/gotelegram-bot"
-
-# ββ V1 ΡΠΎΠ²ΠΌΠ΅ΡΡΠΈΠΌΠΎΡΡΡ βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-V1_CONTAINER_NAME="mtproto-proxy"
-V1_CONFIG_FILE="/opt/gotelegram-bot/proxy.json"
-V1_SERVICE_NAME="gotelegram-bot"
-
-# ββ Π¦Π²Π΅ΡΠ° ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-RED='\033[0;31m'
-GREEN='\033[0;32m'
-CYAN='\033[0;36m'
-YELLOW='\033[1;33m'
-MAGENTA='\033[0;35m'
-BLUE='\033[0;34m'
-WHITE='\033[1;37m'
-BOLD='\033[1m'
-DIM='\033[2m'
-NC='\033[0m'
-
-# ββ ΠΠΎΠ³ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-log_info() { echo -e " ${CYAN}βΉ${NC} $*"; }
-log_success() { echo -e " ${GREEN}β${NC} $*"; }
-log_warning() { echo -e " ${YELLOW}β ${NC} $*"; }
-log_error() { echo -e " ${RED}β${NC} $*"; }
-log_step() { echo -e "\n${BOLD}${WHITE} $*${NC}"; }
-log_dim() { echo -e " ${DIM}$*${NC}"; }
-
-log_to_file() {
- local ts; ts=$(date '+%Y-%m-%d %H:%M:%S')
- echo "[$ts] $*" >> "$LOG_FILE" 2>/dev/null
-}
-
-# ββ Π‘ΠΏΠΈΠ½Π½Π΅Ρ ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-_spin_pid=""
-spinner_start() {
- local msg="${1:-ΠΠΎΠ΄ΠΎΠΆΠ΄ΠΈΡΠ΅...}"
- (
- local frames=('β ' 'β ' 'β Ή' 'β Έ' 'β Ό' 'β ΄' 'β ¦' 'β §' 'β ' 'β ')
- local i=0
- while true; do
- printf "\r ${CYAN}${frames[$i]}${NC} ${msg}" >&2
- i=$(( (i+1) % ${#frames[@]} ))
- sleep 0.1
- done
- ) &
- _spin_pid=$!
-}
-
-spinner_stop() {
- [ -n "$_spin_pid" ] && kill "$_spin_pid" 2>/dev/null && wait "$_spin_pid" 2>/dev/null
- _spin_pid=""
- printf "\r\033[K" >&2
-}
-
-# ββ ΠΡΠΎΠ³ΡΠ΅ΡΡ-Π±Π°Ρ βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
-progress_bar() {
- local current="$1" total="$2" label="${3:-}"
- local pct=$(( current * 100 / total ))
- local filled=$(( pct / 2 ))
- local empty=$(( 50 - filled ))
- local bar=""
- for ((i=0; i