mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 15:36:03 +00:00
1398 lines
47 KiB
Python
1398 lines
47 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
GoTelegram v2.2 Bot - MTProxy Management for Linux
|
||
Manages telemt engine via Telegram interface with full CLI feature parity
|
||
Uses python-telegram-bot v21+
|
||
"""
|
||
|
||
import asyncio
|
||
import html
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import toml
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Tuple, Optional, List, Dict, Any
|
||
|
||
from dotenv import load_dotenv
|
||
from telegram import (
|
||
Update,
|
||
InlineKeyboardButton,
|
||
InlineKeyboardMarkup,
|
||
InputFile,
|
||
)
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
CallbackQueryHandler,
|
||
ContextTypes,
|
||
filters,
|
||
)
|
||
from telegram.error import TelegramError, BadRequest
|
||
|
||
# Load environment variables (python-dotenv); BOT_TOKEN and ALLOWED_IDS are
# read from the environment further down in this module.
load_dotenv()

# Logging configuration: timestamp, logger name, level and message, at INFO
# level and above.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the helpers and handlers in this file.
logger = logging.getLogger(__name__)
|
||
|
||
# ============================================================================
|
||
# CONFIGURATION
|
||
# ============================================================================
|
||
|
||
GOTELEGRAM_VERSION = "2.2.0"
|
||
GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json"
|
||
TELEMT_CONFIG = "/etc/telemt/config.toml"
|
||
TELEMT_SERVICE = "telemt"
|
||
WEBSITE_ROOT = "/var/www/gotelegram-site"
|
||
BACKUP_DIR = "/opt/gotelegram/backups"
|
||
TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json"
|
||
|
||
PROMO_LINK = "https://vk.cc/ct29NQ"
|
||
TIP_LINK = "https://pay.cloudtips.ru/p/7410814f"
|
||
|
||
BOT_TOKEN = os.getenv("BOT_TOKEN")
|
||
ALLOWED_IDS_STR = os.getenv("ALLOWED_IDS", "")
|
||
ALLOWED_IDS: set = set()
|
||
for _id_str in ALLOWED_IDS_STR.split(","):
|
||
_id_str = _id_str.strip()
|
||
if _id_str:
|
||
try:
|
||
ALLOWED_IDS.add(int(_id_str))
|
||
except ValueError:
|
||
logging.warning(f"Invalid ALLOWED_IDS entry: {_id_str}")
|
||
|
||
QUICK_DOMAINS = [
|
||
"google.com",
|
||
"microsoft.com",
|
||
"cloudflare.com",
|
||
"apple.com",
|
||
"amazon.com",
|
||
"github.com",
|
||
"stackoverflow.com",
|
||
"medium.com",
|
||
"wikipedia.org",
|
||
"coursera.org",
|
||
"udemy.com",
|
||
"habr.com",
|
||
"stepik.org",
|
||
"duolingo.com",
|
||
"khanacademy.org",
|
||
"bbc.com",
|
||
"reuters.com",
|
||
"nytimes.com",
|
||
"ted.com",
|
||
"zoom.us",
|
||
]
|
||
|
||
# ============================================================================
|
||
# UTILITY FUNCTIONS
|
||
# ============================================================================
|
||
|
||
|
||
async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]:
    """Run an external command without a shell and capture its output.

    Args:
        *args: Program name followed by its arguments.
        timeout: Seconds to wait for the command to finish.

    Returns:
        ``(return_code, stdout, stderr)``. The return code is ``-1`` when the
        command times out or cannot be started; in that case stdout is empty
        and stderr carries the reason.
    """
    pipe = asyncio.subprocess.PIPE
    try:
        child = await asyncio.create_subprocess_exec(
            *args, stdout=pipe, stderr=pipe
        )
        out_bytes, err_bytes = await asyncio.wait_for(
            child.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Best-effort cleanup of the stuck child; failures here are ignored.
        try:
            child.kill()
            await child.wait()
        except Exception:
            pass
        return (-1, "", f"Command timeout after {timeout}s")
    except Exception as exc:
        return (-1, "", str(exc))

    return (
        child.returncode,
        out_bytes.decode("utf-8", errors="replace"),
        err_bytes.decode("utf-8", errors="replace"),
    )
|
||
|
||
|
||
def load_json(path: str) -> Optional[Dict]:
    """Load and parse a JSON file.

    The file is always decoded as UTF-8 rather than with the process locale.

    Args:
        path: Path of the JSON file to read.

    Returns:
        The parsed document, or ``None`` if the file cannot be read or
        parsed (the failure is logged as a warning).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        logger.warning(f"Failed to load {path}: {e}")
        return None
|
||
|
||
|
||
def load_toml(path: str) -> Optional[Dict]:
    """Read a TOML file and return its parsed contents.

    Returns ``None`` (after logging a warning) when the file cannot be
    opened or parsed.
    """
    parsed: Optional[Dict] = None
    try:
        with open(path, "r") as handle:
            parsed = toml.load(handle)
    except Exception as exc:
        logger.warning(f"Failed to load {path}: {exc}")
    return parsed
|
||
|
||
|
||
def save_json(path: str, data: Dict) -> bool:
    """Serialize ``data`` as indented JSON and write it to ``path``.

    Missing parent directories are created. A bare filename with no
    directory component is written to the current working directory.

    Args:
        path: Destination file path.
        data: JSON-serializable mapping to store.

    Returns:
        ``True`` on success, ``False`` if serialization or writing failed
        (the failure is logged as an error).
    """
    try:
        # Serialize first so a non-serializable value cannot truncate an
        # existing file.
        payload = json.dumps(data, indent=2)
        parent = os.path.dirname(path)
        if parent:  # os.makedirs("") raises FileNotFoundError
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
        return True
    except Exception as e:
        logger.error(f"Failed to save {path}: {e}")
        return False
