#!/usr/bin/env python3 """ GoTelegram v2.2 Bot - MTProxy Management for Linux Manages telemt engine via Telegram interface with full CLI feature parity Uses python-telegram-bot v21+ """ import asyncio import html import json import logging import os import re import subprocess import toml from datetime import datetime from pathlib import Path from typing import Tuple, Optional, List, Dict, Any from dotenv import load_dotenv from telegram import ( Update, InlineKeyboardButton, InlineKeyboardMarkup, InputFile, ) from telegram.ext import ( Application, CommandHandler, CallbackQueryHandler, ContextTypes, filters, ) from telegram.error import TelegramError, BadRequest # Load environment variables load_dotenv() # Logging configuration logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # ============================================================================ # CONFIGURATION # ============================================================================ GOTELEGRAM_VERSION = "2.2.0" GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json" TELEMT_CONFIG = "/etc/telemt/config.toml" TELEMT_SERVICE = "telemt" WEBSITE_ROOT = "/var/www/gotelegram-site" BACKUP_DIR = "/opt/gotelegram/backups" TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json" PROMO_LINK = "https://vk.cc/ct29NQ" TIP_LINK = "https://pay.cloudtips.ru/p/7410814f" BOT_TOKEN = os.getenv("BOT_TOKEN") ALLOWED_IDS_STR = os.getenv("ALLOWED_IDS", "") ALLOWED_IDS: set = set() for _id_str in ALLOWED_IDS_STR.split(","): _id_str = _id_str.strip() if _id_str: try: ALLOWED_IDS.add(int(_id_str)) except ValueError: logging.warning(f"Invalid ALLOWED_IDS entry: {_id_str}") QUICK_DOMAINS = [ "google.com", "microsoft.com", "cloudflare.com", "apple.com", "amazon.com", "github.com", "stackoverflow.com", "medium.com", "wikipedia.org", "coursera.org", "udemy.com", "habr.com", "stepik.org", "duolingo.com", "khanacademy.org", "bbc.com", "reuters.com", "nytimes.com", "ted.com", "zoom.us", ] # ============================================================================ # UTILITY FUNCTIONS # ============================================================================ async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]: """Execute shell command asynchronously. Args: *args: Command and arguments timeout: Timeout in seconds Returns: Tuple of (return_code, stdout, stderr) """ try: process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) return ( process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), ) except asyncio.TimeoutError: try: process.kill() await process.wait() except Exception: pass return (-1, "", f"Command timeout after {timeout}s") except Exception as e: return (-1, "", str(e)) def load_json(path: str) -> Optional[Dict]: """Load JSON file.""" try: with open(path, "r") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load {path}: {e}") return None def load_toml(path: str) -> Optional[Dict]: """Load TOML file.""" try: with open(path, "r") as f: return toml.load(f) except Exception as e: logger.warning(f"Failed to load {path}: {e}") return None def save_json(path: str, data: Dict) -> bool: """Save JSON file.""" try: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: json.dump(data, f, indent=2) return True except Exception as e: logger.error(f"Failed to save {path}: {e}") return False async def safe_edit_message(query, text: str, reply_markup=None, parse_mode=None) -> bool: """Safely edit message, handling cases where message was deleted or not modified.""" try: await query.edit_message_text( text, reply_markup=reply_markup, parse_mode=parse_mode ) return True except BadRequest as e: err_msg = str(e).lower() if "message is not modified" in err_msg: return True # No change needed, not an error if "message to edit not found" in err_msg or "message can't be edited" in err_msg: logger.warning(f"Cannot edit message: {e}") return False raise # Re-raise unexpected BadRequest async def check_service_status(service: str) -> bool: """Check if systemd service is running.""" code, _, _ = await sh("systemctl", "is-active", service) return code == 0 async def get_telemt_version() -> str: """Get telemt version.""" code, stdout, _ = await sh("telemt", "-v") if code == 0: return stdout.strip().split()[-1] if stdout else "unknown" return "unknown" def is_docker_running() -> bool: """Check if Docker daemon is running.""" try: subprocess.run( ["docker", "ps"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=5, ) return True except Exception: return False async def check_old_container() -> Optional[str]: """Check for old mtg Docker container (v1 migration).""" if not is_docker_running(): return None code, stdout, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}") if code == 0 and "mtg" in stdout: return "mtg" return None # ============================================================================ # ACCESS CONTROL # ============================================================================ def is_user_allowed(user_id: int) -> bool: """Check if user ID is in ALLOWED_IDS.""" if not ALLOWED_IDS: return True return user_id in ALLOWED_IDS async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: """Check authorization and send error if not allowed.""" if not is_user_allowed(update.effective_user.id): await update.message.reply_text( f"Access denied. Your ID: {update.effective_user.id}" ) logger.warning( f"Unauthorized access attempt from user {update.effective_user.id}" ) return False return True # ============================================================================ # MAIN MENU # ============================================================================ def get_main_menu() -> InlineKeyboardMarkup: """Generate main menu keyboard.""" buttons = [ [ InlineKeyboardButton("āš™ļø Install", callback_data="menu_install"), InlineKeyboardButton("šŸ“Š Status", callback_data="menu_status"), ], [ InlineKeyboardButton("šŸ”— Link", callback_data="menu_link"), InlineKeyboardButton("šŸ“¤ Share", callback_data="menu_share"), ], [ InlineKeyboardButton("šŸ”„ Restart", callback_data="menu_restart"), InlineKeyboardButton("šŸ“‹ Logs", callback_data="menu_logs"), ], [ InlineKeyboardButton("⚔ Change Mode/Template", callback_data="menu_change"), InlineKeyboardButton("šŸ’¾ Backup", callback_data="menu_backup"), ], [ InlineKeyboardButton("ā†©ļø Restore", callback_data="menu_restore"), InlineKeyboardButton("šŸ“” Update telemt", callback_data="menu_update"), ], [ InlineKeyboardButton("🌐 Website/SSL", callback_data="menu_website"), InlineKeyboardButton("šŸŽ Promo", callback_data="menu_promo"), ], [ InlineKeyboardButton("šŸ—‘ļø Remove", callback_data="menu_remove"), InlineKeyboardButton("ā„¹ļø Credits", callback_data="menu_credits"), ], [InlineKeyboardButton("āŒ Close", callback_data="close_menu")], ] return InlineKeyboardMarkup(buttons) # ============================================================================ # COMMANDS # ============================================================================ async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Start command - show main menu.""" if not await require_auth(update, context): return welcome = ( f"GoTelegram v{GOTELEGRAM_VERSION}\n\n" "šŸ¤– MTProxy Management Bot\n" "Powered by telemt engine\n\n" "Select an action from the menu below:" ) await update.message.reply_text( welcome, reply_markup=get_main_menu(), parse_mode="HTML" ) async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Help command - show available commands.""" if not await require_auth(update, context): return help_text = ( "GoTelegram Bot Commands\n\n" "/start - Show main menu\n" "/help - Show this help message\n" "/status - Quick status check\n" "/logs - Show recent logs\n\n" "Use the inline menu for all other operations." ) await update.message.reply_text(help_text, parse_mode="HTML") async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Quick status check.""" if not await require_auth(update, context): return await update.message.reply_text("ā³ Checking status...", parse_mode="HTML") status_text = await get_status_text() await update.message.reply_text(status_text, parse_mode="HTML") async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show recent logs.""" if not await require_auth(update, context): return code, stdout, stderr = await sh( "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager" ) if code == 0: log_text = stdout[-1500:] if len(stdout) > 1500 else stdout await update.message.reply_text( f"
{html.escape(log_text)}
", parse_mode="HTML", ) else: await update.message.reply_text("Failed to retrieve logs") # ============================================================================ # STATUS # ============================================================================ async def get_status_text() -> str: """Generate status report.""" lines = ["šŸ“Š Current Status\n"] # Service status is_running = await check_service_status(TELEMT_SERVICE) lines.append(f"Service: {'āœ… Running' if is_running else 'āŒ Stopped'}") # Telemt version version = await get_telemt_version() lines.append(f"Telemt: v{version}") # Config status config = load_json(GOTELEGRAM_CONFIG) if config: lines.append(f"Mode: {html.escape(str(config.get('mode', 'unknown')))}") if "template" in config: lines.append(f"Template: {html.escape(str(config['template']))}") if "domain" in config: lines.append(f"Domain: {html.escape(str(config['domain']))}") if "port" in config: lines.append(f"Port: {html.escape(str(config['port']))}") # Telemt config telemt_cfg = load_toml(TELEMT_CONFIG) if telemt_cfg: cfg = telemt_cfg.get("config", {}) if "listen_port" in cfg: lines.append(f"Listen Port: {cfg['listen_port']}") # Backups backup_count = 0 try: if os.path.exists(BACKUP_DIR): backup_count = len([f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")]) except Exception: pass lines.append(f"Backups: {backup_count}") # Old container check old_container = await check_old_container() if old_container: lines.append(f"\nāš ļø Found old container: {html.escape(old_container)}") lines.append("Run 'Install' to migrate") return "\n".join(lines) async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Status callback.""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Checking status...") status_text = await get_status_text() keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query, status_text, reply_markup=keyboard, parse_mode="HTML" ) # ============================================================================ # INSTALL # ============================================================================ def get_install_mode_menu() -> InlineKeyboardMarkup: """Install mode selection menu.""" buttons = [ [InlineKeyboardButton("⚔ Quick Mode", callback_data="install_mode_quick")], [InlineKeyboardButton("šŸ”’ Stealth Mode", callback_data="install_mode_stealth")], [InlineKeyboardButton("Ā« Back", callback_data="menu_main")], ] return InlineKeyboardMarkup(buttons) async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Install menu callback.""" query = update.callback_query await query.answer() # Check for old container old_container = await check_old_container() if old_container: text = ( f"āš ļø Migration from v1 detected\n\n" f"Found Docker container: {html.escape(old_container)}\n\n" f"Would you like to:\n" f"1. Migrate from v1 (recommended)\n" f"2. Fresh install (will remove old container)\n\n" f"Select below or choose install mode" ) buttons = [ [InlineKeyboardButton("šŸ”„ Migrate from v1", callback_data="install_migrate")], [InlineKeyboardButton("✨ Fresh Install", callback_data="install_mode_quick")], [InlineKeyboardButton("Ā« Back", callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) else: text = "Select installation mode:" keyboard = get_install_mode_menu() await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) async def cb_install_mode_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Quick mode domain selection.""" query = update.callback_query await query.answer() # Show domains with pagination (4 per row, 2 rows) buttons = [] for i in range(0, len(QUICK_DOMAINS), 2): row = [] for j in range(2): if i + j < len(QUICK_DOMAINS): domain = QUICK_DOMAINS[i + j] row.append( InlineKeyboardButton( domain, callback_data=f"quick_dom_{i+j}" ) ) buttons.append(row) buttons.append([InlineKeyboardButton("Ā« Back", callback_data="menu_install")]) text = "Select a domain for quick mode:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard) async def cb_quick_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Quick domain selection callback.""" query = update.callback_query data = query.data try: domain_idx = int(data.split("_")[-1]) domain = QUICK_DOMAINS[domain_idx] except (ValueError, IndexError): await query.answer("Invalid domain selection") return await query.answer() await safe_edit_message(query,f"ā³ Installing with domain: {domain}...") # Simulate installation (in real scenario, call install script) config = { "mode": "quick", "domain": domain, "port": 443, "installed_at": datetime.now().isoformat(), } if save_json(GOTELEGRAM_CONFIG, config): text = ( f"āœ… Quick mode installed!\n\n" f"Domain: {domain}\n" f"Mode: Quick\n\n" f"Service starting... Check status in 10 seconds." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) else: await safe_edit_message(query, "āŒ Failed to save configuration", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_install")]] ), ) async def cb_install_mode_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Stealth mode - show template categories.""" query = update.callback_query await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog or "categories" not in catalog: await safe_edit_message(query, "āŒ Template catalog not found", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_install")]] ), ) return buttons = [] for cat in catalog.get("categories", []): buttons.append( [ InlineKeyboardButton( f"šŸ“ {cat['name']}", callback_data=f"stealth_cat_{cat['id']}" ) ] ) buttons.append([InlineKeyboardButton("Ā« Back", callback_data="menu_install")]) text = "Stealth Mode - Select Template Category:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard) async def cb_stealth_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show templates in category.""" query = update.callback_query data = query.data cat_id = data.removeprefix("stealth_cat_") await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog: await safe_edit_message(query,"āŒ Template catalog not found") return # Find category and templates category = None templates = [] for cat in catalog.get("categories", []): if cat["id"] == cat_id: category = cat templates = cat.get("templates", []) break if not category: await safe_edit_message(query,"āŒ Category not found") return buttons = [] for tpl in templates: buttons.append( [ InlineKeyboardButton( f"šŸŽØ {tpl['name']}", callback_data=f"stealth_tpl_{tpl['id']}" ) ] ) buttons.append([InlineKeyboardButton("Ā« Back", callback_data="install_mode_stealth")]) text = f"Select template from {html.escape(category['name'])}:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_stealth_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show template preview and confirm.""" query = update.callback_query data = query.data tpl_id = data.removeprefix("stealth_tpl_") await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog: await safe_edit_message(query,"āŒ Template catalog not found") return # Find template template = None for cat in catalog.get("categories", []): for tpl in cat.get("templates", []): if tpl["id"] == tpl_id: template = tpl break if template: break if not template: await safe_edit_message(query,"āŒ Template not found") return tpl_name = html.escape(template.get('name', 'Unknown')) tpl_desc = html.escape(template.get('description', 'N/A')) text = ( f"šŸŽØ Template Preview\n\n" f"Name: {tpl_name}\n" f"Description: {tpl_desc}\n\n" ) if "preview_url" in template: preview_url = html.escape(template['preview_url'], quote=True) text += f'View Live Preview\n\n' text += "Confirm installation?" buttons = [ [ InlineKeyboardButton( "āœ… Install", callback_data=f"stealth_confirm_{tpl_id}" ) ], [InlineKeyboardButton("Ā« Back", callback_data="install_mode_stealth")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_stealth_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Confirm and install stealth template.""" query = update.callback_query data = query.data tpl_id = data.removeprefix("stealth_confirm_") await query.answer() await safe_edit_message(query,"ā³ Installing template...") config = { "mode": "stealth", "template": tpl_id, "port": 443, "installed_at": datetime.now().isoformat(), } if save_json(GOTELEGRAM_CONFIG, config): text = ( f"āœ… Stealth mode installed!\n\n" f"Template: {html.escape(tpl_id)}\n" f"Mode: Stealth\n\n" f"Service starting... Check status in 10 seconds." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) else: await safe_edit_message(query, "āŒ Failed to save configuration", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_install")]] ), ) # ============================================================================ # PROXY LINK & SHARE # ============================================================================ async def get_proxy_link() -> Optional[str]: """Generate proxy link from config.""" config = load_json(GOTELEGRAM_CONFIG) if not config: return None # Get secret from telemt TOML config secret = config.get("secret", "") if not secret: telemt_cfg = load_toml(TELEMT_CONFIG) if telemt_cfg: users = telemt_cfg.get("users", []) if isinstance(users, list) and users: secret = users[0].get("secret", "") if not secret: return None # Get server IP code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org") server = stdout.strip() if code == 0 and stdout.strip() else "0.0.0.0" port = config.get("port", 443) return f"tg://proxy?server={server}&port={port}&secret={secret}" async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Generate and show proxy link.""" query = update.callback_query await query.answer() link = await get_proxy_link() if not link: text = "āŒ Proxy not installed yet. Run install first." else: text = ( f"šŸ”— Proxy Link\n\n" f"{html.escape(link)}\n\n" f"Open in Telegram to connect." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Share link as QR code.""" query = update.callback_query await query.answer() link = await get_proxy_link() if not link: text = "āŒ Proxy not installed yet." keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard) return # Try to generate QR code qr_file = None code, _, _ = await sh("which", "qrencode") if code == 0: qr_path = "/tmp/proxy_qr.png" code, _, _ = await sh("qrencode", "-o", qr_path, link) if code == 0 and os.path.exists(qr_path): qr_file = qr_path if qr_file: try: with open(qr_file, "rb") as f: await query.message.reply_photo( photo=f, caption=f"šŸ“¤ Proxy QR Code\n\n{html.escape(link)}", parse_mode="HTML", ) except Exception as e: logger.error(f"Failed to send QR code: {e}") await safe_edit_message(query, f"šŸ”— Proxy Link\n\n{html.escape(link)}", parse_mode="HTML", ) finally: try: os.remove(qr_file) except OSError: pass else: await safe_edit_message(query, f"šŸ”— Proxy Link\n\n{html.escape(link)}", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ), parse_mode="HTML", ) # ============================================================================ # RESTART & LOGS # ============================================================================ async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Restart service.""" query = update.callback_query await query.answer() text = "ā³ Restarting telemt service..." await safe_edit_message(query,text) code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE) if code == 0: text = "āœ… Service restarted successfully" else: text = f"āŒ Failed to restart:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show recent logs.""" query = update.callback_query await query.answer() code, stdout, _ = await sh( "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager" ) if code == 0: log_text = stdout[-1000:] if len(stdout) > 1000 else stdout text = f"šŸ“‹ Recent Logs\n\n
{html.escape(log_text)}
" else: text = "āŒ Failed to retrieve logs" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # BACKUP & RESTORE # ============================================================================ async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Backup menu.""" query = update.callback_query await query.answer() # List existing backups backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass buttons = [[InlineKeyboardButton("šŸ’¾ Create Backup", callback_data="backup_create")]] if backups: buttons.append( [InlineKeyboardButton("šŸ“‹ List Backups", callback_data="backup_list")] ) buttons.append([InlineKeyboardButton("Ā« Back", callback_data="menu_main")]) text = f"šŸ’¾ Backup Management\n\nExisting backups: {len(backups)}" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Create backup.""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Creating backup...") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = os.path.join(BACKUP_DIR, f"backup_{timestamp}.tar.gz") os.makedirs(BACKUP_DIR, exist_ok=True) code, _, stderr = await sh( "tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG ) if code == 0: text = f"āœ… Backup created:\n{html.escape(backup_file)}" else: text = f"āŒ Backup failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_backup")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """List backups.""" query = update.callback_query await query.answer() backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass if not backups: text = "No backups found" else: text = "šŸ“‹ Available Backups\n\n" for backup in backups[:10]: path = os.path.join(BACKUP_DIR, backup) size = os.path.getsize(path) / (1024 * 1024) text += f"{html.escape(backup)} ({size:.2f} MB)\n" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_backup")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Restore menu.""" query = update.callback_query await query.answer() backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass if not backups: text = "āŒ No backups available" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) else: text = "Select backup to restore:" buttons = [] for i, backup in enumerate(backups[:10]): buttons.append( [ InlineKeyboardButton( backup, callback_data=f"restore_idx_{i}" ) ] ) buttons.append([InlineKeyboardButton("Ā« Back", callback_data="menu_main")]) keyboard = InlineKeyboardMarkup(buttons) # Store backup list in user_data for retrieval context.user_data["backup_list"] = backups[:10] await safe_edit_message(query,text, reply_markup=keyboard) async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Execute backup restoration.""" query = update.callback_query data = query.data try: idx = int(data.removeprefix("restore_idx_")) except ValueError: await query.answer("Invalid backup selection") return backup_list = context.user_data.get("backup_list", []) if idx < 0 or idx >= len(backup_list): await query.answer("Backup not found") return backup_name = backup_list[idx] backup_path = os.path.join(BACKUP_DIR, backup_name) await query.answer() await safe_edit_message(query,f"ā³ Restoring from {html.escape(backup_name)}...") if not os.path.exists(backup_path): text = "āŒ Backup file not found" else: # Simple restore: extract tar to overwrite configs code, _, stderr = await sh( "tar", "-xzf", backup_path, "-C", "/", timeout=60 ) if code == 0: # Restart services await sh("systemctl", "restart", TELEMT_SERVICE) text = f"āœ… Restored from {html.escape(backup_name)}" else: text = f"āŒ Restore failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # UPDATE & MODE/TEMPLATE CHANGE # ============================================================================ async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Update telemt by re-running the install script's update logic.""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Checking for telemt updates...") # Get current version cur_code, cur_out, _ = await sh("telemt", "--version") current = cur_out.strip() if cur_code == 0 else "unknown" # Check latest release from GitHub code, stdout, stderr = await sh( "curl", "-s", "--max-time", "10", "https://api.github.com/repos/telemt/telemt/releases/latest", ) if code != 0 or not stdout.strip(): text = "āŒ Failed to check for updates" else: try: release = json.loads(stdout) latest = release.get("tag_name", "unknown") if latest == current: text = f"āœ… telemt is already up to date ({html.escape(current)})" else: text = ( f"ā„¹ļø Update available: {html.escape(current)} → {html.escape(latest)}\n\n" f"Run the CLI installer to update:\n" f"sudo bash install.sh → menu item 10" ) except json.JSONDecodeError: text = "āŒ Failed to parse release info" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Change mode or template.""" query = update.callback_query await query.answer() buttons = [ [InlineKeyboardButton("⚔ Switch to Quick Mode", callback_data="change_quick")], [InlineKeyboardButton("šŸ”’ Switch to Stealth Mode", callback_data="change_stealth")], [InlineKeyboardButton("Ā« Back", callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query, "Change mode or template:", reply_markup=keyboard ) async def cb_change_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Switch to quick mode — show domain selection.""" query = update.callback_query await query.answer() # Reuse the quick mode domain selection flow await cb_install_mode_quick(update, context) async def cb_change_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Switch to stealth mode — show template categories.""" query = update.callback_query await query.answer() # Reuse the stealth mode template selection flow await cb_install_mode_stealth(update, context) async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Migrate from v1 (mtg Docker) to v2 (telemt).""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Migrating from v1...") # Stop old mtg container code, _, stderr = await sh("docker", "stop", "mtproto-proxy", timeout=30) if code != 0: code, _, stderr = await sh("docker", "stop", "mtg", timeout=30) # Remove old container await sh("docker", "rm", "mtproto-proxy", timeout=15) await sh("docker", "rm", "mtg", timeout=15) text = ( "āœ… v1 container stopped and removed\n\n" "Now select installation mode for v2:" ) keyboard = get_install_mode_menu() await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # WEBSITE & SSL # ============================================================================ async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Website and SSL management.""" query = update.callback_query await query.answer() buttons = [ [InlineKeyboardButton("šŸ”„ Renew SSL Certificate", callback_data="ssl_renew")], [InlineKeyboardButton("šŸ“Š SSL Status", callback_data="ssl_status")], [InlineKeyboardButton("Ā« Back", callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query, "Website & SSL Management:", reply_markup=keyboard ) async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Renew SSL certificate.""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Renewing SSL certificate...") code, stdout, stderr = await sh("certbot", "renew", timeout=120) if code == 0: text = "āœ… SSL certificate renewed successfully" else: text = f"āŒ Renewal failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_website")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show SSL status.""" query = update.callback_query await query.answer() code, stdout, _ = await sh("certbot", "certificates") if code == 0: text = f"šŸ“Š SSL Certificates\n\n
{html.escape(stdout[:1000])}
" else: text = "āŒ Failed to get SSL status" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_website")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # PROMO & CREDITS # ============================================================================ async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Promo information.""" query = update.callback_query await query.answer() text = ( f"šŸŽ GoTelegram Promo\n\n" f"Share the love! Invite friends to use GoTelegram.\n\n" f"Promo Link\n\n" f"Support development:\n" f"Send a Tip" ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Credits and acknowledgements.""" query = update.callback_query await query.answer() text = ( f"ā„¹ļø Credits & Acknowledgements\n\n" f"GoTelegram v{GOTELEGRAM_VERSION}\n\n" f"Built with love for the Telegram community\n\n" f"Special thanks to:\n\n" f"šŸ™ telemt - MTProxy engine\n" f" High-performance proxy core\n\n" f"šŸŽØ HTML5UP - Beautiful web templates\n" f" Responsive design & themes\n\n" f"šŸ“š Learning Zone - Educational resources\n" f" Community learning support\n\n" f"šŸš€ Start Bootstrap - Bootstrap templates\n" f" Professional design framework\n\n" f"šŸ’¬ Community - Your feedback & support\n\n" f"GoTelegram is open-source and community-driven" ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # REMOVE # ============================================================================ async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Remove installation.""" query = update.callback_query await query.answer() text = ( "āš ļø Remove GoTelegram\n\n" "This will completely remove the installation.\n" "Are you sure?" ) buttons = [ [InlineKeyboardButton("āŒ Yes, Remove", callback_data="remove_confirm")], [InlineKeyboardButton("Ā« Back", callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Confirm removal.""" query = update.callback_query await query.answer() await safe_edit_message(query,"ā³ Removing GoTelegram...") # Stop service await sh("systemctl", "stop", TELEMT_SERVICE) # Remove directories for path in ["/opt/gotelegram", WEBSITE_ROOT]: await sh("rm", "-rf", path) text = "āœ… GoTelegram removed successfully" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("Ā« Back", callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # CALLBACK ROUTING # ============================================================================ async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Route all callbacks.""" query = update.callback_query data = query.data # Access control if not is_user_allowed(update.effective_user.id): await query.answer("Access denied") return # Main menu if data == "menu_main": await query.answer() buttons = get_main_menu() text = ( f"GoTelegram v{GOTELEGRAM_VERSION}\n\n" "šŸ¤– MTProxy Management\n" "Select an action:" ) await safe_edit_message(query,text, reply_markup=buttons, parse_mode="HTML") return if data == "close_menu": await query.answer() await query.delete_message() return # Dispatch to handlers handlers = { "menu_install": cb_menu_install, "menu_status": cb_menu_status, "menu_link": cb_menu_link, "menu_share": cb_menu_share, "menu_restart": cb_menu_restart, "menu_logs": cb_menu_logs, "menu_backup": cb_menu_backup, "menu_restore": cb_menu_restore, "menu_update": cb_menu_update, "menu_change": cb_menu_change, "menu_website": cb_menu_website, "menu_promo": cb_menu_promo, "menu_credits": cb_menu_credits, "menu_remove": cb_menu_remove, "install_mode_quick": cb_install_mode_quick, "install_mode_stealth": cb_install_mode_stealth, "backup_create": cb_backup_create, "backup_list": cb_backup_list, "ssl_renew": cb_ssl_renew, "ssl_status": cb_ssl_status, "remove_confirm": cb_remove_confirm, "change_quick": cb_change_quick, "change_stealth": cb_change_stealth, "install_migrate": cb_install_migrate, } # Pattern-based handlers if data.startswith("quick_dom_"): await cb_quick_domain(update, context) elif data.startswith("stealth_cat_"): await cb_stealth_category(update, context) elif data.startswith("stealth_tpl_"): await cb_stealth_template(update, context) elif data.startswith("stealth_confirm_"): await cb_stealth_confirm(update, context) elif data.startswith("restore_idx_"): await cb_restore_backup(update, context) elif data in handlers: await handlers[data](update, context) else: await query.answer("Unknown action") # ============================================================================ # ERROR HANDLERS # ============================================================================ async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Log errors caused by Updates.""" logger.error(f"Exception while handling an update:", exc_info=context.error) # ============================================================================ # MAIN APPLICATION # ============================================================================ def main() -> None: """Start the bot.""" if not BOT_TOKEN: logger.error("BOT_TOKEN not set in .env file") return # Create the Application application = Application.builder().token(BOT_TOKEN).build() # Command handlers application.add_handler(CommandHandler("start", cmd_start)) application.add_handler(CommandHandler("help", cmd_help)) application.add_handler(CommandHandler("status", cmd_status)) application.add_handler(CommandHandler("logs", cmd_logs)) # Callback query handler (buttons) application.add_handler(CallbackQueryHandler(handle_callback)) # Error handler application.add_error_handler(error_handler) # Run the bot logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...") application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()