Files
gotelegram_pro/gotelegram-bot/bot.py
anten-ka 9c084f37ec bugfix: {{NC}} typo, bot TOML v3 parsing, add_secret v3 format
- install.sh: fix {{NC}} -> ${NC} color escape on line 265
- bot.py: fix TOML parsing for telemt v3 [access.users] format
- bot.py: fix telemt config section [server].port instead of [config].listen_port
- telemt_config.sh: fix add_secret_to_config() for v3 format
2026-04-10 00:16:30 +03:00

1838 lines
61 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
GoTelegram v2.2 Bot - MTProxy Management for Linux
Manages telemt engine via Telegram interface with full CLI feature parity
Uses python-telegram-bot v21+
"""
import asyncio
import csv
import html
import json
import logging
import os
import re
import subprocess
import time
import toml
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Tuple, Optional, List, Dict, Any
from dotenv import load_dotenv
from telegram import (
Update,
InlineKeyboardButton,
InlineKeyboardMarkup,
InputFile,
)
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
filters,
)
from telegram.error import TelegramError, BadRequest
# Load environment variables
load_dotenv()
# Logging configuration
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ============================================================================
# CONFIGURATION
# ============================================================================
GOTELEGRAM_VERSION = "2.3.1"
GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json"
TELEMT_CONFIG = "/etc/telemt/config.toml"
TELEMT_SERVICE = "telemt"
WEBSITE_ROOT = "/var/www/gotelegram-site"
BACKUP_DIR = "/opt/gotelegram/backups"
TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json"
PROMO_LINK_1 = "https://vk.cc/ct29NQ"
PROMO_LINK_2 = "https://vk.cc/cUxAhj"
TIP_LINK = "https://pay.cloudtips.ru/p/7410814f"
PROMO_STAMP_FILE = "/opt/gotelegram/.promo_bot_last_shown"
BOT_TOKEN = os.getenv("BOT_TOKEN")
ENV_FILE = "/opt/gotelegram-bot/.env"
# ── Загрузка ALLOWED_IDS ────────────────────────────────────────────────────
# Поддерживает запятую, пробел, или их комбинацию как разделитель
ALLOWED_IDS: set = set()
_WAITING_FOR_ADMIN = False # True если список пуст → ждём первого админа
def _load_allowed_ids() -> None:
"""Загрузить ALLOWED_IDS из переменной окружения."""
global ALLOWED_IDS, _WAITING_FOR_ADMIN
raw = os.getenv("ALLOWED_IDS", "")
ALLOWED_IDS = set()
# Разделители: запятая, пробел, или оба
for part in re.split(r'[,\s]+', raw):
part = part.strip()
if part:
try:
ALLOWED_IDS.add(int(part))
except ValueError:
logging.warning(f"Invalid ALLOWED_IDS entry: {part}")
_WAITING_FOR_ADMIN = len(ALLOWED_IDS) == 0
def _save_allowed_ids() -> None:
"""Сохранить ALLOWED_IDS в .env файл и обновить os.environ."""
global _WAITING_FOR_ADMIN
ids_str = ",".join(str(i) for i in sorted(ALLOWED_IDS))
os.environ["ALLOWED_IDS"] = ids_str
_WAITING_FOR_ADMIN = len(ALLOWED_IDS) == 0
if not os.path.exists(ENV_FILE):
return
try:
with open(ENV_FILE, "r") as f:
lines = f.readlines()
found = False
new_lines = []
for line in lines:
if line.strip().startswith("ALLOWED_IDS="):
if ids_str:
new_lines.append(f"ALLOWED_IDS={ids_str}\n")
# Если пусто — удаляем строку
found = True
else:
new_lines.append(line)
if not found and ids_str:
new_lines.append(f"ALLOWED_IDS={ids_str}\n")
with open(ENV_FILE, "w") as f:
f.writelines(new_lines)
logger.info(f"ALLOWED_IDS updated in .env: {ids_str or '(empty)'}")
except OSError as e:
logger.error(f"Failed to update .env: {e}")
_load_allowed_ids()
LITE_DOMAINS = [
"google.com",
"microsoft.com",
"cloudflare.com",
"apple.com",
"amazon.com",
"github.com",
"stackoverflow.com",
"medium.com",
"wikipedia.org",
"coursera.org",
"udemy.com",
"habr.com",
"stepik.org",
"duolingo.com",
"khanacademy.org",
"bbc.com",
"reuters.com",
"nytimes.com",
"ted.com",
"zoom.us",
]
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]:
"""Execute shell command asynchronously.
Args:
*args: Command and arguments
timeout: Timeout in seconds
Returns:
Tuple of (return_code, stdout, stderr)
"""
try:
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=timeout
)
return (
process.returncode,
stdout.decode("utf-8", errors="replace"),
stderr.decode("utf-8", errors="replace"),
)
except asyncio.TimeoutError:
try:
process.kill()
await process.wait()
except Exception:
pass
return (-1, "", f"Command timeout after {timeout}s")
except Exception as e:
return (-1, "", str(e))
def load_json(path: str) -> Optional[Dict]:
"""Load JSON file."""
try:
with open(path, "r") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load {path}: {e}")
return None
def load_toml(path: str) -> Optional[Dict]:
"""Load TOML file."""
try:
with open(path, "r") as f:
return toml.load(f)
except Exception as e:
logger.warning(f"Failed to load {path}: {e}")
return None
def save_json(path: str, data: Dict) -> bool:
"""Save JSON file."""
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(data, f, indent=2)
return True
except Exception as e:
logger.error(f"Failed to save {path}: {e}")
return False
async def safe_edit_message(query, text: str, reply_markup=None, parse_mode=None) -> bool:
"""Safely edit message, handling cases where message was deleted or not modified."""
try:
await query.edit_message_text(
text, reply_markup=reply_markup, parse_mode=parse_mode
)
return True
except BadRequest as e:
err_msg = str(e).lower()
if "message is not modified" in err_msg:
return True # No change needed, not an error
if "message to edit not found" in err_msg or "message can't be edited" in err_msg:
logger.warning(f"Cannot edit message: {e}")
return False
raise # Re-raise unexpected BadRequest
async def check_service_status(service: str) -> bool:
"""Check if systemd service is running."""
code, _, _ = await sh("systemctl", "is-active", service)
return code == 0
async def get_telemt_version() -> str:
"""Get telemt version."""
code, stdout, _ = await sh("telemt", "-v")
if code == 0:
return stdout.strip().split()[-1] if stdout else "unknown"
return "unknown"
def is_docker_running() -> bool:
"""Check if Docker daemon is running."""
try:
subprocess.run(
["docker", "ps"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=5,
)
return True
except Exception:
return False
async def check_old_container() -> Optional[str]:
"""Check for old mtg Docker container (v1 migration)."""
if not is_docker_running():
return None
code, stdout, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}")
if code == 0 and "mtg" in stdout:
return "mtg"
return None
# ============================================================================
# ACCESS CONTROL
# ============================================================================
def is_user_allowed(user_id: int) -> bool:
"""Check if user ID is in ALLOWED_IDS. If list is empty — waiting for admin."""
if _WAITING_FOR_ADMIN:
return False # Никому не даём доступ пока не назначен админ
return user_id in ALLOWED_IDS
def add_admin(user_id: int) -> None:
"""Добавить администратора и сохранить в .env."""
ALLOWED_IDS.add(user_id)
_save_allowed_ids()
logger.info(f"Admin added: {user_id}")
def remove_admin(user_id: int) -> None:
"""Убрать администратора и сохранить в .env."""
ALLOWED_IDS.discard(user_id)
_save_allowed_ids()
logger.info(f"Admin removed: {user_id}")
async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""Check authorization and send error if not allowed."""
user_id = update.effective_user.id
# Режим ожидания первого админа — обрабатывается в cmd_start
if _WAITING_FOR_ADMIN:
return False
if not is_user_allowed(user_id):
if update.message:
await update.message.reply_text(
f"⛔ Доступ запрещён.\nВаш ID: <code>{user_id}</code>",
parse_mode="HTML",
)
logger.warning(f"Unauthorized access attempt from user {user_id}")
return False
return True
# ============================================================================
# MAIN MENU
# ============================================================================
def get_main_menu() -> InlineKeyboardMarkup:
"""Generate main menu keyboard."""
buttons = [
[
InlineKeyboardButton("⚙️ Install", callback_data="menu_install"),
InlineKeyboardButton("📊 Status", callback_data="menu_status"),
],
[
InlineKeyboardButton("🔗 Link", callback_data="menu_link"),
InlineKeyboardButton("📤 Share", callback_data="menu_share"),
],
[
InlineKeyboardButton("🔄 Restart", callback_data="menu_restart"),
InlineKeyboardButton("📋 Logs", callback_data="menu_logs"),
],
[
InlineKeyboardButton("⚡ Change Mode/Template", callback_data="menu_change"),
InlineKeyboardButton("💾 Backup", callback_data="menu_backup"),
],
[
InlineKeyboardButton("↩️ Restore", callback_data="menu_restore"),
InlineKeyboardButton("📡 Update telemt", callback_data="menu_update"),
],
[
InlineKeyboardButton("🌐 Website/SSL", callback_data="menu_website"),
InlineKeyboardButton("🎁 Промо", callback_data="menu_promo"),
],
[
InlineKeyboardButton("📊 Traffic Stats", callback_data="menu_stats"),
InlineKeyboardButton("🗑️ Remove", callback_data="menu_remove"),
],
[
InlineKeyboardButton("👤 Админы", callback_data="menu_admins"),
InlineKeyboardButton(" Credits", callback_data="menu_credits"),
],
[
InlineKeyboardButton("❌ Close", callback_data="close_menu"),
],
]
return InlineKeyboardMarkup(buttons)
# ============================================================================
# COMMANDS
# ============================================================================
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Start command - show main menu, promo once per day.
Если ALLOWED_IDS пуст — режим авто-регистрации первого админа.
"""
user = update.effective_user
user_id = user.id
# ── Режим ожидания первого админа ──
if _WAITING_FOR_ADMIN:
name = user.full_name or user.username or str(user_id)
text = (
f"<b>👋 Привет, {html.escape(name)}!</b>\n\n"
f"Бот ещё не настроен.\n"
f"Ваш Telegram ID: <code>{user_id}</code>\n\n"
f"Назначить вас администратором?"
)
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("✅ Да", callback_data=f"admin_confirm_{user_id}"),
InlineKeyboardButton("❌ Нет", callback_data="admin_cancel"),
]
])
await update.message.reply_text(text, reply_markup=keyboard, parse_mode="HTML")
return
# ── Проверка доступа ──
if not is_user_allowed(user_id):
await update.message.reply_text(
f"⛔ Доступ запрещён.\nВаш ID: <code>{user_id}</code>",
parse_mode="HTML",
)
return
welcome = (
f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>\n\n"
"🤖 MTProxy Management Bot\n"
"Powered by telemt engine\n\n"
"Select an action from the menu below:"
)
await update.message.reply_text(
welcome, reply_markup=get_main_menu(), parse_mode="HTML"
)
# Промо раз в сутки
if should_show_promo_bot():
mark_promo_shown_bot()
await update.message.reply_text(
get_promo_text(), parse_mode="HTML"
)
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Help command - show available commands."""
if not await require_auth(update, context):
return
help_text = (
"<b>GoTelegram Bot — Команды</b>\n\n"
"/start — Главное меню\n"
"/help — Эта справка\n"
"/status — Быстрый статус\n"
"/logs — Последние логи\n"
"/addadmin ID — Добавить админа\n"
"/deladmin ID — Удалить админа\n\n"
"Используйте кнопки меню для остальных операций."
)
await update.message.reply_text(help_text, parse_mode="HTML")
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Quick status check."""
if not await require_auth(update, context):
return
await update.message.reply_text("⏳ Checking status...", parse_mode="HTML")
status_text = await get_status_text()
await update.message.reply_text(status_text, parse_mode="HTML")
async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show recent logs."""
if not await require_auth(update, context):
return
code, stdout, stderr = await sh(
"journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager"
)
if code == 0:
log_text = stdout[-1500:] if len(stdout) > 1500 else stdout
await update.message.reply_text(
f"<pre>{html.escape(log_text)}</pre>",
parse_mode="HTML",
)
else:
await update.message.reply_text("Failed to retrieve logs")
# ============================================================================
# STATUS
# ============================================================================
async def get_status_text() -> str:
"""Generate status report."""
lines = ["<b>📊 Current Status</b>\n"]
# Service status
is_running = await check_service_status(TELEMT_SERVICE)
lines.append(f"<b>Service:</b> {'✅ Running' if is_running else '❌ Stopped'}")
# Telemt version
version = await get_telemt_version()
lines.append(f"<b>Telemt:</b> v{version}")
# Config status
config = load_json(GOTELEGRAM_CONFIG)
if config:
lines.append(f"<b>Mode:</b> {html.escape(str(config.get('mode', 'unknown')))}")
if "template" in config:
lines.append(f"<b>Template:</b> {html.escape(str(config['template']))}")
if "domain" in config:
lines.append(f"<b>Domain:</b> {html.escape(str(config['domain']))}")
if "port" in config:
lines.append(f"<b>Port:</b> {html.escape(str(config['port']))}")
# Telemt config (v3: [server] port = ..., [censorship] tls_domain = ...)
telemt_cfg = load_toml(TELEMT_CONFIG)
if telemt_cfg:
server_cfg = telemt_cfg.get("server", {})
if "port" in server_cfg:
lines.append(f"<b>Listen Port:</b> {server_cfg['port']}")
censor_cfg = telemt_cfg.get("censorship", {})
if "tls_domain" in censor_cfg:
lines.append(f"<b>TLS Domain:</b> {html.escape(str(censor_cfg['tls_domain']))}")
# Backups
backup_count = 0
try:
if os.path.exists(BACKUP_DIR):
backup_count = len([f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")])
except Exception:
pass
lines.append(f"<b>Backups:</b> {backup_count}")
# Old container check
old_container = await check_old_container()
if old_container:
lines.append(f"\n⚠️ <b>Found old container:</b> {html.escape(old_container)}")
lines.append("Run 'Install' to migrate")
return "\n".join(lines)
async def get_traffic_stats() -> str:
"""Get formatted traffic statistics."""
# Read current snapshot
current_file = "/run/gotelegram/stats_current.json"
history_file = "/opt/gotelegram/stats_history.csv"
try:
with open(current_file, "r") as f:
current = json.load(f)
except Exception:
return "📊 <b>Статистика</b>\n\n<i>Данные недоступны. Убедитесь что модуль статистики включён.</i>"
# Read history
history = []
try:
with open(history_file, "r") as f:
reader = csv.reader(f)
for row in reader:
if len(row) >= 3:
history.append({
"ts": int(row[0]),
"proxy": int(row[1]),
"site": int(row[2]),
})
except Exception:
pass
now = int(time.time())
def format_bytes(b):
if b < 1024:
return f"{b} B"
if b < 1048576:
return f"{b/1024:.1f} KB"
if b < 1073741824:
return f"{b/1048576:.1f} MB"
return f"{b/1073741824:.1f} GB"
def format_rate(bps):
if bps < 1024:
return f"{bps:.0f} B/s"
if bps < 1048576:
return f"{bps/1024:.1f} KB/s"
return f"{bps/1048576:.1f} MB/s"
def calc_for_period(secs, key):
target_ts = now - secs
# Find closest snapshot to target_ts
closest = None
for h in history:
if h["ts"] <= target_ts:
if closest is None or h["ts"] > closest["ts"]:
closest = h
if closest is None:
return "", ""
current_val = current.get(f"{key}_bytes", 0)
diff = current_val - closest[key]
if diff < 0:
diff = 0
elapsed = now - closest["ts"]
if elapsed <= 0:
elapsed = 1
rate = diff / elapsed
return format_bytes(diff), format_rate(rate)
periods = [
("1 мин", 60),
("5 мин", 300),
("60 мин", 3600),
("1 день", 86400),
("7 дней", 604800),
("30 дней", 2592000),
("365 дней", 31536000),
]
lines = ["📊 <b>Статистика трафика</b>\n"]
for label, key in [("Proxy (telemt)", "proxy"), ("Сайт (nginx)", "site")]:
lines.append(f"\n<b>{label}:</b>")
lines.append("<pre>")
lines.append(f"{'Период':<10}{'Трафик':>10}{'Скорость':>10}")
lines.append("" * 36)
for name, secs in periods:
total, rate = calc_for_period(secs, key)
lines.append(f"{name:<10}{total:>10}{rate:>10}")
lines.append("</pre>")
return "\n".join(lines)
async def cb_menu_stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show traffic statistics."""
query = update.callback_query
await query.answer()
stats_text = await get_traffic_stats()
keyboard = [
[InlineKeyboardButton("🔄 Обновить", callback_data="menu_stats")],
[InlineKeyboardButton("« Меню", callback_data="menu_main")],
]
await safe_edit_message(
query,
stats_text,
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Status callback — show detailed proxy/server status."""
query = update.callback_query
await query.answer()
if not await require_auth(update, context):
return
text = await get_status_text()
keyboard = [
[InlineKeyboardButton("🔄 Обновить", callback_data="menu_status")],
[InlineKeyboardButton("« Меню", callback_data="menu_main")],
]
await safe_edit_message(
query,
text,
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
# ============================================================================
# INSTALL
# ============================================================================
def get_install_mode_menu() -> InlineKeyboardMarkup:
"""Install mode selection menu."""
buttons = [
[InlineKeyboardButton("⚡ Lite", callback_data="install_mode_lite")],
[InlineKeyboardButton("🛡 Pro", callback_data="install_mode_pro")],
[InlineKeyboardButton("« Back", callback_data="menu_main")],
]
return InlineKeyboardMarkup(buttons)
async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Install menu callback."""
query = update.callback_query
await query.answer()
# Check for old container
old_container = await check_old_container()
if old_container:
text = (
f"⚠️ <b>Migration from v1 detected</b>\n\n"
f"Found Docker container: {html.escape(old_container)}\n\n"
f"Would you like to:\n"
f"1. Migrate from v1 (recommended)\n"
f"2. Fresh install (will remove old container)\n\n"
f"<i>Select below or choose install mode</i>"
)
buttons = [
[InlineKeyboardButton("🔄 Migrate from v1", callback_data="install_migrate")],
[InlineKeyboardButton("✨ Fresh Install", callback_data="install_mode_lite")],
[InlineKeyboardButton("« Back", callback_data="menu_main")],
]
keyboard = InlineKeyboardMarkup(buttons)
else:
text = "Select installation mode:"
keyboard = get_install_mode_menu()
await safe_edit_message(query,
text, reply_markup=keyboard, parse_mode="HTML"
)
async def cb_install_mode_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Lite mode domain selection."""
query = update.callback_query
await query.answer()
# Show domains with pagination (4 per row, 2 rows)
buttons = []
for i in range(0, len(LITE_DOMAINS), 2):
row = []
for j in range(2):
if i + j < len(LITE_DOMAINS):
domain = LITE_DOMAINS[i + j]
row.append(
InlineKeyboardButton(
domain, callback_data=f"lite_dom_{i+j}"
)
)
buttons.append(row)
buttons.append([InlineKeyboardButton("« Back", callback_data="menu_install")])
text = "Select a domain for Lite mode:"
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard)
async def cb_lite_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Lite domain selection callback."""
query = update.callback_query
data = query.data
try:
domain_idx = int(data.split("_")[-1])
domain = LITE_DOMAINS[domain_idx]
except (ValueError, IndexError):
await query.answer("Invalid domain selection")
return
await query.answer()
await safe_edit_message(query,f"⏳ Installing with domain: {domain}...")
# Simulate installation (in real scenario, call install script)
config = {
"mode": "lite",
"domain": domain,
"port": 443,
"installed_at": datetime.now().isoformat(),
}
if save_json(GOTELEGRAM_CONFIG, config):
text = (
f"✅ <b>Lite mode installed!</b>\n\n"
f"<b>Domain:</b> {domain}\n"
f"<b>Mode:</b> Lite\n\n"
f"Service starting... Check status in 10 seconds."
)
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,
text, reply_markup=keyboard, parse_mode="HTML"
)
else:
await safe_edit_message(query,
"❌ Failed to save configuration",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_install")]]
),
)
async def cb_install_mode_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Pro mode - show template categories."""
query = update.callback_query
await query.answer()
catalog = load_json(TEMPLATES_CATALOG)
if not catalog or "categories" not in catalog:
await safe_edit_message(query,
"❌ Template catalog not found",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_install")]]
),
)
return
buttons = []
for cat in catalog.get("categories", []):
buttons.append(
[
InlineKeyboardButton(
f"📁 {cat['name']}", callback_data=f"pro_cat_{cat['id']}"
)
]
)
buttons.append([InlineKeyboardButton("« Back", callback_data="menu_install")])
text = "Pro Mode - Select Template Category:"
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard)
async def cb_pro_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show templates in category."""
query = update.callback_query
data = query.data
cat_id = data.removeprefix("pro_cat_")
await query.answer()
catalog = load_json(TEMPLATES_CATALOG)
if not catalog:
await safe_edit_message(query,"❌ Template catalog not found")
return
# Find category and templates
category = None
templates = []
for cat in catalog.get("categories", []):
if cat["id"] == cat_id:
category = cat
templates = cat.get("templates", [])
break
if not category:
await safe_edit_message(query,"❌ Category not found")
return
buttons = []
for tpl in templates:
buttons.append(
[
InlineKeyboardButton(
f"🎨 {tpl['name']}", callback_data=f"pro_tpl_{tpl['id']}"
)
]
)
buttons.append([InlineKeyboardButton("« Back", callback_data="install_mode_pro")])
text = f"Select template from <b>{html.escape(category['name'])}</b>:"
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_pro_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show template preview and confirm."""
query = update.callback_query
data = query.data
tpl_id = data.removeprefix("pro_tpl_")
await query.answer()
catalog = load_json(TEMPLATES_CATALOG)
if not catalog:
await safe_edit_message(query,"❌ Template catalog not found")
return
# Find template
template = None
for cat in catalog.get("categories", []):
for tpl in cat.get("templates", []):
if tpl["id"] == tpl_id:
template = tpl
break
if template:
break
if not template:
await safe_edit_message(query,"❌ Template not found")
return
tpl_name = html.escape(template.get('name', 'Unknown'))
tpl_desc = html.escape(template.get('description', 'N/A'))
text = (
f"<b>🎨 Template Preview</b>\n\n"
f"<b>Name:</b> {tpl_name}\n"
f"<b>Description:</b> {tpl_desc}\n\n"
)
if "preview_url" in template:
preview_url = html.escape(template['preview_url'], quote=True)
text += f'<a href="{preview_url}">View Live Preview</a>\n\n'
text += "Confirm installation?"
buttons = [
[
InlineKeyboardButton(
"✅ Install", callback_data=f"pro_confirm_{tpl_id}"
)
],
[InlineKeyboardButton("« Back", callback_data="install_mode_pro")],
]
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_pro_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Confirm and install pro template."""
query = update.callback_query
data = query.data
tpl_id = data.removeprefix("pro_confirm_")
await query.answer()
await safe_edit_message(query,"⏳ Installing template...")
config = {
"mode": "pro",
"template": tpl_id,
"port": 443,
"installed_at": datetime.now().isoformat(),
}
if save_json(GOTELEGRAM_CONFIG, config):
text = (
f"✅ <b>Pro mode installed!</b>\n\n"
f"<b>Template:</b> {html.escape(tpl_id)}\n"
f"<b>Mode:</b> Pro\n\n"
f"Service starting... Check status in 10 seconds."
)
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,
text, reply_markup=keyboard, parse_mode="HTML"
)
else:
await safe_edit_message(query,
"❌ Failed to save configuration",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_install")]]
),
)
# ============================================================================
# PROXY LINK & SHARE
# ============================================================================
async def get_proxy_link() -> Optional[str]:
"""Generate proxy link from config. Pro-mode uses domain + fake-TLS secret."""
config = load_json(GOTELEGRAM_CONFIG)
if not config:
return None
# Get secret from telemt TOML config (v3 format: [access.users] main = "...")
secret = config.get("secret", "")
if not secret:
telemt_cfg = load_toml(TELEMT_CONFIG)
if telemt_cfg:
access = telemt_cfg.get("access", {})
users = access.get("users", {})
if isinstance(users, dict):
secret = users.get("main", "")
if not secret:
return None
mode = config.get("mode", "lite")
domain = config.get("domain", "")
port = config.get("port", 443)
# Pro-режим: ссылка с доменом и fake-TLS секретом (ee + secret + hex domain)
if mode == "pro" and domain:
domain_hex = domain.encode().hex()
faketls_secret = f"ee{secret}{domain_hex}"
return f"tg://proxy?server={domain}&port={port}&secret={faketls_secret}"
# Lite-режим: IP + fake-TLS с mask_host
code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org")
server = stdout.strip() if code == 0 and stdout.strip() else "0.0.0.0"
mask_host = config.get("mask_host", "")
if mask_host:
domain_hex = mask_host.encode().hex()
faketls_secret = f"ee{secret}{domain_hex}"
return f"tg://proxy?server={server}&port={port}&secret={faketls_secret}"
return f"tg://proxy?server={server}&port={port}&secret={secret}"
async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Generate and show proxy link."""
query = update.callback_query
await query.answer()
link = await get_proxy_link()
if not link:
text = "❌ Proxy not installed yet. Run install first."
else:
text = (
f"<b>🔗 Proxy Link</b>\n\n"
f"<code>{html.escape(link)}</code>\n\n"
f"Open in Telegram to connect."
)
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Share link as QR code."""
query = update.callback_query
await query.answer()
link = await get_proxy_link()
if not link:
text = "❌ Proxy not installed yet."
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard)
return
# Try to generate QR code
qr_file = None
code, _, _ = await sh("which", "qrencode")
if code == 0:
qr_path = "/tmp/proxy_qr.png"
code, _, _ = await sh("qrencode", "-o", qr_path, link)
if code == 0 and os.path.exists(qr_path):
qr_file = qr_path
if qr_file:
try:
with open(qr_file, "rb") as f:
await query.message.reply_photo(
photo=f,
caption=f"<b>📤 Proxy QR Code</b>\n\n{html.escape(link)}",
parse_mode="HTML",
)
except Exception as e:
logger.error(f"Failed to send QR code: {e}")
await safe_edit_message(query,
f"<b>🔗 Proxy Link</b>\n\n<code>{html.escape(link)}</code>",
parse_mode="HTML",
)
finally:
try:
os.remove(qr_file)
except OSError:
pass
else:
await safe_edit_message(query,
f"<b>🔗 Proxy Link</b>\n\n<code>{html.escape(link)}</code>",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
),
parse_mode="HTML",
)
# ============================================================================
# RESTART & LOGS
# ============================================================================
async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Restart service."""
query = update.callback_query
await query.answer()
text = "⏳ Restarting telemt service..."
await safe_edit_message(query,text)
code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE)
if code == 0:
text = "✅ Service restarted successfully"
else:
text = f"❌ Failed to restart:\n<code>{html.escape(stderr[:500])}</code>"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,
text, reply_markup=keyboard, parse_mode="HTML"
)
async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show recent logs."""
query = update.callback_query
await query.answer()
code, stdout, _ = await sh(
"journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager"
)
if code == 0:
log_text = stdout[-1000:] if len(stdout) > 1000 else stdout
text = f"<b>📋 Recent Logs</b>\n\n<pre>{html.escape(log_text)}</pre>"
else:
text = "❌ Failed to retrieve logs"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# BACKUP & RESTORE
# ============================================================================
async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Backup menu."""
query = update.callback_query
await query.answer()
# List existing backups
backups = []
try:
if os.path.exists(BACKUP_DIR):
backups = sorted(
[f for f in os.listdir(BACKUP_DIR)
if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
reverse=True,
)
except Exception:
pass
buttons = [[InlineKeyboardButton("💾 Create Backup", callback_data="backup_create")]]
if backups:
buttons.append(
[InlineKeyboardButton("📋 List Backups", callback_data="backup_list")]
)
buttons.append([InlineKeyboardButton("« Back", callback_data="menu_main")])
text = f"<b>💾 Backup Management</b>\n\nExisting backups: {len(backups)}"
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Create backup."""
query = update.callback_query
await query.answer()
await safe_edit_message(query,"⏳ Creating backup...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = os.path.join(BACKUP_DIR, f"backup_{timestamp}.tar.gz")
os.makedirs(BACKUP_DIR, exist_ok=True)
code, _, stderr = await sh(
"tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG
)
if code == 0:
text = f"✅ Backup created:\n<code>{html.escape(backup_file)}</code>"
else:
text = f"❌ Backup failed:\n<code>{html.escape(stderr[:500])}</code>"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_backup")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""List backups."""
query = update.callback_query
await query.answer()
backups = []
try:
if os.path.exists(BACKUP_DIR):
backups = sorted(
[f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
reverse=True,
)
except Exception:
pass
if not backups:
text = "No backups found"
else:
text = "<b>📋 Available Backups</b>\n\n"
for backup in backups[:10]:
path = os.path.join(BACKUP_DIR, backup)
size = os.path.getsize(path) / (1024 * 1024)
text += f"<code>{html.escape(backup)}</code> ({size:.2f} MB)\n"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_backup")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Restore menu."""
query = update.callback_query
await query.answer()
backups = []
try:
if os.path.exists(BACKUP_DIR):
backups = sorted(
[f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc")) and not f.endswith(".sha256")],
reverse=True,
)
except Exception:
pass
if not backups:
text = "❌ No backups available"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
else:
text = "Select backup to restore:"
buttons = []
for i, backup in enumerate(backups[:10]):
buttons.append(
[
InlineKeyboardButton(
backup, callback_data=f"restore_idx_{i}"
)
]
)
buttons.append([InlineKeyboardButton("« Back", callback_data="menu_main")])
keyboard = InlineKeyboardMarkup(buttons)
# Store backup list in user_data for retrieval
context.user_data["backup_list"] = backups[:10]
await safe_edit_message(query,text, reply_markup=keyboard)
async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Execute backup restoration."""
query = update.callback_query
data = query.data
try:
idx = int(data.removeprefix("restore_idx_"))
except ValueError:
await query.answer("Invalid backup selection")
return
backup_list = context.user_data.get("backup_list", [])
if idx < 0 or idx >= len(backup_list):
await query.answer("Backup not found")
return
backup_name = backup_list[idx]
backup_path = os.path.join(BACKUP_DIR, backup_name)
await query.answer()
await safe_edit_message(query,f"⏳ Restoring from {html.escape(backup_name)}...")
if not os.path.exists(backup_path):
text = "❌ Backup file not found"
else:
# Simple restore: extract tar to overwrite configs
code, _, stderr = await sh(
"tar", "-xzf", backup_path, "-C", "/", timeout=60
)
if code == 0:
# Restart services
await sh("systemctl", "restart", TELEMT_SERVICE)
text = f"✅ Restored from {html.escape(backup_name)}"
else:
text = f"❌ Restore failed:\n<code>{html.escape(stderr[:500])}</code>"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# UPDATE & MODE/TEMPLATE CHANGE
# ============================================================================
async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Update telemt by re-running the install script's update logic."""
query = update.callback_query
await query.answer()
await safe_edit_message(query,"⏳ Checking for telemt updates...")
# Get current version
cur_code, cur_out, _ = await sh("telemt", "--version")
current = cur_out.strip() if cur_code == 0 else "unknown"
# Check latest release from GitHub
code, stdout, stderr = await sh(
"curl", "-s", "--max-time", "10",
"https://api.github.com/repos/telemt/telemt/releases/latest",
)
if code != 0 or not stdout.strip():
text = "❌ Failed to check for updates"
else:
try:
release = json.loads(stdout)
latest = release.get("tag_name", "unknown")
if latest == current:
text = f"✅ telemt is already up to date ({html.escape(current)})"
else:
text = (
f" Update available: {html.escape(current)}{html.escape(latest)}\n\n"
f"Run the CLI installer to update:\n"
f"<code>sudo bash install.sh</code> → menu item 10"
)
except json.JSONDecodeError:
text = "❌ Failed to parse release info"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Change mode or template."""
query = update.callback_query
await query.answer()
buttons = [
[InlineKeyboardButton("⚡ Switch to Lite Mode", callback_data="change_lite")],
[InlineKeyboardButton("🛡 Switch to Pro Mode", callback_data="change_pro")],
[InlineKeyboardButton("« Back", callback_data="menu_main")],
]
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,
"Change mode or template:", reply_markup=keyboard
)
async def cb_change_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Switch to lite mode — show domain selection."""
query = update.callback_query
await query.answer()
# Reuse the lite mode domain selection flow
await cb_install_mode_lite(update, context)
async def cb_change_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Switch to pro mode — show template categories."""
query = update.callback_query
await query.answer()
# Reuse the pro mode template selection flow
await cb_install_mode_pro(update, context)
async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Migrate from v1 (mtg Docker) to v2 (telemt)."""
query = update.callback_query
await query.answer()
await safe_edit_message(query,"⏳ Migrating from v1...")
# Stop old mtg container
code, _, stderr = await sh("docker", "stop", "mtproto-proxy", timeout=30)
if code != 0:
code, _, stderr = await sh("docker", "stop", "mtg", timeout=30)
# Remove old container
await sh("docker", "rm", "mtproto-proxy", timeout=15)
await sh("docker", "rm", "mtg", timeout=15)
text = (
"✅ <b>v1 container stopped and removed</b>\n\n"
"Now select installation mode for v2:"
)
keyboard = get_install_mode_menu()
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# WEBSITE & SSL
# ============================================================================
async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Website and SSL management."""
query = update.callback_query
await query.answer()
buttons = [
[InlineKeyboardButton("🔄 Renew SSL Certificate", callback_data="ssl_renew")],
[InlineKeyboardButton("📊 SSL Status", callback_data="ssl_status")],
[InlineKeyboardButton("« Back", callback_data="menu_main")],
]
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,
"Website & SSL Management:", reply_markup=keyboard
)
async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Renew SSL certificate."""
query = update.callback_query
await query.answer()
await safe_edit_message(query,"⏳ Renewing SSL certificate...")
code, stdout, stderr = await sh("certbot", "renew", timeout=120)
if code == 0:
text = "✅ SSL certificate renewed successfully"
else:
text = f"❌ Renewal failed:\n<code>{html.escape(stderr[:500])}</code>"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_website")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show SSL status."""
query = update.callback_query
await query.answer()
code, stdout, _ = await sh("certbot", "certificates")
if code == 0:
text = f"<b>📊 SSL Certificates</b>\n\n<pre>{html.escape(stdout[:1000])}</pre>"
else:
text = "❌ Failed to get SSL status"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_website")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# ADMIN MANAGEMENT
# ============================================================================
async def cb_menu_admins(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Показать список админов и кнопки управления."""
query = update.callback_query
await query.answer()
if ALLOWED_IDS:
ids_list = "\n".join(f" • <code>{uid}</code>" for uid in sorted(ALLOWED_IDS))
text = f"<b>👤 Администраторы</b>\n\n{ids_list}\n"
else:
text = "<b>👤 Администраторы</b>\n\n<i>Список пуст — доступ для всех</i>\n"
text += (
f"\nВсего: {len(ALLOWED_IDS)}\n\n"
"Чтобы <b>добавить</b> — перешлите любое сообщение от нового админа, "
"или отправьте команду:\n"
"<code>/addadmin 123456789</code>\n\n"
"Чтобы <b>удалить</b>:\n"
"<code>/deladmin 123456789</code>"
)
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("« Назад", callback_data="menu_main")],
])
await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
async def cmd_addadmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/addadmin ID [ID2 ID3 ...] — добавить админа вручную."""
if not is_user_allowed(update.effective_user.id):
await update.message.reply_text(
f"⛔ Доступ запрещён.\nВаш ID: <code>{update.effective_user.id}</code>",
parse_mode="HTML",
)
return
args = context.args or []
if not args:
await update.message.reply_text(
"Использование: <code>/addadmin ID [ID2 ID3 ...]</code>\n"
"Пример: <code>/addadmin 123456789 987654321</code>",
parse_mode="HTML",
)
return
added = []
errors = []
for a in args:
a = a.strip().replace(",", "")
if not a:
continue
try:
uid = int(a)
add_admin(uid)
added.append(str(uid))
except ValueError:
errors.append(a)
parts = []
if added:
parts.append(f"✅ Добавлены: {', '.join(added)}")
if errors:
parts.append(f"❌ Ошибки: {', '.join(errors)}")
await update.message.reply_text("\n".join(parts), parse_mode="HTML")
async def cmd_deladmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""/deladmin ID — удалить админа."""
if not is_user_allowed(update.effective_user.id):
await update.message.reply_text(
f"⛔ Доступ запрещён.\nВаш ID: <code>{update.effective_user.id}</code>",
parse_mode="HTML",
)
return
args = context.args or []
if not args:
await update.message.reply_text(
"Использование: <code>/deladmin ID</code>",
parse_mode="HTML",
)
return
removed = []
for a in args:
a = a.strip().replace(",", "")
try:
uid = int(a)
if uid == update.effective_user.id:
await update.message.reply_text("⚠️ Нельзя удалить себя!")
continue
if uid in ALLOWED_IDS:
remove_admin(uid)
removed.append(str(uid))
else:
await update.message.reply_text(f"ID {uid} не найден в списке")
except ValueError:
await update.message.reply_text(f"❌ Некорректный ID: {html.escape(a)}")
if removed:
await update.message.reply_text(f"✅ Удалены: {', '.join(removed)}")
# ============================================================================
# PROMO & CREDITS
# ============================================================================
def get_promo_text() -> str:
"""Return promo text with 2 hosters + donate."""
return (
"<b>💰 Хостинг #1 — скидка до 60%</b>\n"
f"<a href='{PROMO_LINK_1}'>{PROMO_LINK_1}</a>\n\n"
"<b>Промокоды:</b>\n"
" <code>OFF60</code> — 60% на первый месяц\n"
" <code>antenka20</code> — 20% + 3% за 3 мес\n"
" <code>antenka6</code> — 15% + 5% за 6 мес\n\n"
"━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"
"<b>💰 Хостинг #2 — скидка до 60%</b>\n"
f"<a href='{PROMO_LINK_2}'>{PROMO_LINK_2}</a>\n\n"
"<b>Промокод:</b>\n"
" <code>OFF60</code> — 60% на первый месяц\n\n"
"━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"
"<b>☕ Донат / Чаевые</b>\n"
f"<a href='{TIP_LINK}'>{TIP_LINK}</a>"
)
def should_show_promo_bot() -> bool:
"""Check if promo should be shown (once per 24h)."""
try:
if not os.path.exists(PROMO_STAMP_FILE):
return True
with open(PROMO_STAMP_FILE, "r") as f:
last_ts = int(f.read().strip())
return (int(time.time()) - last_ts) >= 86400
except (ValueError, OSError):
return True
def mark_promo_shown_bot() -> None:
"""Mark promo as shown."""
try:
os.makedirs(os.path.dirname(PROMO_STAMP_FILE), exist_ok=True)
with open(PROMO_STAMP_FILE, "w") as f:
f.write(str(int(time.time())))
except OSError:
pass
async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Promo information — always shown from menu."""
query = update.callback_query
await query.answer()
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Назад", callback_data="menu_main")]]
)
await safe_edit_message(query, get_promo_text(), reply_markup=keyboard, parse_mode="HTML")
async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Credits and acknowledgements."""
query = update.callback_query
await query.answer()
text = (
f"<b> Credits & Acknowledgements</b>\n\n"
f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>\n\n"
f"Built with love for the Telegram community\n\n"
f"<b>Special thanks to:</b>\n\n"
f"🙏 <b>telemt</b> - MTProxy engine\n"
f" High-performance proxy core\n\n"
f"🎨 <b>HTML5UP</b> - Beautiful web templates\n"
f" Responsive design & themes\n\n"
f"📚 <b>Learning Zone</b> - Educational resources\n"
f" Community learning support\n\n"
f"🚀 <b>Start Bootstrap</b> - Bootstrap templates\n"
f" Professional design framework\n\n"
f"💬 <b>Community</b> - Your feedback & support\n\n"
f"<i>GoTelegram is open-source and community-driven</i>"
)
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# REMOVE
# ============================================================================
async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Remove installation."""
query = update.callback_query
await query.answer()
text = (
"<b>⚠️ Remove GoTelegram</b>\n\n"
"This will completely remove the installation.\n"
"Are you sure?"
)
buttons = [
[InlineKeyboardButton("❌ Yes, Remove", callback_data="remove_confirm")],
[InlineKeyboardButton("« Back", callback_data="menu_main")],
]
keyboard = InlineKeyboardMarkup(buttons)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Confirm removal."""
query = update.callback_query
await query.answer()
await safe_edit_message(query,"⏳ Removing GoTelegram...")
# Stop service
await sh("systemctl", "stop", TELEMT_SERVICE)
# Remove directories
for path in ["/opt/gotelegram", WEBSITE_ROOT]:
await sh("rm", "-rf", path)
text = "✅ GoTelegram removed successfully"
keyboard = InlineKeyboardMarkup(
[[InlineKeyboardButton("« Back", callback_data="menu_main")]]
)
await safe_edit_message(query,text, reply_markup=keyboard, parse_mode="HTML")
# ============================================================================
# CALLBACK ROUTING
# ============================================================================
async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Route all callbacks."""
query = update.callback_query
data = query.data
# ── Авто-регистрация админа (до проверки доступа!) ──
if data.startswith("admin_confirm_"):
await query.answer()
try:
new_admin_id = int(data.split("_")[-1])
except (ValueError, IndexError):
await safe_edit_message(query, "❌ Ошибка: некорректный ID")
return
# Безопасность: только тот кто нажал кнопку может стать админом
if update.effective_user.id != new_admin_id:
await query.answer("Эта кнопка не для вас", show_alert=True)
return
# Race condition: если кто-то уже стал админом
if not _WAITING_FOR_ADMIN:
await safe_edit_message(query, " Администратор уже назначен.")
return
add_admin(new_admin_id)
await safe_edit_message(
query,
f"✅ <b>Вы назначены администратором!</b>\n\n"
f"ID: <code>{new_admin_id}</code>\n\n"
f"Нажмите /start чтобы открыть меню.",
parse_mode="HTML",
)
return
if data == "admin_cancel":
await query.answer()
await safe_edit_message(
query,
"👋 Ок. Напишите /start когда будете готовы.",
)
return
# Access control
if not is_user_allowed(update.effective_user.id):
await query.answer("Доступ запрещён")
return
# Main menu
if data == "menu_main":
await query.answer()
buttons = get_main_menu()
text = (
f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>\n\n"
"🤖 MTProxy Management\n"
"Select an action:"
)
await safe_edit_message(query,text, reply_markup=buttons, parse_mode="HTML")
return
if data == "close_menu":
await query.answer()
await query.delete_message()
return
# Dispatch to handlers
handlers = {
"menu_install": cb_menu_install,
"menu_status": cb_menu_status,
"menu_link": cb_menu_link,
"menu_share": cb_menu_share,
"menu_restart": cb_menu_restart,
"menu_logs": cb_menu_logs,
"menu_backup": cb_menu_backup,
"menu_restore": cb_menu_restore,
"menu_update": cb_menu_update,
"menu_change": cb_menu_change,
"menu_website": cb_menu_website,
"menu_promo": cb_menu_promo,
"menu_credits": cb_menu_credits,
"menu_admins": cb_menu_admins,
"menu_remove": cb_menu_remove,
"install_mode_lite": cb_install_mode_lite,
"install_mode_pro": cb_install_mode_pro,
"backup_create": cb_backup_create,
"backup_list": cb_backup_list,
"ssl_renew": cb_ssl_renew,
"ssl_status": cb_ssl_status,
"remove_confirm": cb_remove_confirm,
"change_lite": cb_change_lite,
"change_pro": cb_change_pro,
"install_migrate": cb_install_migrate,
"menu_stats": cb_menu_stats,
}
# Pattern-based handlers
if data.startswith("lite_dom_"):
await cb_lite_domain(update, context)
elif data.startswith("pro_cat_"):
await cb_pro_category(update, context)
elif data.startswith("pro_tpl_"):
await cb_pro_template(update, context)
elif data.startswith("pro_confirm_"):
await cb_pro_confirm(update, context)
elif data.startswith("restore_idx_"):
await cb_restore_backup(update, context)
elif data in handlers:
await handlers[data](update, context)
else:
await query.answer("Unknown action")
# ============================================================================
# ERROR HANDLERS
# ============================================================================
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log errors caused by Updates."""
logger.error(f"Exception while handling an update:", exc_info=context.error)
# ============================================================================
# MAIN APPLICATION
# ============================================================================
def main() -> None:
"""Start the bot."""
if not BOT_TOKEN:
logger.error("BOT_TOKEN not set in .env file")
return
# Create the Application
application = Application.builder().token(BOT_TOKEN).build()
# Command handlers
application.add_handler(CommandHandler("start", cmd_start))
application.add_handler(CommandHandler("help", cmd_help))
application.add_handler(CommandHandler("status", cmd_status))
application.add_handler(CommandHandler("logs", cmd_logs))
application.add_handler(CommandHandler("addadmin", cmd_addadmin))
application.add_handler(CommandHandler("deladmin", cmd_deladmin))
# Callback query handler (buttons)
application.add_handler(CallbackQueryHandler(handle_callback))
# Error handler
application.add_error_handler(error_handler)
# Run the bot
logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()