|
||
|
||
|
||
async def safe_edit_message(query, text: str, reply_markup=None, parse_mode=None) -> bool:
    """Edit the message behind a callback query, tolerating benign failures.

    Returns:
        ``True`` if the message was edited or already had this content,
        ``False`` if it can no longer be edited (the reason is logged).

    Raises:
        BadRequest: For any other Telegram "bad request" error.
    """
    try:
        await query.edit_message_text(
            text, reply_markup=reply_markup, parse_mode=parse_mode
        )
    except BadRequest as exc:
        reason = str(exc).lower()
        if "message is not modified" in reason:
            # Content is already up to date; treat as success.
            return True
        gone_markers = ("message to edit not found", "message can't be edited")
        if any(marker in reason for marker in gone_markers):
            logger.warning(f"Cannot edit message: {exc}")
            return False
        raise  # Unexpected BadRequest: let the caller see it.
    return True
|
||
|
||
|
||
async def check_service_status(service: str) -> bool:
    """Return True when ``systemctl is-active <service>`` exits with status 0."""
    result = await sh("systemctl", "is-active", service)
    exit_code = result[0]
    return exit_code == 0
|
||
|
||
|
||
async def get_telemt_version() -> str:
    """Return the telemt version reported by ``telemt -v``.

    The version is taken as the last whitespace-separated token of the
    command's stdout.

    Returns:
        The version token, or ``"unknown"`` when the command fails or prints
        nothing but whitespace.
    """
    code, stdout, _ = await sh("telemt", "-v")
    if code != 0:
        return "unknown"
    # split() on whitespace-only output yields [], so guard before indexing.
    tokens = stdout.split()
    return tokens[-1] if tokens else "unknown"
