#!/usr/bin/env python3 """ GoTelegram v2.4 Bot - MTProxy Management for Linux Manages telemt engine via Telegram interface with full CLI feature parity Uses python-telegram-bot v21+ Supports EN/RU UI with per-user language preferences. """ import asyncio import csv import hashlib import html import json import logging import os import re import shutil import subprocess import sys import time import toml from datetime import datetime from io import StringIO from pathlib import Path from typing import Tuple, Optional, List, Dict, Any from dotenv import load_dotenv from telegram import ( Update, InlineKeyboardButton, InlineKeyboardMarkup, InputFile, ) from telegram.ext import ( Application, CommandHandler, CallbackQueryHandler, ContextTypes, MessageHandler, filters, ) from telegram.error import TelegramError, BadRequest # i18n — loaded from the bot directory next to this file _BOT_DIR = Path(__file__).resolve().parent if str(_BOT_DIR) not in sys.path: sys.path.insert(0, str(_BOT_DIR)) try: from i18n import ( t as _t, tf as _tf, get_user_lang, set_user_lang, get_language_name, SUPPORTED_LANGS, ) except Exception as _i18n_err: # pragma: no cover — defensive fallback logging.warning("i18n module not available: %s", _i18n_err) def _t(user_id, key, default=None): return default if default is not None else key def _tf(user_id, key, *args, default=None): template = default if default is not None else key try: return template % args if args else template except Exception: return template def get_user_lang(user_id): return "en" def set_user_lang(user_id, code): return False def get_language_name(code): return code SUPPORTED_LANGS = ("en",) def _uid(update: Optional[Update]) -> Optional[int]: """Extract user id from an update (if any).""" if update is None: return None user = getattr(update, "effective_user", None) return user.id if user else None # Load environment variables load_dotenv() # Logging configuration logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # ============================================================================ # CONFIGURATION # ============================================================================ GOTELEGRAM_VERSION = "2.4.10" GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json" TELEMT_CONFIG = "/etc/telemt/config.toml" TELEMT_SERVICE = "telemt" WEBSITE_ROOT = "/var/www/gotelegram-site" BACKUP_DIR = "/opt/gotelegram/backups" TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json" INSTALL_SH = "/opt/gotelegram/install.sh" PROMO_LINK_1 = "https://vk.cc/ct29NQ" PROMO_LINK_2 = "https://vk.cc/cUxAhj" TIP_LINK = "https://pay.cloudtips.ru/p/7410814f" PROMO_STAMP_FILE = "/opt/gotelegram/.promo_bot_last_shown" BOT_TOKEN = os.getenv("BOT_TOKEN") ENV_FILE = "/opt/gotelegram-bot/.env" # ── Загрузка ALLOWED_IDS ──────────────────────────────────────────────────── # Поддерживает запятую, пробел, или их комбинацию как разделитель ALLOWED_IDS: set = set() _WAITING_FOR_ADMIN = False # True если список пуст → ждём первого админа def _load_allowed_ids() -> None: """Загрузить ALLOWED_IDS из переменной окружения.""" global ALLOWED_IDS, _WAITING_FOR_ADMIN raw = os.getenv("ALLOWED_IDS", "") ALLOWED_IDS = set() # Разделители: запятая, пробел, или оба for part in re.split(r'[,\s]+', raw): part = part.strip() if part: try: ALLOWED_IDS.add(int(part)) except ValueError: logging.warning(f"Invalid ALLOWED_IDS entry: {part}") _WAITING_FOR_ADMIN = len(ALLOWED_IDS) == 0 def _save_allowed_ids() -> None: """Сохранить ALLOWED_IDS в .env файл и обновить os.environ.""" global _WAITING_FOR_ADMIN ids_str = ",".join(str(i) for i in sorted(ALLOWED_IDS)) os.environ["ALLOWED_IDS"] = ids_str _WAITING_FOR_ADMIN = len(ALLOWED_IDS) == 0 if not os.path.exists(ENV_FILE): return try: with open(ENV_FILE, "r") as f: lines = f.readlines() found = False new_lines = [] for line in lines: if line.strip().startswith("ALLOWED_IDS="): if ids_str: new_lines.append(f"ALLOWED_IDS={ids_str}\n") # Если пусто — удаляем строку found = True else: new_lines.append(line) if not found and ids_str: new_lines.append(f"ALLOWED_IDS={ids_str}\n") with open(ENV_FILE, "w") as f: f.writelines(new_lines) logger.info(f"ALLOWED_IDS updated in .env: {ids_str or '(empty)'}") except OSError as e: logger.error(f"Failed to update .env: {e}") _load_allowed_ids() LITE_DOMAINS = [ "google.com", "microsoft.com", "cloudflare.com", "apple.com", "amazon.com", "github.com", "stackoverflow.com", "medium.com", "wikipedia.org", "coursera.org", "udemy.com", "habr.com", "stepik.org", "duolingo.com", "khanacademy.org", "bbc.com", "reuters.com", "nytimes.com", "ted.com", "zoom.us", ] # ============================================================================ # UTILITY FUNCTIONS # ============================================================================ async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]: """Execute shell command asynchronously. Args: *args: Command and arguments timeout: Timeout in seconds Returns: Tuple of (return_code, stdout, stderr) """ try: process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) return ( process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), ) except asyncio.TimeoutError: try: process.kill() await process.wait() except Exception: pass return (-1, "", f"Command timeout after {timeout}s") except Exception as e: return (-1, "", str(e)) # Per-host mutex preventing concurrent install.sh --action invocations. Two # admins hitting "change template" at the same second could race each other # and corrupt /var/www/gotelegram-site. One global lock is fine — these are # rare operations and should serialize cleanly. _BOT_ACTION_LOCK = asyncio.Lock() # Allowed template-id shape: catalog ids are [a-zA-Z0-9_-], never longer than 64. # This is a defense-in-depth check before we hand the value to subprocess. _TPL_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$") # Allowed Lite mask domain shape — simple DNS hostname, up to 253 chars total. # Each label 1–63 chars, labels separated by dots, alphanumerics + hyphens. _DOMAIN_RE = re.compile( r"^(?=.{1,253}$)(?:(?!-)[A-Za-z0-9-]{1,63}(? Dict: """Invoke install.sh --action=X --json and parse the JSON result. Args: action: action name (e.g. "change-template", "change-lite-domain") timeout: seconds to wait for completion (long ops: template download can take time) **params: arbitrary key→value pairs, each passed as --key=value Returns: dict with at least {"status": "success|error", "message": "..."}. Transport errors are mapped to {"status":"error","message":..., "code":"transport"} """ cmd = ["bash", INSTALL_SH, f"--action={action}", "--json"] for k, v in params.items(): if v is None: continue cmd.append(f"--{k.replace('_', '-')}={v}") code, stdout, stderr = await sh(*cmd, timeout=timeout) stdout = (stdout or "").strip() # install.sh may print multiple log lines to stderr; the JSON is on stdout. # Pick the last non-empty line that looks like JSON (robust to any stray output). json_line = None for line in reversed(stdout.splitlines()): line = line.strip() if line.startswith("{") and line.endswith("}"): json_line = line break if json_line: try: data = json.loads(json_line) if isinstance(data, dict) and "status" in data: return data except json.JSONDecodeError as e: logger.warning(f"run_bot_action: JSON parse failed: {e} | line={json_line!r}") # No JSON from install.sh — synthesize an error result tail = (stderr or "")[-300:] if stderr else "" logger.error( f"run_bot_action({action}): no JSON output, rc={code}, " f"stdout={stdout[-300:]!r}, stderr={tail!r}" ) return { "status": "error", "message": "install.sh did not return a JSON result", "code": "transport", "rc": str(code), } def load_json(path: str) -> Optional[Dict]: """Load JSON file.""" try: with open(path, "r") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load {path}: {e}") return None def load_toml(path: str) -> Optional[Dict]: """Load TOML file.""" try: with open(path, "r") as f: return toml.load(f) except Exception as e: logger.warning(f"Failed to load {path}: {e}") return None def save_json(path: str, data: Dict) -> bool: """Save JSON file.""" try: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: json.dump(data, f, indent=2) return True except Exception as e: logger.error(f"Failed to save {path}: {e}") return False async def safe_edit_message( query, text: str, reply_markup=None, parse_mode=None, disable_web_page_preview: Optional[bool] = None, ) -> bool: """Safely edit message, handling cases where message was deleted or not modified. `disable_web_page_preview` is forwarded to edit_message_text when set; omitting it keeps Telegram's default (enabled). """ kwargs = {"reply_markup": reply_markup, "parse_mode": parse_mode} if disable_web_page_preview is not None: kwargs["disable_web_page_preview"] = disable_web_page_preview try: await query.edit_message_text(text, **kwargs) return True except BadRequest as e: err_msg = str(e).lower() if "message is not modified" in err_msg: return True # No change needed, not an error if "message to edit not found" in err_msg or "message can't be edited" in err_msg: logger.warning(f"Cannot edit message: {e}") return False raise # Re-raise unexpected BadRequest async def _delete_message_after(message, delay: int = 30) -> None: """Delete a Telegram message after `delay` seconds. Errors are swallowed (message may already be deleted by the user). Used for ephemeral content like promo blocks that should auto-cleanup.""" try: await asyncio.sleep(delay) await message.delete() except Exception as e: logger.debug(f"_delete_message_after: {e}") async def check_service_status(service: str) -> bool: """Check if systemd service is running.""" code, _, _ = await sh("systemctl", "is-active", service) return code == 0 async def get_telemt_version() -> str: """Get telemt version.""" code, stdout, _ = await sh("telemt", "-v") if code == 0: return stdout.strip().split()[-1] if stdout else "unknown" return "unknown" def is_docker_running() -> bool: """Check if Docker daemon is running.""" try: subprocess.run( ["docker", "ps"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=5, ) return True except Exception: return False async def check_old_container() -> Optional[str]: """Check for old mtg Docker container (v1 migration).""" if not is_docker_running(): return None code, stdout, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}") if code == 0 and "mtg" in stdout: return "mtg" return None # ============================================================================ # ACCESS CONTROL # ============================================================================ def is_user_allowed(user_id: int) -> bool: """Check if user ID is in ALLOWED_IDS. If list is empty — waiting for admin.""" if _WAITING_FOR_ADMIN: return False # Никому не даём доступ пока не назначен админ return user_id in ALLOWED_IDS def add_admin(user_id: int) -> None: """Добавить администратора и сохранить в .env.""" ALLOWED_IDS.add(user_id) _save_allowed_ids() logger.info(f"Admin added: {user_id}") def remove_admin(user_id: int) -> None: """Убрать администратора и сохранить в .env.""" ALLOWED_IDS.discard(user_id) _save_allowed_ids() logger.info(f"Admin removed: {user_id}") async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: """Check authorization and send error if not allowed.""" user_id = update.effective_user.id # Режим ожидания первого админа — обрабатывается в cmd_start if _WAITING_FOR_ADMIN: return False if not is_user_allowed(user_id): if update.message: await update.message.reply_text( f"⛔ Доступ запрещён.\nВаш ID: {user_id}", parse_mode="HTML", ) logger.warning(f"Unauthorized access attempt from user {user_id}") return False return True # ============================================================================ # MAIN MENU # ============================================================================ def get_main_menu(user_id: Optional[int] = None) -> InlineKeyboardMarkup: """Generate main menu keyboard localized for the given user.""" buttons = [ [ InlineKeyboardButton(_t(user_id, "menu_install"), callback_data="menu_install"), InlineKeyboardButton(_t(user_id, "menu_status"), callback_data="menu_status"), ], [ InlineKeyboardButton(_t(user_id, "menu_link"), callback_data="menu_link"), InlineKeyboardButton(_t(user_id, "menu_share"), callback_data="menu_share"), ], [ InlineKeyboardButton(_t(user_id, "menu_restart"), callback_data="menu_restart"), InlineKeyboardButton(_t(user_id, "menu_logs"), callback_data="menu_logs"), ], [ InlineKeyboardButton(_t(user_id, "menu_change"), callback_data="menu_change"), InlineKeyboardButton(_t(user_id, "menu_backup"), callback_data="menu_backup"), ], [ InlineKeyboardButton(_t(user_id, "menu_restore"), callback_data="menu_restore"), InlineKeyboardButton(_t(user_id, "menu_update"), callback_data="menu_update"), ], [ InlineKeyboardButton(_t(user_id, "menu_website"), callback_data="menu_website"), InlineKeyboardButton(_t(user_id, "menu_promo"), callback_data="menu_promo"), ], [ InlineKeyboardButton(_t(user_id, "menu_stats"), callback_data="menu_stats"), InlineKeyboardButton(_t(user_id, "menu_remove"), callback_data="menu_remove"), ], [ InlineKeyboardButton(_t(user_id, "menu_admins"), callback_data="menu_admins"), InlineKeyboardButton(_t(user_id, "menu_credits"), callback_data="menu_credits"), ], [ InlineKeyboardButton(_t(user_id, "menu_language"), callback_data="menu_lang"), InlineKeyboardButton(_t(user_id, "menu_close"), callback_data="close_menu"), ], ] return InlineKeyboardMarkup(buttons) # ============================================================================ # COMMANDS # ============================================================================ async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Start command - show main menu, promo once per day. Если ALLOWED_IDS пуст — режим авто-регистрации первого админа. """ user = update.effective_user user_id = user.id # ── Режим ожидания первого админа ── if _WAITING_FOR_ADMIN: name = user.full_name or user.username or str(user_id) title = _tf(user_id, "waiting_admin_title", html.escape(name)) body = _tf(user_id, "waiting_admin_body", user_id) text = f"{title}\n\n{body}" keyboard = InlineKeyboardMarkup([ [ InlineKeyboardButton(_t(user_id, "btn_yes"), callback_data=f"admin_confirm_{user_id}"), InlineKeyboardButton(_t(user_id, "btn_no"), callback_data="admin_cancel"), ] ]) await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML") return # ── Проверка доступа ── if not is_user_allowed(user_id): await update.message.reply_text( _tf(user_id, "access_denied", user_id), parse_mode="HTML", ) return welcome = ( f"{_tf(user_id, 'welcome_title', GOTELEGRAM_VERSION)}\n\n" f"{_t(user_id, 'welcome_subtitle')}\n" f"{_t(user_id, 'welcome_powered')}\n\n" f"{_t(user_id, 'welcome_prompt')}" ) await update.message.reply_text( welcome, reply_markup=get_main_menu(user_id), parse_mode="HTML" ) # Промо раз в сутки — сообщение само удаляется через 30 секунд if should_show_promo_bot(): mark_promo_shown_bot() promo_msg = await update.message.reply_text( get_promo_text(), parse_mode="HTML", disable_web_page_preview=True ) asyncio.create_task(_delete_message_after(promo_msg, 30)) async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Help command - show available commands.""" if not await require_auth(update, context): return user_id = _uid(update) help_text = ( f"{_t(user_id, 'help_title')}\n\n" f"{_t(user_id, 'help_lines')}" ) await update.message.reply_text(help_text, parse_mode="HTML") async def cmd_lang(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show language picker.""" if not await require_auth(update, context): return user_id = _uid(update) current = get_user_lang(user_id) title = _t(user_id, "lang_title") curr_line = _tf(user_id, "lang_current", get_language_name(current)) prompt = _t(user_id, "lang_choose") text = f"{title}\n\n{curr_line}\n\n{prompt}" keyboard = InlineKeyboardMarkup([ [ InlineKeyboardButton("🇬🇧 English", callback_data="lang_set_en"), InlineKeyboardButton("🇷🇺 Русский", callback_data="lang_set_ru"), ], [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_main")], ]) await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML") async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Quick status check.""" if not await require_auth(update, context): return user_id = _uid(update) await update.message.reply_text(_t(user_id, "status_checking"), parse_mode="HTML") status_text = await get_status_text(user_id) await update.message.reply_text(status_text, parse_mode="HTML") async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show recent logs.""" if not await require_auth(update, context): return user_id = _uid(update) code, stdout, stderr = await sh( "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager" ) if code == 0: log_text = stdout[-1500:] if len(stdout) > 1500 else stdout await update.message.reply_text( f"
{html.escape(log_text)}
", parse_mode="HTML", ) else: await update.message.reply_text(_t(user_id, "logs_failed")) # ============================================================================ # STATUS # ============================================================================ async def get_status_text(user_id: Optional[int] = None) -> str: """Generate status report (localized).""" lines = [f"{_t(user_id, 'status_title')}\n"] # Service status is_running = await check_service_status(TELEMT_SERVICE) running = _t(user_id, "status_running") if is_running else _t(user_id, "status_stopped") lines.append(f"{_t(user_id, 'status_service')}: {running}") # Telemt version version = await get_telemt_version() lines.append(f"{_t(user_id, 'status_telemt')}: v{version}") # Config status config = load_json(GOTELEGRAM_CONFIG) if config: lines.append(f"{_t(user_id, 'status_mode')}: {html.escape(str(config.get('mode', 'unknown')))}") # install.sh/save_gotelegram_config uses "template_id" (not "template") tpl = config.get("template_id") or config.get("template") if tpl: lines.append(f"{_t(user_id, 'status_template')}: {html.escape(str(tpl))}") if config.get("domain"): lines.append(f"{_t(user_id, 'status_domain')}: {html.escape(str(config['domain']))}") if config.get("port"): lines.append(f"{_t(user_id, 'status_port')}: {html.escape(str(config['port']))}") # Telemt config (v3: [server] port = ..., [censorship] tls_domain = ...) telemt_cfg = load_toml(TELEMT_CONFIG) if telemt_cfg: server_cfg = telemt_cfg.get("server", {}) if "port" in server_cfg: lines.append(f"{_t(user_id, 'status_listen_port')}: {server_cfg['port']}") censor_cfg = telemt_cfg.get("censorship", {}) if "tls_domain" in censor_cfg: lines.append(f"{_t(user_id, 'status_tls_domain')}: {html.escape(str(censor_cfg['tls_domain']))}") # Backups backup_count = 0 try: if os.path.exists(BACKUP_DIR): backup_count = len([f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")]) except Exception: pass lines.append(f"Backups: {backup_count}") # Old container check old_container = await check_old_container() if old_container: lines.append(f"\n⚠️ Found old container: {html.escape(old_container)}") lines.append("Run 'Install' to migrate") return "\n".join(lines) async def get_traffic_stats(user_id: Optional[int] = None) -> str: """Get formatted traffic statistics.""" # Read current snapshot current_file = "/run/gotelegram/stats_current.json" history_file = "/opt/gotelegram/stats_history.csv" try: with open(current_file, "r") as f: current = json.load(f) except Exception: return f"📊 {_t(user_id, 'stats_title', 'Statistics')}\n\n{_t(user_id, 'stats_unavailable', 'Data unavailable. Make sure stats module is enabled.')}" # Read history history = [] try: with open(history_file, "r") as f: reader = csv.reader(f) for row in reader: if len(row) >= 3: history.append({ "ts": int(row[0]), "proxy": int(row[1]), "site": int(row[2]), }) except Exception: pass now = int(time.time()) def format_bytes(b): if b < 1024: return f"{b} B" if b < 1048576: return f"{b/1024:.1f} KB" if b < 1073741824: return f"{b/1048576:.1f} MB" return f"{b/1073741824:.1f} GB" def format_rate(bps): if bps < 1024: return f"{bps:.0f} B/s" if bps < 1048576: return f"{bps/1024:.1f} KB/s" return f"{bps/1048576:.1f} MB/s" def calc_for_period(secs, key): target_ts = now - secs # Find closest snapshot to target_ts closest = None for h in history: if h["ts"] <= target_ts: if closest is None or h["ts"] > closest["ts"]: closest = h if closest is None: return "—", "—" current_val = current.get(f"{key}_bytes", 0) diff = current_val - closest[key] if diff < 0: diff = 0 elapsed = now - closest["ts"] if elapsed <= 0: elapsed = 1 rate = diff / elapsed return format_bytes(diff), format_rate(rate) periods = [ (_t(user_id, "stats_1min", "1 min"), 60), (_t(user_id, "stats_5min", "5 min"), 300), (_t(user_id, "stats_60min", "60 min"), 3600), (_t(user_id, "stats_1day", "1 day"), 86400), (_t(user_id, "stats_7days", "7 days"), 604800), (_t(user_id, "stats_30days", "30 days"), 2592000), (_t(user_id, "stats_365days", "365 days"), 31536000), ] hdr_period = _t(user_id, "stats_hdr_period", "Period") hdr_traffic = _t(user_id, "stats_hdr_traffic", "Traffic") hdr_rate = _t(user_id, "stats_hdr_rate", "Rate") lines = [f"📊 {_t(user_id, 'stats_traffic_title', 'Traffic statistics')}\n"] lbl_proxy = _t(user_id, "stats_proxy_label", "Proxy (telemt)") lbl_site = _t(user_id, "stats_site_label", "Site (nginx)") for label, key in [(lbl_proxy, "proxy"), (lbl_site, "site")]: lines.append(f"\n{label}:") lines.append("
")
        lines.append(f"{hdr_period:<10} │ {hdr_traffic:>10} │ {hdr_rate:>10}")
        lines.append("─" * 36)
        for name, secs in periods:
            total, rate = calc_for_period(secs, key)
            lines.append(f"{name:<10} │ {total:>10} │ {rate:>10}")
        lines.append("
") return "\n".join(lines) async def cb_menu_stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show traffic statistics.""" query = update.callback_query await query.answer() uid = _uid(update) stats_text = await get_traffic_stats(uid) keyboard = [ [InlineKeyboardButton(_t(uid, "btn_refresh", "🔄 Refresh"), callback_data="menu_stats")], [InlineKeyboardButton(_t(uid, "btn_back"), callback_data="menu_main")], ] await safe_edit_message( query, stats_text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML", ) async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Status callback — show detailed proxy/server status.""" query = update.callback_query await query.answer() if not await require_auth(update, context): return text = await get_status_text(_uid(update)) keyboard = [ [InlineKeyboardButton("🔄 Обновить", callback_data="menu_status")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] await safe_edit_message( query, text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML", ) # ============================================================================ # INSTALL # ============================================================================ def get_install_mode_menu() -> InlineKeyboardMarkup: """Install mode selection menu.""" buttons = [ [InlineKeyboardButton("⚡ Lite", callback_data="install_mode_lite")], [InlineKeyboardButton("🛡 Pro", callback_data="install_mode_pro")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] return InlineKeyboardMarkup(buttons) async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Install menu callback.""" query = update.callback_query await query.answer() # Check for old container old_container = await check_old_container() if old_container: text = ( f"⚠️ Migration from v1 detected\n\n" f"Found Docker container: {html.escape(old_container)}\n\n" f"Would you like to:\n" f"1. Migrate from v1 (recommended)\n" f"2. Fresh install (will remove old container)\n\n" f"Select below or choose install mode" ) buttons = [ [InlineKeyboardButton("🔄 Migrate from v1", callback_data="install_migrate")], [InlineKeyboardButton("✨ Fresh Install", callback_data="install_mode_lite")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) else: text = "Select installation mode:" keyboard = get_install_mode_menu() await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) async def cb_install_mode_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Lite mode domain selection.""" query = update.callback_query await query.answer() # Show domains with pagination (4 per row, 2 rows) buttons = [] for i in range(0, len(LITE_DOMAINS), 2): row = [] for j in range(2): if i + j < len(LITE_DOMAINS): domain = LITE_DOMAINS[i + j] row.append( InlineKeyboardButton( domain, callback_data=f"lite_dom_{i+j}" ) ) buttons.append(row) buttons.append([InlineKeyboardButton("« Back", callback_data="menu_install")]) text = "Select a domain for Lite mode:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard) async def cb_lite_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Lite domain selection callback — real implementation (v2.4.2+). Branches on current mode: * lite mode (active): invoke `install.sh --action=change-lite-domain` which regenerates the telemt TOML with a new fake-TLS mask domain and restarts the service. Preserves secret/port. * any other mode: route to CLI. Fresh Lite install is interactive. """ query = update.callback_query data = query.data try: domain_idx = int(data.split("_")[-1]) domain = LITE_DOMAINS[domain_idx] except (ValueError, IndexError): await query.answer("Invalid domain selection") return # Defense-in-depth: LITE_DOMAINS is trusted, but validate the shape anyway # in case someone extends the list with garbage later. if not _DOMAIN_RE.match(domain): logger.warning(f"cb_lite_domain: rejecting malformed domain {domain!r}") await query.answer("Invalid domain") return await query.answer() config = load_json(GOTELEGRAM_CONFIG) or {} current_mode = config.get("mode", "") if current_mode != "lite": text = ( "⚠️ Установка Lite из бота пока не поддерживается\n\n" f"Выбранный домен: {html.escape(domain)}\n\n" "Чтобы установить Lite, запустите на сервере:\n" "gotelegram1) Прокси → 1) Установить/Обновить → Lite\n\n" "Существующая конфигурация не была изменена." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML") return # Lite active — switch fake-TLS mask domain in place if _BOT_ACTION_LOCK.locked(): await safe_edit_message( query, "⏳ Другая операция уже выполняется\n\n" "Дождитесь завершения предыдущей смены шаблона/домена и повторите.", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ), parse_mode="HTML", ) return progress_text = ( "⏳ Меняю маскировочный домен...\n\n" f"Новый домен: {html.escape(domain)}\n\n" "Перегенерирую конфиг telemt и перезапускаю сервис." ) await safe_edit_message(query, progress_text, parse_mode="HTML") async with _BOT_ACTION_LOCK: result = await run_bot_action("change-lite-domain", timeout=30, domain=domain) if result.get("status") == "success": text = ( "✅ Маскировочный домен обновлён\n\n" f"Новый домен: {html.escape(domain)}\n\n" "telemt перезапущен. Важно: старые ссылки подключения больше " "не будут работать — нужно заново раздать новые." ) else: err_msg = result.get("message", "unknown error") err_code = result.get("code", "") text = ( "❌ Не удалось сменить домен\n\n" f"Домен: {html.escape(domain)}\n" f"Причина: {html.escape(err_msg)}" + (f" ({html.escape(err_code)})" if err_code else "") + "\n\n" "Существующая конфигурация не была изменена." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML") async def cb_install_mode_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Pro mode - show template categories.""" query = update.callback_query await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog or "categories" not in catalog: await safe_edit_message(query, "❌ Template catalog not found", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("« Back", callback_data="menu_install")]] ), ) return user_id = _uid(update) buttons = [] # First item: custom git template (matches CLI behaviour) buttons.append([InlineKeyboardButton( _t(user_id, "cg_title"), callback_data="pro_custom_git" )]) for cat in catalog.get("categories", []): buttons.append( [ InlineKeyboardButton( f"📁 {cat['name']}", callback_data=f"pro_cat_{cat['id']}" ) ] ) buttons.append([InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_install")]) text = "Pro Mode — Select Template Category:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query, text, reply_markup=keyboard) # ── Custom git template input flow ────────────────────────────────────────── _CUSTOM_GIT_WAITERS: Dict[int, bool] = {} _CUSTOM_GIT_URL_RE = re.compile(r'^https://[A-Za-z0-9._~:/\-?#\[\]@!$&\'()*+,;=%]+(@[A-Za-z0-9._\-/]+)?$') _CUSTOM_GIT_MAX_MB = 100 _CUSTOM_GIT_CLONE_TIMEOUT = 90 def _validate_custom_git_url(url: str) -> bool: if not url or len(url) > 512: return False # Block shell metacharacters explicitly for bad in (" ", "`", "$", "(", ")", "<", ">", "|", "\\", "\t", "\n", "\r", ";", "&", "'", '"'): if bad in url: return False if not url.lower().startswith("https://"): return False # Reject embedded userinfo (https://user:pass@host/...) to prevent credential leakage. # We look at the netloc — anything between https:// and the first '/'. rest = url[len("https://"):] netloc_end = rest.find("/") netloc = rest if netloc_end == -1 else rest[:netloc_end] if "@" in netloc: return False # Hostname sanity: no empty host, no whitespace already blocked above if not netloc or netloc.startswith(":") or netloc.endswith(":"): return False return True async def _download_custom_git_template(url_with_branch: str) -> Tuple[bool, str, str]: """Clone a custom git repo and stage its static site under WEBSITE_ROOT. Returns (ok, tpl_id, message_key_or_path). """ # Parse @branch suffix (only when the branch appears on the last path segment) branch = None url = url_with_branch if "@" in url_with_branch.rsplit("/", 1)[-1]: base, _, maybe_branch = url_with_branch.rpartition("@") if ( base.lower().startswith("https://") and maybe_branch # reject empty branch after `@` and "/" not in maybe_branch and re.match(r'^[A-Za-z0-9._/\-]+$', maybe_branch) ): url = base branch = maybe_branch elif not maybe_branch and base.lower().startswith("https://"): # Trailing `@` with no branch — drop it so git doesn't treat it as userinfo url = base tpl_id = "custom_" + hashlib.md5(url_with_branch.encode("utf-8")).hexdigest()[:10] target_dir = f"/opt/gotelegram/custom_templates/{tpl_id}" # Clean previous copy if os.path.isdir(target_dir): shutil.rmtree(target_dir, ignore_errors=True) tmp_dir = f"/tmp/{tpl_id}_clone" if os.path.isdir(tmp_dir): shutil.rmtree(tmp_dir, ignore_errors=True) cmd = ["git", "clone", "--depth", "1"] if branch: cmd += ["--branch", branch] cmd += [url, tmp_dir] try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: _, err = await asyncio.wait_for(proc.communicate(), timeout=_CUSTOM_GIT_CLONE_TIMEOUT) except asyncio.TimeoutError: try: proc.kill() except ProcessLookupError: pass return False, tpl_id, "cg_timeout" if proc.returncode != 0: return False, tpl_id, "cg_invalid" except Exception as e: logger.warning("custom git clone failed: %s", e) return False, tpl_id, "cg_invalid" # Remove .git to enforce size guard and avoid leaking repo history git_dir = os.path.join(tmp_dir, ".git") if os.path.isdir(git_dir): shutil.rmtree(git_dir, ignore_errors=True) # Size guard total = 0 for root, _dirs, files in os.walk(tmp_dir): for f in files: try: total += os.path.getsize(os.path.join(root, f)) except OSError: pass if total > _CUSTOM_GIT_MAX_MB * 1024 * 1024: shutil.rmtree(tmp_dir, ignore_errors=True) return False, tpl_id, "cg_too_big" # Locate index.html in priority order found_root = None for sub in ("", "dist", "public", "build", "_site", "site", "docs", "out", "www"): cand = os.path.join(tmp_dir, sub) if sub else tmp_dir if os.path.isfile(os.path.join(cand, "index.html")): found_root = cand break if not found_root: # Fallback: search maxdepth 4 for root, _dirs, files in os.walk(tmp_dir): depth = root[len(tmp_dir):].count(os.sep) if depth > 4: continue if "index.html" in files: found_root = root break if not found_root: shutil.rmtree(tmp_dir, ignore_errors=True) return False, tpl_id, "cg_no_index" # Stage final template dir os.makedirs(os.path.dirname(target_dir), exist_ok=True) shutil.copytree(found_root, target_dir) try: with open(os.path.join(target_dir, ".custom_git_source"), "w", encoding="utf-8") as f: f.write(url_with_branch + "\n") except OSError: pass shutil.rmtree(tmp_dir, ignore_errors=True) return True, tpl_id, target_dir async def cb_pro_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show templates in category.""" query = update.callback_query data = query.data cat_id = data.removeprefix("pro_cat_") await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog: await safe_edit_message(query,"❌ Template catalog not found") return # Find category and templates category = None templates = [] for cat in catalog.get("categories", []): if cat["id"] == cat_id: category = cat templates = cat.get("templates", []) break if not category: await safe_edit_message(query,"❌ Category not found") return buttons = [] for tpl in templates: buttons.append( [ InlineKeyboardButton( f"🎨 {tpl['name']}", callback_data=f"pro_tpl_{tpl['id']}" ) ] ) buttons.append([InlineKeyboardButton("« Back", callback_data="install_mode_pro")]) text = f"Select template from {html.escape(category['name'])}:" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_pro_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show template preview and confirm.""" query = update.callback_query data = query.data tpl_id = data.removeprefix("pro_tpl_") await query.answer() catalog = load_json(TEMPLATES_CATALOG) if not catalog: await safe_edit_message(query,"❌ Template catalog not found") return # Find template template = None for cat in catalog.get("categories", []): for tpl in cat.get("templates", []): if tpl["id"] == tpl_id: template = tpl break if template: break if not template: await safe_edit_message(query,"❌ Template not found") return tpl_name = html.escape(template.get('name', 'Unknown')) tpl_desc = html.escape(template.get('description', 'N/A')) text = ( f"🎨 Template Preview\n\n" f"Name: {tpl_name}\n" f"Description: {tpl_desc}\n\n" ) if "preview_url" in template: preview_url = html.escape(template['preview_url'], quote=True) text += f'View Live Preview\n\n' text += "Confirm installation?" buttons = [ [ InlineKeyboardButton( "✅ Install", callback_data=f"pro_confirm_{tpl_id}" ) ], [InlineKeyboardButton("« Back", callback_data="install_mode_pro")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_pro_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Confirm Pro template selection — real implementation (v2.4.2+). Branches on current mode: * pro mode (active deployment): invoke `install.sh --action=change-template` which downloads the new template and redeploys it to nginx. Reuses the existing domain + SSL cert. * any other mode (or no install at all): route to CLI. Fresh Pro install still requires interactive flow (domain, email, DNS check) — not safe to run headless from the bot. Historic context: v2.4.1 stub used to overwrite config.json with a fake blob; that was replaced with a safe message in v2.4.1 hotfix; now in v2.4.2 we wire the real change-template path through install.sh. """ query = update.callback_query data = query.data tpl_id = data.removeprefix("pro_confirm_") await query.answer() # Defense-in-depth: even though subprocess.exec uses list args (no shell), # we still enforce the catalog id shape before handing it to install.sh. if not _TPL_ID_RE.match(tpl_id): logger.warning(f"cb_pro_confirm: rejecting malformed tpl_id {tpl_id!r}") await safe_edit_message( query, "❌ Некорректный идентификатор шаблона\n\n" "Выбран неподдерживаемый шаблон. Вернитесь в меню и попробуйте снова.", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ), parse_mode="HTML", ) return # Read current config to decide: in-place change-template vs fresh install config = load_json(GOTELEGRAM_CONFIG) or {} current_mode = config.get("mode", "") if current_mode != "pro": # Fresh install / mode switch — still routes to CLI (needs domain, SSL) text = ( "⚠️ Установка Pro из бота пока не поддерживается\n\n" f"Выбранный шаблон: {html.escape(tpl_id)}\n\n" "Pro-режим требует ввода домена, email и проверки DNS. " "Чтобы установить Pro, запустите на сервере:\n" "gotelegram1) Прокси → 1) Установить/Обновить → Pro\n\n" "Существующая конфигурация не была изменена." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML") return # Pro mode is active — perform change-template in place if _BOT_ACTION_LOCK.locked(): await safe_edit_message( query, "⏳ Другая операция уже выполняется\n\n" "Дождитесь завершения предыдущей смены шаблона/домена и повторите.", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ), parse_mode="HTML", ) return progress_text = ( "⏳ Меняю шаблон сайта...\n\n" f"Шаблон: {html.escape(tpl_id)}\n\n" "Скачиваю репозиторий и разворачиваю в nginx. " "Это может занять 30–90 секунд." ) await safe_edit_message(query, progress_text, parse_mode="HTML") # Template download + git clone can be slow — generous timeout. # Mutex serializes with any concurrent change-lite-domain/change-template. async with _BOT_ACTION_LOCK: result = await run_bot_action("change-template", timeout=180, template=tpl_id) if result.get("status") == "success": domain = result.get("domain", config.get("domain", "")) text = ( "✅ Шаблон обновлён\n\n" f"Новый шаблон: {html.escape(tpl_id)}\n" f"Сайт: https://{html.escape(domain)}\n\n" "Прокси продолжает работать без перерыва." ) else: err_msg = result.get("message", "unknown error") err_code = result.get("code", "") text = ( "❌ Не удалось сменить шаблон\n\n" f"Шаблон: {html.escape(tpl_id)}\n" f"Причина: {html.escape(err_msg)}" + (f" ({html.escape(err_code)})" if err_code else "") + "\n\n" "Существующая конфигурация не была изменена. " "Попробуйте другой шаблон или запустите gotelegram из консоли." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML", disable_web_page_preview=True) # ============================================================================ # PROXY LINK & SHARE # ============================================================================ async def get_proxy_link() -> Optional[str]: """Generate proxy link from config. Pro-mode uses domain + fake-TLS secret.""" config = load_json(GOTELEGRAM_CONFIG) if not config: return None # Get secret from telemt TOML config (v3 format: [access.users] main = "...") secret = config.get("secret", "") if not secret: telemt_cfg = load_toml(TELEMT_CONFIG) if telemt_cfg: access = telemt_cfg.get("access", {}) users = access.get("users", {}) if isinstance(users, dict): secret = users.get("main", "") if not secret: return None mode = config.get("mode", "lite") domain = config.get("domain", "") port = config.get("port", 443) # Pro-режим: ссылка с доменом и fake-TLS секретом (ee + secret + hex domain) if mode == "pro" and domain: domain_hex = domain.encode().hex() faketls_secret = f"ee{secret}{domain_hex}" return f"tg://proxy?server={domain}&port={port}&secret={faketls_secret}" # Lite-режим: IP + fake-TLS с mask_host code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org") server = stdout.strip() if code == 0 and stdout.strip() else "0.0.0.0" mask_host = config.get("mask_host", "") if mask_host: domain_hex = mask_host.encode().hex() faketls_secret = f"ee{secret}{domain_hex}" return f"tg://proxy?server={server}&port={port}&secret={faketls_secret}" return f"tg://proxy?server={server}&port={port}&secret={secret}" async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Generate and show proxy link.""" query = update.callback_query await query.answer() link = await get_proxy_link() if not link: text = "❌ Proxy not installed yet. Run install first." else: text = ( f"🔗 Proxy Link\n\n" f"{html.escape(link)}\n\n" f"Open in Telegram to connect." ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Share link as QR code.""" query = update.callback_query await query.answer() link = await get_proxy_link() if not link: text = "❌ Proxy not installed yet." keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard) return # Try to generate QR code qr_file = None code, _, _ = await sh("which", "qrencode") if code == 0: qr_path = "/tmp/proxy_qr.png" code, _, _ = await sh("qrencode", "-o", qr_path, link) if code == 0 and os.path.exists(qr_path): qr_file = qr_path if qr_file: try: with open(qr_file, "rb") as f: await query.message.reply_photo( photo=f, caption=f"📤 Proxy QR Code\n\n{html.escape(link)}", parse_mode="HTML", ) except Exception as e: logger.error(f"Failed to send QR code: {e}") await safe_edit_message(query, f"🔗 Proxy Link\n\n{html.escape(link)}", parse_mode="HTML", ) finally: try: os.remove(qr_file) except OSError: pass else: await safe_edit_message(query, f"🔗 Proxy Link\n\n{html.escape(link)}", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ), parse_mode="HTML", ) # ============================================================================ # RESTART & LOGS # ============================================================================ async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Restart service.""" query = update.callback_query await query.answer() text = "⏳ Restarting telemt service..." await safe_edit_message(query,text) code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE) if code == 0: text = "✅ Service restarted successfully" else: text = f"❌ Failed to restart:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML" ) async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show recent logs.""" query = update.callback_query await query.answer() code, stdout, _ = await sh( "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager" ) if code == 0: log_text = stdout[-1000:] if len(stdout) > 1000 else stdout text = f"📋 Recent Logs\n\n
{html.escape(log_text)}
" else: text = "❌ Failed to retrieve logs" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # BACKUP & RESTORE # ============================================================================ async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Backup menu.""" query = update.callback_query await query.answer() # List existing backups backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass buttons = [[InlineKeyboardButton("💾 Create Backup", callback_data="backup_create")]] if backups: buttons.append( [InlineKeyboardButton("📋 List Backups", callback_data="backup_list")] ) buttons.append([InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]) text = f"💾 Backup Management\n\nExisting backups: {len(backups)}" keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Create backup.""" query = update.callback_query await query.answer() await safe_edit_message(query,"⏳ Creating backup...") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = os.path.join(BACKUP_DIR, f"backup_{timestamp}.tar.gz") os.makedirs(BACKUP_DIR, exist_ok=True) code, _, stderr = await sh( "tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG ) if code == 0: text = f"✅ Backup created:\n{html.escape(backup_file)}" else: text = f"❌ Backup failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("« Back", callback_data="menu_backup")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """List backups.""" query = update.callback_query await query.answer() backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass if not backups: text = "No backups found" else: text = "📋 Available Backups\n\n" for backup in backups[:10]: path = os.path.join(BACKUP_DIR, backup) size = os.path.getsize(path) / (1024 * 1024) text += f"{html.escape(backup)} ({size:.2f} MB)\n" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("« Back", callback_data="menu_backup")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Restore menu.""" query = update.callback_query await query.answer() backups = [] try: if os.path.exists(BACKUP_DIR): backups = sorted( [f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")], reverse=True, ) except Exception: pass if not backups: text = "❌ No backups available" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) else: text = "Select backup to restore:" buttons = [] for i, backup in enumerate(backups[:10]): buttons.append( [ InlineKeyboardButton( backup, callback_data=f"restore_idx_{i}" ) ] ) buttons.append([InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]) keyboard = InlineKeyboardMarkup(buttons) # Store backup list in user_data for retrieval context.user_data["backup_list"] = backups[:10] await safe_edit_message(query,text, reply_markup=keyboard) async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Execute backup restoration.""" query = update.callback_query data = query.data try: idx = int(data.removeprefix("restore_idx_")) except ValueError: await query.answer("Invalid backup selection") return backup_list = context.user_data.get("backup_list", []) if idx < 0 or idx >= len(backup_list): await query.answer("Backup not found") return backup_name = backup_list[idx] backup_path = os.path.join(BACKUP_DIR, backup_name) await query.answer() await safe_edit_message(query,f"⏳ Restoring from {html.escape(backup_name)}...") if not os.path.exists(backup_path): text = "❌ Backup file not found" else: # Simple restore: extract tar to overwrite configs code, _, stderr = await sh( "tar", "-xzf", backup_path, "-C", "/", timeout=60 ) if code == 0: # Restart services await sh("systemctl", "restart", TELEMT_SERVICE) text = f"✅ Restored from {html.escape(backup_name)}" else: text = f"❌ Restore failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # UPDATE & MODE/TEMPLATE CHANGE # ============================================================================ async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Update telemt by re-running the install script's update logic.""" query = update.callback_query await query.answer() await safe_edit_message(query,"⏳ Checking for telemt updates...") # Get current version cur_code, cur_out, _ = await sh("telemt", "--version") current = cur_out.strip() if cur_code == 0 else "unknown" # Check latest release from GitHub code, stdout, stderr = await sh( "curl", "-s", "--max-time", "10", "https://api.github.com/repos/telemt/telemt/releases/latest", ) if code != 0 or not stdout.strip(): text = "❌ Failed to check for updates" else: try: release = json.loads(stdout) latest = release.get("tag_name", "unknown") if latest == current: text = f"✅ telemt is already up to date ({html.escape(current)})" else: text = ( f"ℹ️ Update available: {html.escape(current)} → {html.escape(latest)}\n\n" f"Run the CLI installer to update:\n" f"sudo bash install.sh → menu item 10" ) except json.JSONDecodeError: text = "❌ Failed to parse release info" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Change mode or template.""" query = update.callback_query await query.answer() buttons = [ [InlineKeyboardButton("⚡ Switch to Lite Mode", callback_data="change_lite")], [InlineKeyboardButton("🛡 Switch to Pro Mode", callback_data="change_pro")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query, "Change mode or template:", reply_markup=keyboard ) async def cb_change_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Switch to lite mode — show domain selection.""" query = update.callback_query await query.answer() # Reuse the lite mode domain selection flow await cb_install_mode_lite(update, context) async def cb_change_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Switch to pro mode — show template categories.""" query = update.callback_query await query.answer() # Reuse the pro mode template selection flow await cb_install_mode_pro(update, context) async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Migrate from v1 (mtg Docker) to v2 (telemt).""" query = update.callback_query await query.answer() await safe_edit_message(query,"⏳ Migrating from v1...") # Stop old mtg container code, _, stderr = await sh("docker", "stop", "mtproto-proxy", timeout=30) if code != 0: code, _, stderr = await sh("docker", "stop", "mtg", timeout=30) # Remove old container await sh("docker", "rm", "mtproto-proxy", timeout=15) await sh("docker", "rm", "mtg", timeout=15) text = ( "✅ v1 container stopped and removed\n\n" "Now select installation mode for v2:" ) keyboard = get_install_mode_menu() await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # WEBSITE & SSL # ============================================================================ async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Website and SSL management.""" query = update.callback_query await query.answer() buttons = [ [InlineKeyboardButton("🔄 Renew SSL Certificate", callback_data="ssl_renew")], [InlineKeyboardButton("📊 SSL Status", callback_data="ssl_status")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query, "Website & SSL Management:", reply_markup=keyboard ) async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Renew SSL certificate.""" query = update.callback_query await query.answer() await safe_edit_message(query,"⏳ Renewing SSL certificate...") code, stdout, stderr = await sh("certbot", "renew", timeout=120) if code == 0: text = "✅ SSL certificate renewed successfully" else: text = f"❌ Renewal failed:\n{html.escape(stderr[:500])}" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("« Back", callback_data="menu_website")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Show SSL status.""" query = update.callback_query await query.answer() code, stdout, _ = await sh("certbot", "certificates") if code == 0: text = f"📊 SSL Certificates\n\n
{html.escape(stdout[:1000])}
" else: text = "❌ Failed to get SSL status" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton("« Back", callback_data="menu_website")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # ADMIN MANAGEMENT # ============================================================================ async def cb_menu_admins(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Показать список админов и кнопки управления.""" query = update.callback_query await query.answer() if ALLOWED_IDS: ids_list = "\n".join(f" • {uid}" for uid in sorted(ALLOWED_IDS)) text = f"👤 Администраторы\n\n{ids_list}\n" else: text = "👤 Администраторы\n\nСписок пуст — доступ для всех\n" text += ( f"\nВсего: {len(ALLOWED_IDS)}\n\n" "Чтобы добавить — перешлите любое сообщение от нового админа, " "или отправьте команду:\n" "/addadmin 123456789\n\n" "Чтобы удалить:\n" "/deladmin 123456789" ) keyboard = InlineKeyboardMarkup([ [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ]) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML") async def cmd_addadmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """/addadmin ID [ID2 ID3 ...] — добавить админа вручную.""" if not is_user_allowed(update.effective_user.id): await update.message.reply_text( f"⛔ Доступ запрещён.\nВаш ID: {update.effective_user.id}", parse_mode="HTML", ) return args = context.args or [] if not args: await update.message.reply_text( "Использование: /addadmin ID [ID2 ID3 ...]\n" "Пример: /addadmin 123456789 987654321", parse_mode="HTML", ) return added = [] errors = [] for a in args: a = a.strip().replace(",", "") if not a: continue try: uid = int(a) add_admin(uid) added.append(str(uid)) except ValueError: errors.append(a) parts = [] if added: parts.append(f"✅ Добавлены: {', '.join(added)}") if errors: parts.append(f"❌ Ошибки: {', '.join(errors)}") await update.message.reply_text("\n".join(parts), parse_mode="HTML") async def cmd_deladmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """/deladmin ID — удалить админа.""" if not is_user_allowed(update.effective_user.id): await update.message.reply_text( f"⛔ Доступ запрещён.\nВаш ID: {update.effective_user.id}", parse_mode="HTML", ) return args = context.args or [] if not args: await update.message.reply_text( "Использование: /deladmin ID", parse_mode="HTML", ) return removed = [] for a in args: a = a.strip().replace(",", "") try: uid = int(a) if uid == update.effective_user.id: await update.message.reply_text("⚠️ Нельзя удалить себя!") continue if uid in ALLOWED_IDS: remove_admin(uid) removed.append(str(uid)) else: await update.message.reply_text(f"ID {uid} не найден в списке") except ValueError: await update.message.reply_text(f"❌ Некорректный ID: {html.escape(a)}") if removed: await update.message.reply_text(f"✅ Удалены: {', '.join(removed)}") # ============================================================================ # PROMO & CREDITS # ============================================================================ def get_promo_text() -> str: """Return promo text with 2 hosters + donate.""" return ( "💰 Хостинг #1 — скидка до 60%\n" f"{PROMO_LINK_1}\n\n" "Промокоды:\n" " OFF60 — 60% на первый месяц\n" " antenka20 — 20% + 3% за 3 мес\n" " antenka6 — 15% + 5% за 6 мес\n\n" "━━━━━━━━━━━━━━━━━━━━━━━━━\n\n" "💰 Хостинг #2 — скидка до 60%\n" f"{PROMO_LINK_2}\n\n" "Промокод:\n" " OFF60 — 60% на первый месяц\n\n" "━━━━━━━━━━━━━━━━━━━━━━━━━\n\n" "☕ Донат / Чаевые\n" f"{TIP_LINK}" ) def should_show_promo_bot() -> bool: """Check if promo should be shown (once per 24h).""" try: if not os.path.exists(PROMO_STAMP_FILE): return True with open(PROMO_STAMP_FILE, "r") as f: last_ts = int(f.read().strip()) return (int(time.time()) - last_ts) >= 86400 except (ValueError, OSError): return True def mark_promo_shown_bot() -> None: """Mark promo as shown.""" try: os.makedirs(os.path.dirname(PROMO_STAMP_FILE), exist_ok=True) with open(PROMO_STAMP_FILE, "w") as f: f.write(str(int(time.time()))) except OSError: pass async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Promo information — shown as a separate ephemeral message that auto-deletes after 30s so it does not clutter the chat. The main menu message stays intact (we don't edit it in place).""" query = update.callback_query await query.answer() promo_msg = await query.message.reply_text( get_promo_text(), parse_mode="HTML", disable_web_page_preview=True ) asyncio.create_task(_delete_message_after(promo_msg, 30)) async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Credits and acknowledgements.""" query = update.callback_query await query.answer() text = ( f"ℹ️ Credits & Acknowledgements\n\n" f"GoTelegram v{GOTELEGRAM_VERSION}\n\n" f"Built with love for the Telegram community\n\n" f"Special thanks to:\n\n" f"🙏 telemt - MTProxy engine\n" f" High-performance proxy core\n\n" f"🎨 HTML5UP - Beautiful web templates\n" f" Responsive design & themes\n\n" f"📚 Learning Zone - Educational resources\n" f" Community learning support\n\n" f"🚀 Start Bootstrap - Bootstrap templates\n" f" Professional design framework\n\n" f"💬 Community - Your feedback & support\n\n" f"GoTelegram is open-source and community-driven" ) keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # REMOVE # ============================================================================ async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Remove installation.""" query = update.callback_query await query.answer() text = ( "⚠️ Remove GoTelegram\n\n" "This will completely remove the installation.\n" "Are you sure?" ) buttons = [ [InlineKeyboardButton("❌ Yes, Remove", callback_data="remove_confirm")], [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")], ] keyboard = InlineKeyboardMarkup(buttons) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Confirm removal.""" query = update.callback_query await query.answer() await safe_edit_message(query,"⏳ Removing GoTelegram...") # Stop service await sh("systemctl", "stop", TELEMT_SERVICE) # Remove directories for path in ["/opt/gotelegram", WEBSITE_ROOT]: await sh("rm", "-rf", path) text = "✅ GoTelegram removed successfully" keyboard = InlineKeyboardMarkup( [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]] ) await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML") # ============================================================================ # CALLBACK ROUTING # ============================================================================ async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Route all callbacks.""" query = update.callback_query data = query.data # ── Авто-регистрация админа (до проверки доступа!) ── if data.startswith("admin_confirm_"): await query.answer() try: new_admin_id = int(data.split("_")[-1]) except (ValueError, IndexError): await safe_edit_message(query, "❌ Ошибка: некорректный ID") return # Безопасность: только тот кто нажал кнопку может стать админом if update.effective_user.id != new_admin_id: await query.answer("Эта кнопка не для вас", show_alert=True) return # Race condition: если кто-то уже стал админом if not _WAITING_FOR_ADMIN: await safe_edit_message(query, "ℹ️ Администратор уже назначен.") return add_admin(new_admin_id) await safe_edit_message( query, f"✅ Вы назначены администратором!\n\n" f"ID: {new_admin_id}\n\n" f"Нажмите /start чтобы открыть меню.", parse_mode="HTML", ) return if data == "admin_cancel": await query.answer() await safe_edit_message( query, "👋 Ок. Напишите /start когда будете готовы.", ) return # Access control if not is_user_allowed(update.effective_user.id): await query.answer("Доступ запрещён") return user_id = update.effective_user.id # Main menu if data == "menu_main": await query.answer() buttons = get_main_menu(user_id) text = ( f"{_tf(user_id, 'welcome_title', GOTELEGRAM_VERSION)}\n\n" f"{_t(user_id, 'welcome_subtitle')}\n" f"{_t(user_id, 'welcome_prompt')}" ) await safe_edit_message(query, text, reply_markup=buttons, parse_mode="HTML") return if data == "close_menu": await query.answer() await query.delete_message() return # Language picker if data == "menu_lang": await query.answer() current = get_user_lang(user_id) title = _t(user_id, "lang_title") curr_line = _tf(user_id, "lang_current", get_language_name(current)) prompt = _t(user_id, "lang_choose") text = f"{title}\n\n{curr_line}\n\n{prompt}" keyboard = InlineKeyboardMarkup([ [ InlineKeyboardButton("🇬🇧 English", callback_data="lang_set_en"), InlineKeyboardButton("🇷🇺 Русский", callback_data="lang_set_ru"), ], [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_main")], ]) await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML") return if data.startswith("lang_set_"): code = data.replace("lang_set_", "", 1) if code in SUPPORTED_LANGS: set_user_lang(user_id, code) await query.answer(_tf(user_id, "lang_saved", get_language_name(code))) # Re-render main menu in the new language buttons = get_main_menu(user_id) text = ( f"{_tf(user_id, 'welcome_title', GOTELEGRAM_VERSION)}\n\n" f"{_t(user_id, 'welcome_subtitle')}\n" f"{_t(user_id, 'welcome_prompt')}" ) await safe_edit_message(query, text, reply_markup=buttons, parse_mode="HTML") else: await query.answer("Unsupported language") return # Dispatch to handlers handlers = { "menu_install": cb_menu_install, "menu_status": cb_menu_status, "menu_link": cb_menu_link, "menu_share": cb_menu_share, "menu_restart": cb_menu_restart, "menu_logs": cb_menu_logs, "menu_backup": cb_menu_backup, "menu_restore": cb_menu_restore, "menu_update": cb_menu_update, "menu_change": cb_menu_change, "menu_website": cb_menu_website, "menu_promo": cb_menu_promo, "menu_credits": cb_menu_credits, "menu_admins": cb_menu_admins, "menu_remove": cb_menu_remove, "install_mode_lite": cb_install_mode_lite, "install_mode_pro": cb_install_mode_pro, "backup_create": cb_backup_create, "backup_list": cb_backup_list, "ssl_renew": cb_ssl_renew, "ssl_status": cb_ssl_status, "remove_confirm": cb_remove_confirm, "change_lite": cb_change_lite, "change_pro": cb_change_pro, "install_migrate": cb_install_migrate, "menu_stats": cb_menu_stats, } # Custom git template URL prompt if data == "pro_custom_git": await query.answer() _CUSTOM_GIT_WAITERS[user_id] = True title = _t(user_id, "cg_title") body = _t(user_id, "cg_ask_url") await safe_edit_message( query, f"{title}\n\n{body}", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton(_t(user_id, "btn_cancel"), callback_data="menu_main")]] ), parse_mode="HTML", ) return # Pattern-based handlers if data.startswith("lite_dom_"): await cb_lite_domain(update, context) elif data.startswith("pro_cat_"): await cb_pro_category(update, context) elif data.startswith("pro_tpl_"): await cb_pro_template(update, context) elif data.startswith("pro_confirm_"): await cb_pro_confirm(update, context) elif data.startswith("restore_idx_"): await cb_restore_backup(update, context) elif data in handlers: await handlers[data](update, context) else: await query.answer("Unknown action") # ============================================================================ # ERROR HANDLERS # ============================================================================ async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle free-text input. Currently used for custom git template URLs.""" if update.message is None or update.message.text is None: return if not is_user_allowed(update.effective_user.id): return user_id = update.effective_user.id # Only act when we're explicitly waiting for a custom-git URL if not _CUSTOM_GIT_WAITERS.pop(user_id, False): return url = update.message.text.strip() if not _validate_custom_git_url(url): await update.message.reply_text(_t(user_id, "cg_invalid"), parse_mode="HTML") return await update.message.reply_text(_tf(user_id, "cg_cloning", html.escape(url)), parse_mode="HTML") ok, tpl_id, info = await _download_custom_git_template(url) if not ok: await update.message.reply_text(_t(user_id, info), parse_mode="HTML") return # Success — record in GoTelegram config. Use "template_id" (canonical # field name written by install.sh/save_gotelegram_config). config = load_json(GOTELEGRAM_CONFIG) or {} config["template_id"] = tpl_id config["template_source"] = url save_json(GOTELEGRAM_CONFIG, config) await update.message.reply_text( _tf(user_id, "cg_ok_fmt", html.escape(tpl_id)), reply_markup=get_main_menu(user_id), parse_mode="HTML", ) async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Log errors caused by Updates.""" logger.error(f"Exception while handling an update:", exc_info=context.error) # ============================================================================ # MAIN APPLICATION # ============================================================================ def main() -> None: """Start the bot.""" if not BOT_TOKEN: logger.error("BOT_TOKEN not set in .env file") return # Create the Application application = Application.builder().token(BOT_TOKEN).build() # Command handlers application.add_handler(CommandHandler("start", cmd_start)) application.add_handler(CommandHandler("help", cmd_help)) application.add_handler(CommandHandler("status", cmd_status)) application.add_handler(CommandHandler("logs", cmd_logs)) application.add_handler(CommandHandler("lang", cmd_lang)) application.add_handler(CommandHandler("addadmin", cmd_addadmin)) application.add_handler(CommandHandler("deladmin", cmd_deladmin)) # Callback query handler (buttons) application.add_handler(CallbackQueryHandler(handle_callback)) # Text message handler (for custom git URL input) application.add_handler(MessageHandler( filters.TEXT & ~filters.COMMAND, handle_text_message )) # Error handler application.add_error_handler(error_handler) # Run the bot logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...") application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()