|
||
|
||
|
||
def is_docker_running() -> bool:
    """Check whether the Docker daemon is reachable.

    Runs ``docker ps`` with output discarded and a 5 second timeout.

    Returns:
        ``True`` only if the command exits with status 0. ``False`` if the
        ``docker`` binary is missing, the call times out, or it exits
        non-zero.
    """
    # NOTE(review): this is a blocking call (up to 5 s) that is invoked from
    # async handlers via check_old_container().
    try:
        result = subprocess.run(
            ["docker", "ps"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0
|
||
|
||
|
||
async def check_old_container() -> Optional[str]:
    """Detect a leftover v1 ``mtg`` Docker container.

    Returns:
        ``"mtg"`` when Docker is available and the output of
        ``docker ps -a`` (names only) contains the substring ``mtg``;
        otherwise ``None``.
    """
    if is_docker_running():
        code, names, _ = await sh(
            "docker", "ps", "-a", "--format", "{{.Names}}"
        )
        # Substring test over the whole listing, not an exact name match.
        if code == 0 and "mtg" in names:
            return "mtg"
    return None
|
||
|
||
|
||
# ============================================================================
|
||
# ACCESS CONTROL
|
||
# ============================================================================
|
||
|
||
|
||
def is_user_allowed(user_id: int) -> bool:
    """Tell whether ``user_id`` may use the bot.

    An empty ``ALLOWED_IDS`` set means no restriction is configured, so
    every user is accepted.
    """
    return not ALLOWED_IDS or user_id in ALLOWED_IDS
|
||
|
||
|
||
async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Check that the sender of ``update`` is authorized.

    Works for any update type: the denial is sent as a reply to
    ``update.effective_message`` when one exists. Updates without an
    identifiable user are rejected.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).

    Returns:
        ``True`` if the user may proceed, ``False`` otherwise. A rejection
        is logged as a warning.
    """
    user = update.effective_user
    if user is not None and is_user_allowed(user.id):
        return True

    user_id = user.id if user is not None else None
    # Log first so the attempt is recorded even if the reply fails.
    logger.warning(f"Unauthorized access attempt from user {user_id}")

    # update.message is None for callback queries and edited messages;
    # effective_message covers those cases.
    message = update.effective_message
    if message is not None:
        await message.reply_text(f"Access denied. Your ID: {user_id}")
    return False
|
||
|
||
|
||
# ============================================================================
|
||
# MAIN MENU
|
||
# ============================================================================
|
||
|
||
|
||
def get_main_menu() -> InlineKeyboardMarkup:
    """Build the main-menu inline keyboard.

    Seven rows of two buttons followed by a single "Close" row; each button
    carries the callback data of the corresponding menu action.
    """
    layout = [
        [("⚙️ Install", "menu_install"), ("📊 Status", "menu_status")],
        [("🔗 Link", "menu_link"), ("📤 Share", "menu_share")],
        [("🔄 Restart", "menu_restart"), ("📋 Logs", "menu_logs")],
        [("⚡ Change Mode/Template", "menu_change"), ("💾 Backup", "menu_backup")],
        [("↩️ Restore", "menu_restore"), ("📡 Update telemt", "menu_update")],
        [("🌐 Website/SSL", "menu_website"), ("🎁 Promo", "menu_promo")],
        [("🗑️ Remove", "menu_remove"), ("ℹ️ Credits", "menu_credits")],
        [("❌ Close", "close_menu")],
    ]
    rows = []
    for row_spec in layout:
        rows.append(
            [
                InlineKeyboardButton(label, callback_data=action)
                for label, action in row_spec
            ]
        )
    return InlineKeyboardMarkup(rows)
|
||
|
||
|
||
# ============================================================================
|
||
# COMMANDS
|
||
# ============================================================================
|
||
|
||
|
||
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet the user and show the main menu."""
    authorized = await require_auth(update, context)
    if not authorized:
        return

    greeting_lines = [
        f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>",
        "",
        "🤖 MTProxy Management Bot",
        "Powered by telemt engine",
        "",
        "Select an action from the menu below:",
    ]
    await update.message.reply_text(
        "\n".join(greeting_lines),
        reply_markup=get_main_menu(),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: list the slash commands the bot understands."""
    authorized = await require_auth(update, context)
    if not authorized:
        return

    help_lines = [
        "<b>GoTelegram Bot Commands</b>",
        "",
        "/start - Show main menu",
        "/help - Show this help message",
        "/status - Quick status check",
        "/logs - Show recent logs",
        "",
        "Use the inline menu for all other operations.",
    ]
    await update.message.reply_text("\n".join(help_lines), parse_mode="HTML")
|
||
|
||
|
||
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /status: send a progress note, then the full status report."""
    authorized = await require_auth(update, context)
    if not authorized:
        return

    chat_message = update.message
    await chat_message.reply_text("⏳ Checking status...", parse_mode="HTML")
    report = await get_status_text()
    await chat_message.reply_text(report, parse_mode="HTML")
|
||
|
||
|
||
async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /logs: reply with the tail of the telemt service journal.

    Requests the last 20 journal lines and sends at most the final 1500
    characters, HTML-escaped inside a ``<pre>`` block.
    """
    authorized = await require_auth(update, context)
    if not authorized:
        return

    exit_code, journal, _ = await sh(
        "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager"
    )
    if exit_code != 0:
        await update.message.reply_text("Failed to retrieve logs")
        return

    # A negative slice keeps the whole string when it is shorter than 1500.
    tail = journal[-1500:]
    await update.message.reply_text(
        f"<pre>{html.escape(tail)}</pre>",
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# STATUS
|
||
# ============================================================================
|
||
|
||
|
||
async def get_status_text() -> str:
    """Assemble the HTML status report shown by /status and the Status menu.

    The report covers, in order: service state, telemt version, values from
    the GoTelegram JSON config (mode, template, domain, port), the telemt
    listen port, the number of backup archives, and a migration hint when
    an old container is detected.
    """
    report = ["<b>📊 Current Status</b>\n"]

    # Service state
    running = await check_service_status(TELEMT_SERVICE)
    state_label = "✅ Running" if running else "❌ Stopped"
    report.append(f"<b>Service:</b> {state_label}")

    # Engine version
    telemt_version = await get_telemt_version()
    report.append(f"<b>Telemt:</b> v{telemt_version}")

    # GoTelegram configuration
    settings = load_json(GOTELEGRAM_CONFIG)
    if settings:
        mode = html.escape(str(settings.get("mode", "unknown")))
        report.append(f"<b>Mode:</b> {mode}")
        optional_fields = (
            ("template", "Template"),
            ("domain", "Domain"),
            ("port", "Port"),
        )
        for key, label in optional_fields:
            if key in settings:
                value = html.escape(str(settings[key]))
                report.append(f"<b>{label}:</b> {value}")

    # telemt configuration
    engine_cfg = load_toml(TELEMT_CONFIG)
    if engine_cfg:
        section = engine_cfg.get("config", {})
        if "listen_port" in section:
            report.append(f"<b>Listen Port:</b> {section['listen_port']}")

    # Backup archives (plain and encrypted); errors leave the count at 0.
    backup_count = 0
    try:
        if os.path.exists(BACKUP_DIR):
            backup_count = sum(
                1
                for name in os.listdir(BACKUP_DIR)
                if name.endswith((".tar.gz", ".tar.gz.enc"))
            )
    except Exception:
        pass
    report.append(f"<b>Backups:</b> {backup_count}")

    # Leftover v1 container
    legacy = await check_old_container()
    if legacy:
        report.append(
            f"\n⚠️ <b>Found old container:</b> {html.escape(legacy)}"
        )
        report.append("Run 'Install' to migrate")

    return "\n".join(report)
|
||
|
||
|
||
async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Status menu callback: show a progress note, then the status report."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Checking status...")

    report = await get_status_text()
    back_button = InlineKeyboardButton("« Back", callback_data="menu_main")
    await safe_edit_message(
        query,
        report,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# INSTALL
|
||
# ============================================================================
|
||
|
||
|
||
def get_install_mode_menu() -> InlineKeyboardMarkup:
    """Build the keyboard for choosing between quick and stealth install."""
    choices = (
        ("⚡ Quick Mode", "install_mode_quick"),
        ("🔒 Stealth Mode", "install_mode_stealth"),
        ("« Back", "menu_main"),
    )
    # One button per row.
    return InlineKeyboardMarkup(
        [[InlineKeyboardButton(label, callback_data=action)] for label, action in choices]
    )
|
||
|
||
|
||
async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Install menu callback.

    Offers migration or a fresh install when an old container is detected;
    otherwise shows the install-mode selection keyboard.
    """
    query = update.callback_query
    await query.answer()

    legacy = await check_old_container()
    if not legacy:
        text = "Select installation mode:"
        keyboard = get_install_mode_menu()
    else:
        text = "\n".join(
            [
                "⚠️ <b>Migration from v1 detected</b>",
                "",
                f"Found Docker container: {html.escape(legacy)}",
                "",
                "Would you like to:",
                "1. Migrate from v1 (recommended)",
                "2. Fresh install (will remove old container)",
                "",
                "<i>Select below or choose install mode</i>",
            ]
        )
        options = (
            ("🔄 Migrate from v1", "install_migrate"),
            ("✨ Fresh Install", "install_mode_quick"),
            ("« Back", "menu_main"),
        )
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton(label, callback_data=action)] for label, action in options]
        )

    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_install_mode_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Quick mode: present the predefined domains as a button grid.

    Domains are laid out two per row; each button's callback data encodes
    the domain's index in ``QUICK_DOMAINS``.
    """
    query = update.callback_query
    await query.answer()

    per_row = 2
    rows = []
    for start in range(0, len(QUICK_DOMAINS), per_row):
        chunk = QUICK_DOMAINS[start:start + per_row]
        rows.append(
            [
                InlineKeyboardButton(name, callback_data=f"quick_dom_{start + offset}")
                for offset, name in enumerate(chunk)
            ]
        )
    rows.append([InlineKeyboardButton("« Back", callback_data="menu_install")])

    await safe_edit_message(
        query,
        "Select a domain for quick mode:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
async def cb_quick_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Quick-mode domain callback: record the chosen domain in the config.

    The callback data ends in the index of the domain within
    ``QUICK_DOMAINS``. Indices that are not integers, are negative, or are
    out of range are rejected with an "Invalid domain selection" answer.
    On success a fresh quick-mode configuration (domain, port 443,
    timestamp) is written to ``GOTELEGRAM_CONFIG``.
    """
    query = update.callback_query
    data = query.data
    try:
        domain_idx = int(data.split("_")[-1])
        # Negative values would silently index from the end of the list.
        if domain_idx < 0:
            raise IndexError(domain_idx)
        domain = QUICK_DOMAINS[domain_idx]
    except (ValueError, IndexError):
        await query.answer("Invalid domain selection")
        return

    await query.answer()
    # Sent without parse_mode, so no HTML escaping here.
    await safe_edit_message(query, f"⏳ Installing with domain: {domain}...")

    # Simulate installation (in real scenario, call install script)
    config = {
        "mode": "quick",
        "domain": domain,
        "port": 443,
        "installed_at": datetime.now().isoformat(),
    }

    if save_json(GOTELEGRAM_CONFIG, config):
        text = (
            f"✅ <b>Quick mode installed!</b>\n\n"
            f"<b>Domain:</b> {html.escape(domain)}\n"
            f"<b>Mode:</b> Quick\n\n"
            f"Service starting... Check status in 10 seconds."
        )
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
        )
        await safe_edit_message(
            query, text, reply_markup=keyboard, parse_mode="HTML"
        )
    else:
        await safe_edit_message(
            query,
            "❌ Failed to save configuration",
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("« Back", callback_data="menu_install")]]
            ),
        )
|
||
|
||
|
||
async def cb_install_mode_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Stealth mode: list the template categories from the catalog file.

    Shows an error with a Back button when the catalog is missing or has no
    ``categories`` key.
    """
    query = update.callback_query
    await query.answer()

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_install")]

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog or "categories" not in catalog:
        await safe_edit_message(
            query,
            "❌ Template catalog not found",
            reply_markup=InlineKeyboardMarkup([back_row]),
        )
        return

    rows = [
        [
            InlineKeyboardButton(
                f"📁 {entry['name']}", callback_data=f"stealth_cat_{entry['id']}"
            )
        ]
        for entry in catalog.get("categories", [])
    ]
    rows.append(back_row)

    await safe_edit_message(
        query,
        "Stealth Mode - Select Template Category:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
async def cb_stealth_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List the templates of the category named in the callback data."""
    query = update.callback_query
    wanted_id = query.data.removeprefix("stealth_cat_")

    await query.answer()

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog:
        await safe_edit_message(query, "❌ Template catalog not found")
        return

    # First category whose id matches, or None.
    category = next(
        (entry for entry in catalog.get("categories", []) if entry["id"] == wanted_id),
        None,
    )
    if not category:
        await safe_edit_message(query, "❌ Category not found")
        return

    rows = [
        [
            InlineKeyboardButton(
                f"🎨 {item['name']}", callback_data=f"stealth_tpl_{item['id']}"
            )
        ]
        for item in category.get("templates", [])
    ]
    rows.append([InlineKeyboardButton("« Back", callback_data="install_mode_stealth")])

    heading = f"Select template from <b>{html.escape(category['name'])}</b>:"
    await safe_edit_message(
        query, heading, reply_markup=InlineKeyboardMarkup(rows), parse_mode="HTML"
    )
|
||
|
||
|
||
async def cb_stealth_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show a template's details and ask the user to confirm installation.

    The template id comes from the callback data and is looked up across
    all categories of the catalog.
    """
    query = update.callback_query
    tpl_id = query.data.removeprefix("stealth_tpl_")

    await query.answer()

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog:
        await safe_edit_message(query, "❌ Template catalog not found")
        return

    # First template with a matching id, searching categories in order.
    template = next(
        (
            item
            for group in catalog.get("categories", [])
            for item in group.get("templates", [])
            if item["id"] == tpl_id
        ),
        None,
    )
    if not template:
        await safe_edit_message(query, "❌ Template not found")
        return

    name = html.escape(template.get("name", "Unknown"))
    description = html.escape(template.get("description", "N/A"))
    parts = [
        "<b>🎨 Template Preview</b>\n\n",
        f"<b>Name:</b> {name}\n",
        f"<b>Description:</b> {description}\n\n",
    ]
    if "preview_url" in template:
        href = html.escape(template["preview_url"], quote=True)
        parts.append(f'<a href="{href}">View Live Preview</a>\n\n')
    parts.append("Confirm installation?")

    keyboard = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("✅ Install", callback_data=f"stealth_confirm_{tpl_id}")],
            [InlineKeyboardButton("« Back", callback_data="install_mode_stealth")],
        ]
    )
    await safe_edit_message(
        query, "".join(parts), reply_markup=keyboard, parse_mode="HTML"
    )
|
||
|
||
|
||
async def cb_stealth_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Write a stealth-mode configuration for the confirmed template.

    The template id is taken from the callback data as-is and stored,
    together with port 443 and a timestamp, in ``GOTELEGRAM_CONFIG``.
    """
    query = update.callback_query
    tpl_id = query.data.removeprefix("stealth_confirm_")

    await query.answer()
    await safe_edit_message(query, "⏳ Installing template...")

    saved = save_json(
        GOTELEGRAM_CONFIG,
        {
            "mode": "stealth",
            "template": tpl_id,
            "port": 443,
            "installed_at": datetime.now().isoformat(),
        },
    )

    if not saved:
        await safe_edit_message(
            query,
            "❌ Failed to save configuration",
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton("« Back", callback_data="menu_install")]]
            ),
        )
        return

    summary = "".join(
        [
            "✅ <b>Stealth mode installed!</b>\n\n",
            f"<b>Template:</b> {html.escape(tpl_id)}\n",
            "<b>Mode:</b> Stealth\n\n",
            "Service starting... Check status in 10 seconds.",
        ]
    )
    await safe_edit_message(
        query,
        summary,
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
        ),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# PROXY LINK & SHARE
|
||
# ============================================================================
|
||
|
||
|
||
async def get_proxy_link() -> Optional[str]:
    """Build the ``tg://proxy`` link for the installed proxy.

    The secret is read from the GoTelegram JSON config; if absent there, the
    ``secret`` of the first entry in the telemt TOML ``users`` list is used.
    The server address is obtained from api.ipify.org, falling back to
    ``0.0.0.0`` when the lookup fails.

    Returns:
        The link, or ``None`` when no config or no secret is available.
    """
    settings = load_json(GOTELEGRAM_CONFIG)
    if not settings:
        return None

    secret = settings.get("secret", "")
    if not secret:
        # Fall back to the telemt configuration.
        engine_cfg = load_toml(TELEMT_CONFIG)
        if engine_cfg:
            users = engine_cfg.get("users", [])
            if isinstance(users, list) and users:
                secret = users[0].get("secret", "")
    if not secret:
        return None

    # Public IPv4 address of this server.
    exit_code, body, _ = await sh(
        "curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org"
    )
    address = body.strip()
    if exit_code != 0 or not address:
        address = "0.0.0.0"

    port = settings.get("port", 443)
    return f"tg://proxy?server={address}&port={port}&secret={secret}"
|
||
|
||
|
||
async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Link menu callback: display the proxy link, or a hint to install first."""
    query = update.callback_query
    await query.answer()

    link = await get_proxy_link()
    if link:
        text = "".join(
            [
                "<b>🔗 Proxy Link</b>\n\n",
                f"<code>{html.escape(link)}</code>\n\n",
                "Open in Telegram to connect.",
            ]
        )
    else:
        text = "❌ Proxy not installed yet. Run install first."

    back_button = InlineKeyboardButton("« Back", callback_data="menu_main")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Share menu callback: send the proxy link as a QR code.

    The QR image is produced with the ``qrencode`` command in a private
    temporary directory that is removed afterwards. If ``qrencode`` is
    unavailable or fails, or the photo cannot be sent, the link is shown as
    text with a Back button instead.
    """
    # Local import: tempfile is not among this module's top-level imports.
    import tempfile

    query = update.callback_query
    await query.answer()

    back_keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
    )

    link = await get_proxy_link()
    if not link:
        await safe_edit_message(
            query, "❌ Proxy not installed yet.", reply_markup=back_keyboard
        )
        return

    # A per-request private directory avoids a predictable, shared path in
    # /tmp for an image that encodes the proxy secret.
    with tempfile.TemporaryDirectory(prefix="gotelegram_qr_") as tmp_dir:
        qr_path = os.path.join(tmp_dir, "proxy_qr.png")
        # sh() returns -1 when qrencode is not installed.
        code, _, _ = await sh("qrencode", "-o", qr_path, link)
        if code == 0 and os.path.exists(qr_path):
            try:
                with open(qr_path, "rb") as f:
                    await query.message.reply_photo(
                        photo=f,
                        caption=f"<b>📤 Proxy QR Code</b>\n\n{html.escape(link)}",
                        parse_mode="HTML",
                    )
                return
            except Exception as e:
                logger.error(f"Failed to send QR code: {e}")

    # Text fallback.
    await safe_edit_message(
        query,
        f"<b>🔗 Proxy Link</b>\n\n<code>{html.escape(link)}</code>",
        reply_markup=back_keyboard,
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# RESTART & LOGS
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restart menu callback: restart the telemt service and report the result."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Restarting telemt service...")

    exit_code, _, err_output = await sh("systemctl", "restart", TELEMT_SERVICE)
    if exit_code != 0:
        # Show at most the first 500 characters of stderr.
        outcome = f"❌ Failed to restart:\n<code>{html.escape(err_output[:500])}</code>"
    else:
        outcome = "✅ Service restarted successfully"

    back_button = InlineKeyboardButton("« Back", callback_data="menu_main")
    await safe_edit_message(
        query,
        outcome,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Logs menu callback: show the tail of the telemt service journal.

    Requests the last 30 journal lines and displays at most the final 1000
    characters.
    """
    query = update.callback_query
    await query.answer()

    exit_code, journal, _ = await sh(
        "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager"
    )

    if exit_code != 0:
        text = "❌ Failed to retrieve logs"
    else:
        # A negative slice keeps the whole string when it is shorter.
        tail = journal[-1000:]
        text = f"<b>📋 Recent Logs</b>\n\n<pre>{html.escape(tail)}</pre>"

    back_button = InlineKeyboardButton("« Back", callback_data="menu_main")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# BACKUP & RESTORE
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Backup menu callback: show the backup count and available actions.

    The "List Backups" button is offered only when at least one archive
    exists in ``BACKUP_DIR``.
    """
    query = update.callback_query
    await query.answer()

    # Collect archive names; any filesystem error is treated as "none".
    archives = []
    try:
        if os.path.exists(BACKUP_DIR):
            archives = [
                name
                for name in os.listdir(BACKUP_DIR)
                if name.endswith((".tar.gz", ".tar.gz.enc"))
            ]
            archives.sort(reverse=True)
    except Exception:
        pass

    rows = [[InlineKeyboardButton("💾 Create Backup", callback_data="backup_create")]]
    if archives:
        rows.append([InlineKeyboardButton("📋 List Backups", callback_data="backup_list")])
    rows.append([InlineKeyboardButton("« Back", callback_data="menu_main")])

    await safe_edit_message(
        query,
        f"<b>💾 Backup Management</b>\n\nExisting backups: {len(archives)}",
        reply_markup=InlineKeyboardMarkup(rows),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Create a timestamped tar.gz archive of both configuration files.

    The archive is written to ``BACKUP_DIR`` as
    ``backup_<YYYYmmdd_HHMMSS>.tar.gz`` and contains ``GOTELEGRAM_CONFIG``
    and ``TELEMT_CONFIG``.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Creating backup...")

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_path = os.path.join(BACKUP_DIR, f"backup_{stamp}.tar.gz")

    os.makedirs(BACKUP_DIR, exist_ok=True)
    exit_code, _, err_output = await sh(
        "tar", "-czf", archive_path, GOTELEGRAM_CONFIG, TELEMT_CONFIG
    )

    if exit_code != 0:
        outcome = f"❌ Backup failed:\n<code>{html.escape(err_output[:500])}</code>"
    else:
        outcome = f"✅ Backup created:\n<code>{html.escape(archive_path)}</code>"

    back_button = InlineKeyboardButton("« Back", callback_data="menu_backup")
    await safe_edit_message(
        query,
        outcome,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List up to ten of the newest backup archives with their sizes.

    Archives are the ``.tar.gz`` / ``.tar.gz.enc`` files in ``BACKUP_DIR``,
    sorted by name in descending order. An archive that disappears between
    listing and size lookup is skipped.
    """
    query = update.callback_query
    await query.answer()

    backups = []
    try:
        if os.path.exists(BACKUP_DIR):
            backups = sorted(
                (
                    f
                    for f in os.listdir(BACKUP_DIR)
                    if f.endswith((".tar.gz", ".tar.gz.enc"))
                ),
                reverse=True,
            )
    except Exception:
        # Best effort: an unreadable directory is shown as "no backups".
        pass

    if not backups:
        text = "No backups found"
    else:
        entries = ["<b>📋 Available Backups</b>\n\n"]
        for backup in backups[:10]:
            path = os.path.join(BACKUP_DIR, backup)
            try:
                size = os.path.getsize(path) / (1024 * 1024)
            except OSError:
                # File was removed (or became unreadable) after listing.
                continue
            entries.append(f"<code>{html.escape(backup)}</code> ({size:.2f} MB)\n")
        text = "".join(entries)

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_backup")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restore menu callback: let the user pick one of the newest backups.

    Up to ten archive names are shown as buttons. The same list is stored in
    ``context.user_data["backup_list"]`` so the follow-up callback can map
    the button index back to a file name.
    """
    query = update.callback_query
    await query.answer()

    # Collect archive names; any filesystem error is treated as "none".
    archives = []
    try:
        if os.path.exists(BACKUP_DIR):
            archives = [
                name
                for name in os.listdir(BACKUP_DIR)
                if name.endswith((".tar.gz", ".tar.gz.enc"))
            ]
            archives.sort(reverse=True)
    except Exception:
        pass

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_main")]

    if archives:
        offered = archives[:10]
        text = "Select backup to restore:"
        rows = [
            [InlineKeyboardButton(name, callback_data=f"restore_idx_{position}")]
            for position, name in enumerate(offered)
        ]
        rows.append(back_row)
        keyboard = InlineKeyboardMarkup(rows)
        # Remember which names the indices refer to.
        context.user_data["backup_list"] = offered
    else:
        text = "❌ No backups available"
        keyboard = InlineKeyboardMarkup([back_row])

    await safe_edit_message(query, text, reply_markup=keyboard)
|
||
|
||
|
||
async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restore the backup selected from the restore menu.

    The callback data has the form ``restore_idx_<n>``; ``<n>`` indexes the
    list stored in ``context.user_data["backup_list"]``. The chosen archive is
    extracted over ``/`` with ``tar -xzf`` and, on success, the telemt service
    is restarted. The outcome (including a failed restart) is reported back in
    the edited message.
    """
    query = update.callback_query

    try:
        idx = int(query.data.removeprefix("restore_idx_"))
    except ValueError:
        await query.answer("Invalid backup selection")
        return

    backup_list = context.user_data.get("backup_list", [])
    if not 0 <= idx < len(backup_list):
        await query.answer("Backup not found")
        return

    backup_name = backup_list[idx]
    backup_path = os.path.join(BACKUP_DIR, backup_name)
    safe_name = html.escape(backup_name)

    await query.answer()
    # The name is HTML-escaped, so the message has to be rendered as HTML;
    # otherwise the entities would be shown literally.
    await safe_edit_message(
        query, f"⏳ Restoring from {safe_name}...", parse_mode="HTML"
    )

    if not os.path.exists(backup_path):
        text = "❌ Backup file not found"
    elif backup_name.endswith(".enc"):
        # tar cannot read an encrypted archive; fail with a clear message
        # instead of surfacing a confusing gzip/tar error.
        text = (
            "❌ Encrypted backup (.enc) cannot be restored from the bot — "
            "decrypt it first"
        )
    else:
        # Simple restore: extract tar to overwrite configs
        code, _, stderr = await sh(
            "tar", "-xzf", backup_path, "-C", "/", timeout=60
        )
        if code != 0:
            text = f"❌ Restore failed:\n<code>{html.escape(stderr[:500])}</code>"
        else:
            # Restart the service so it picks up the restored configuration,
            # and do not claim full success if the restart itself failed.
            restart_code, _, restart_err = await sh(
                "systemctl", "restart", TELEMT_SERVICE
            )
            if restart_code == 0:
                text = f"✅ Restored from {safe_name}"
            else:
                text = (
                    f"⚠️ Restored from {safe_name}, but restarting "
                    f"{html.escape(TELEMT_SERVICE)} failed:\n"
                    f"<code>{html.escape(restart_err[:500])}</code>"
                )

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
# ============================================================================
|
||
# UPDATE & MODE/TEMPLATE CHANGE
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Check whether a newer telemt release exists and report the result.

    Reads the installed version from ``telemt --version`` and the latest
    release tag from the GitHub releases API (via ``curl``). Nothing is
    installed here: when the tags differ the user is pointed at the CLI
    installer.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Checking for telemt updates...")

    # Get current version
    cur_code, cur_out, _ = await sh("telemt", "--version")
    current = cur_out.strip() if cur_code == 0 else "unknown"

    # Check latest release from GitHub
    code, stdout, _ = await sh(
        "curl", "-s", "--max-time", "10",
        "https://api.github.com/repos/telemt/telemt/releases/latest",
    )

    if code != 0 or not stdout.strip():
        text = "❌ Failed to check for updates"
    else:
        try:
            release = json.loads(stdout)
        except json.JSONDecodeError:
            release = None

        # A payload without a tag name (for example an API error object) or a
        # non-object body carries no usable version, so it is reported as a
        # parse failure rather than as "update available → unknown".
        latest = release.get("tag_name") if isinstance(release, dict) else None

        if not latest:
            text = "❌ Failed to parse release info"
        elif latest == current:
            # NOTE(review): exact string comparison — this only matches if
            # `telemt --version` prints precisely the release tag; confirm.
            text = f"✅ telemt is already up to date ({html.escape(current)})"
        else:
            text = (
                f"ℹ️ Update available: {html.escape(current)} → {html.escape(str(latest))}\n\n"
                f"Run the CLI installer to update:\n"
                f"<code>sudo bash install.sh</code> → menu item 10"
            )

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Present the mode-switch menu (quick / stealth / back)."""
    query = update.callback_query
    await query.answer()

    # (label, callback_data) pairs, one button per row.
    options = (
        ("⚡ Switch to Quick Mode", "change_quick"),
        ("🔒 Switch to Stealth Mode", "change_stealth"),
        ("« Back", "menu_main"),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=action)]
        for label, action in options
    ]
    await safe_edit_message(
        query,
        "Change mode or template:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
async def cb_change_quick(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch to quick mode by handing over to the quick-install handler."""
    await update.callback_query.answer()
    # The install flow's handler is reused as-is for the mode change.
    await cb_install_mode_quick(update, context)
|
||
|
||
|
||
async def cb_change_stealth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch to stealth mode by handing over to the stealth-install handler."""
    await update.callback_query.answer()
    # The install flow's handler is reused as-is for the mode change.
    await cb_install_mode_stealth(update, context)
|
||
|
||
|
||
async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Migrate from v1 (mtg Docker) to v2 (telemt).

    Stops and removes the old Docker container under either of its two known
    names, then shows the v2 installation-mode menu. The results of the
    Docker commands are not inspected beyond choosing which name to stop.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Migrating from v1...")

    # Stop the old container: try the first name, fall back to the second
    # only if stopping the first one did not succeed.
    stop_code, _, _ = await sh("docker", "stop", "mtproto-proxy", timeout=30)
    if stop_code != 0:
        await sh("docker", "stop", "mtg", timeout=30)

    # Remove both possible container names.
    for container in ("mtproto-proxy", "mtg"):
        await sh("docker", "rm", container, timeout=15)

    message = (
        "✅ <b>v1 container stopped and removed</b>\n\n"
        "Now select installation mode for v2:"
    )
    await safe_edit_message(
        query, message, reply_markup=get_install_mode_menu(), parse_mode="HTML"
    )
|
||
|
||
|
||
# ============================================================================
|
||
# WEBSITE & SSL
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Present the website / SSL management menu."""
    query = update.callback_query
    await query.answer()

    # (label, callback_data) pairs, one button per row.
    options = (
        ("🔄 Renew SSL Certificate", "ssl_renew"),
        ("📊 SSL Status", "ssl_status"),
        ("« Back", "menu_main"),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=action)]
        for label, action in options
    ]
    await safe_edit_message(
        query,
        "Website & SSL Management:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Run ``certbot renew`` and report whether it succeeded."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Renewing SSL certificate...")

    exit_code, _, err_output = await sh("certbot", "renew", timeout=120)

    if exit_code != 0:
        # Show at most the first 500 characters of stderr, HTML-escaped.
        text = f"❌ Renewal failed:\n<code>{html.escape(err_output[:500])}</code>"
    else:
        text = "✅ SSL certificate renewed successfully"

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_website")]
    await safe_edit_message(
        query, text, reply_markup=InlineKeyboardMarkup([back_row]), parse_mode="HTML"
    )
|
||
|
||
|
||
async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the output of ``certbot certificates``."""
    query = update.callback_query
    await query.answer()

    exit_code, listing, _ = await sh("certbot", "certificates")

    if exit_code != 0:
        text = "❌ Failed to get SSL status"
    else:
        # Only the first 1000 characters of the listing are shown.
        text = f"<b>📊 SSL Certificates</b>\n\n<pre>{html.escape(listing[:1000])}</pre>"

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_website")]
    await safe_edit_message(
        query, text, reply_markup=InlineKeyboardMarkup([back_row]), parse_mode="HTML"
    )
|
||
|
||
|
||
# ============================================================================
|
||
# PROMO & CREDITS
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the promo screen with the promo and tip links."""
    query = update.callback_query
    await query.answer()

    # Paragraphs are separated by a blank line in the rendered message.
    paragraphs = [
        "<b>🎁 GoTelegram Promo</b>",
        "Share the love! Invite friends to use GoTelegram.",
        f"<a href='{PROMO_LINK}'>Promo Link</a>",
        f"Support development:\n<a href='{TIP_LINK}'>Send a Tip</a>",
    ]
    text = "\n\n".join(paragraphs)

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_main")]
    await safe_edit_message(
        query, text, reply_markup=InlineKeyboardMarkup([back_row]), parse_mode="HTML"
    )
|
||
|
||
|
||
async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the credits and acknowledgements screen."""
    query = update.callback_query
    await query.answer()

    # Paragraphs are separated by a blank line in the rendered message.
    paragraphs = [
        "<b>ℹ️ Credits & Acknowledgements</b>",
        f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>",
        "Built with love for the Telegram community",
        "<b>Special thanks to:</b>",
        "🙏 <b>telemt</b> - MTProxy engine\n High-performance proxy core",
        "🎨 <b>HTML5UP</b> - Beautiful web templates\n Responsive design & themes",
        "📚 <b>Learning Zone</b> - Educational resources\n Community learning support",
        "🚀 <b>Start Bootstrap</b> - Bootstrap templates\n Professional design framework",
        "💬 <b>Community</b> - Your feedback & support",
        "<i>GoTelegram is open-source and community-driven</i>",
    ]
    text = "\n\n".join(paragraphs)

    back_row = [InlineKeyboardButton("« Back", callback_data="menu_main")]
    await safe_edit_message(
        query, text, reply_markup=InlineKeyboardMarkup([back_row]), parse_mode="HTML"
    )
|
||
|
||
|
||
# ============================================================================
|
||
# REMOVE
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Ask for confirmation before removing the installation."""
    query = update.callback_query
    await query.answer()

    warning = "\n".join(
        [
            "<b>⚠️ Remove GoTelegram</b>",
            "",
            "This will completely remove the installation.",
            "Are you sure?",
        ]
    )

    confirm_row = [InlineKeyboardButton("❌ Yes, Remove", callback_data="remove_confirm")]
    back_row = [InlineKeyboardButton("« Back", callback_data="menu_main")]
    await safe_edit_message(
        query,
        warning,
        reply_markup=InlineKeyboardMarkup([confirm_row, back_row]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Remove the installation after the user confirmed.

    Stops the telemt service, then deletes ``/opt/gotelegram`` and
    ``WEBSITE_ROOT`` with ``rm -rf``. Success is only reported when every
    deletion exited with status 0; otherwise the failing paths and their
    error output are shown and logged.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Removing GoTelegram...")

    # Stop service. Its result is deliberately not treated as fatal: a
    # service that is already stopped or absent must not block the removal.
    await sh("systemctl", "stop", TELEMT_SERVICE)

    # Remove directories, remembering every deletion that did not succeed.
    failures = []
    for path in ("/opt/gotelegram", WEBSITE_ROOT):
        code, _, stderr = await sh("rm", "-rf", path)
        if code != 0:
            failures.append(f"{path}: {stderr.strip()[:200]}")

    if failures:
        logger.error("GoTelegram removal incomplete: %s", "; ".join(failures))
        details = html.escape("\n".join(failures))
        text = f"❌ Removal incomplete:\n<code>{details}</code>"
    else:
        text = "✅ GoTelegram removed successfully"

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
# ============================================================================
|
||
# CALLBACK ROUTING
|
||
# ============================================================================
|
||
|
||
|
||
async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route every inline-button callback to its handler.

    Order of evaluation: access check, the two actions handled inline
    (``menu_main`` and ``close_menu``), prefix-matched callbacks, exact-match
    callbacks, and finally an "Unknown action" answer.
    """
    query = update.callback_query
    data = query.data

    # Access control
    if not is_user_allowed(update.effective_user.id):
        await query.answer("Access denied")
        return

    # Main menu is rendered right here rather than by a dedicated handler.
    if data == "menu_main":
        await query.answer()
        main_menu = get_main_menu()
        header = (
            f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>\n\n"
            "🤖 MTProxy Management\n"
            "Select an action:"
        )
        await safe_edit_message(query, header, reply_markup=main_menu, parse_mode="HTML")
        return

    if data == "close_menu":
        await query.answer()
        await query.delete_message()
        return

    # Callbacks identified by their exact data string.
    exact_routes = {
        "menu_install": cb_menu_install,
        "menu_status": cb_menu_status,
        "menu_link": cb_menu_link,
        "menu_share": cb_menu_share,
        "menu_restart": cb_menu_restart,
        "menu_logs": cb_menu_logs,
        "menu_backup": cb_menu_backup,
        "menu_restore": cb_menu_restore,
        "menu_update": cb_menu_update,
        "menu_change": cb_menu_change,
        "menu_website": cb_menu_website,
        "menu_promo": cb_menu_promo,
        "menu_credits": cb_menu_credits,
        "menu_remove": cb_menu_remove,
        "install_mode_quick": cb_install_mode_quick,
        "install_mode_stealth": cb_install_mode_stealth,
        "backup_create": cb_backup_create,
        "backup_list": cb_backup_list,
        "ssl_renew": cb_ssl_renew,
        "ssl_status": cb_ssl_status,
        "remove_confirm": cb_remove_confirm,
        "change_quick": cb_change_quick,
        "change_stealth": cb_change_stealth,
        "install_migrate": cb_install_migrate,
    }

    # Callbacks that carry a payload after a fixed prefix; checked in this
    # order and before the exact-match table.
    prefix_routes = (
        ("quick_dom_", cb_quick_domain),
        ("stealth_cat_", cb_stealth_category),
        ("stealth_tpl_", cb_stealth_template),
        ("stealth_confirm_", cb_stealth_confirm),
        ("restore_idx_", cb_restore_backup),
    )

    for prefix, prefixed_handler in prefix_routes:
        if data.startswith(prefix):
            await prefixed_handler(update, context)
            return

    handler = exact_routes.get(data)
    if handler is None:
        await query.answer("Unknown action")
        return
    await handler(update, context)
|
||
|
||
|
||
# ============================================================================
|
||
# ERROR HANDLERS
|
||
# ============================================================================
|
||
|
||
|
||
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log the exception attached to the context, with its traceback."""
    raised = context.error
    logger.error("Exception while handling an update:", exc_info=raised)
|
||
|
||
|
||
# ============================================================================
|
||
# MAIN APPLICATION
|
||
# ============================================================================
|
||
|
||
|
||
def main() -> None:
    """Build the Telegram application, register handlers and start polling.

    Logs an error and returns without starting when ``BOT_TOKEN`` is empty.
    """
    if not BOT_TOKEN:
        logger.error("BOT_TOKEN not set in .env file")
        return

    # Create the Application
    application = Application.builder().token(BOT_TOKEN).build()

    # Slash commands, registered in this order.
    commands = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("status", cmd_status),
        ("logs", cmd_logs),
    )
    for command, callback in commands:
        application.add_handler(CommandHandler(command, callback))

    # A single callback-query handler routes all inline-button presses.
    application.add_handler(CallbackQueryHandler(handle_callback))

    application.add_error_handler(error_handler)

    logger.info("GoTelegram v%s bot starting...", GOTELEGRAM_VERSION)
    application.run_polling(allowed_updates=Update.ALL_TYPES)


# Script entry point: only start the bot when executed directly.
if __name__ == "__main__":
    main()
|