mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 13:26:02 +00:00
2351 lines
84 KiB
Python
2351 lines
84 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
GoTelegram v2.4 Bot - MTProxy Management for Linux
|
||
Manages telemt engine via Telegram interface with full CLI feature parity
|
||
Uses python-telegram-bot v21+
|
||
Supports EN/RU UI with per-user language preferences.
|
||
"""
|
||
|
||
import asyncio
|
||
import csv
|
||
import hashlib
|
||
import html
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import toml
|
||
from datetime import datetime
|
||
from io import StringIO
|
||
from pathlib import Path
|
||
from typing import Tuple, Optional, List, Dict, Any
|
||
|
||
from dotenv import load_dotenv
|
||
from telegram import (
|
||
Update,
|
||
InlineKeyboardButton,
|
||
InlineKeyboardMarkup,
|
||
InputFile,
|
||
)
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
CallbackQueryHandler,
|
||
ContextTypes,
|
||
MessageHandler,
|
||
filters,
|
||
)
|
||
from telegram.error import TelegramError, BadRequest
|
||
|
||
# i18n — loaded from the bot directory next to this file.
_BOT_DIR = Path(__file__).resolve().parent
if str(_BOT_DIR) not in sys.path:
    sys.path.insert(0, str(_BOT_DIR))
try:
    from i18n import (
        t as _t,
        tf as _tf,
        get_user_lang,
        set_user_lang,
        get_language_name,
        SUPPORTED_LANGS,
    )
except Exception as _i18n_err:  # pragma: no cover — defensive fallback
    # Report through a named logger instead of logging.warning(): the
    # module-level helper implicitly calls basicConfig() when the root logger
    # has no handlers, which would turn the explicit logging.basicConfig(...)
    # call later in this file into a no-op (wrong format, INFO records lost).
    logging.getLogger(__name__).warning(
        "i18n module not available: %s", _i18n_err
    )

    def _t(user_id, key, default=None):
        """Fallback translator: return `default` if given, else the key itself."""
        return default if default is not None else key

    def _tf(user_id, key, *args, default=None):
        """Fallback formatter: %-format the default/key with `args`.

        If formatting fails, the unformatted template is returned.
        """
        template = default if default is not None else key
        try:
            return template % args if args else template
        except Exception:
            return template

    def get_user_lang(user_id):
        """Fallback: every user is treated as English."""
        return "en"

    def set_user_lang(user_id, code):
        """Fallback: language preferences cannot be stored; always False."""
        return False

    def get_language_name(code):
        """Fallback: the language code doubles as its display name."""
        return code

    SUPPORTED_LANGS = ("en",)
|
||
|
||
|
||
def _uid(update: Optional[Update]) -> Optional[int]:
    """Return the id of the update's effective user, or None if there is none."""
    sender = None if update is None else getattr(update, "effective_user", None)
    if not sender:
        return None
    return sender.id
|
||
|
||
# Read settings from a .env file into the process environment.
load_dotenv()

# Root logging setup: timestamped records, INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
||
|
||
# ============================================================================
|
||
# CONFIGURATION
|
||
# ============================================================================
|
||
|
||
GOTELEGRAM_VERSION = "2.4.7"

# Files, directories and the systemd unit this bot works with.
GOTELEGRAM_CONFIG = "/opt/gotelegram/config.json"
TELEMT_CONFIG = "/etc/telemt/config.toml"
TELEMT_SERVICE = "telemt"
WEBSITE_ROOT = "/var/www/gotelegram-site"
BACKUP_DIR = "/opt/gotelegram/backups"
TEMPLATES_CATALOG = "/opt/gotelegram/templates_catalog.json"
INSTALL_SH = "/opt/gotelegram/install.sh"

# Promo / tip links and the promo stamp file.
# NOTE(review): the stamp file's readers are outside this view — presumably
# it records when the promo was last shown; confirm.
PROMO_LINK_1 = "https://vk.cc/ct29NQ"
PROMO_LINK_2 = "https://vk.cc/cUxAhj"
TIP_LINK = "https://pay.cloudtips.ru/p/7410814f"
PROMO_STAMP_FILE = "/opt/gotelegram/.promo_bot_last_shown"

# Bot credentials come from the environment; ENV_FILE is the file that
# _save_allowed_ids() rewrites.
BOT_TOKEN = os.environ.get("BOT_TOKEN")
ENV_FILE = "/opt/gotelegram-bot/.env"
|
||
|
||
# ── Loading ALLOWED_IDS ─────────────────────────────────────────────────────
# Entries may be separated by commas, whitespace, or any mix of the two.
ALLOWED_IDS: set = set()
_WAITING_FOR_ADMIN = False  # True while the list is empty → waiting for the first admin


def _load_allowed_ids() -> None:
    """Rebuild ALLOWED_IDS from the ALLOWED_IDS environment variable.

    Entries that are not integers are logged and skipped. Afterwards
    _WAITING_FOR_ADMIN is True exactly when no valid id was found.
    """
    global ALLOWED_IDS, _WAITING_FOR_ADMIN
    parsed = set()
    # Separators: comma, whitespace, or both.
    tokens = (tok.strip() for tok in re.split(r'[,\s]+', os.getenv("ALLOWED_IDS", "")))
    for tok in tokens:
        if not tok:
            continue
        try:
            parsed.add(int(tok))
        except ValueError:
            logging.warning(f"Invalid ALLOWED_IDS entry: {tok}")
    ALLOWED_IDS = parsed
    _WAITING_FOR_ADMIN = not parsed
|
||
|
||
|
||
def _save_allowed_ids() -> None:
    """Persist ALLOWED_IDS to os.environ and to the .env file.

    os.environ["ALLOWED_IDS"] and _WAITING_FOR_ADMIN are always refreshed.
    ENV_FILE is rewritten only if it already exists: an existing
    ``ALLOWED_IDS=`` line is replaced (or dropped when the list is empty),
    otherwise a new line is appended. I/O failures are logged, not raised.
    """
    global _WAITING_FOR_ADMIN
    ids_str = ",".join(str(i) for i in sorted(ALLOWED_IDS))
    os.environ["ALLOWED_IDS"] = ids_str
    _WAITING_FOR_ADMIN = len(ALLOWED_IDS) == 0

    if not os.path.exists(ENV_FILE):
        return

    try:
        with open(ENV_FILE, "r") as f:
            lines = f.readlines()

        found = False
        new_lines = []
        for line in lines:
            if line.strip().startswith("ALLOWED_IDS="):
                # An empty list removes the line altogether.
                if ids_str:
                    new_lines.append(f"ALLOWED_IDS={ids_str}\n")
                found = True
            else:
                new_lines.append(line)

        if not found and ids_str:
            # The file may not end with a newline; terminate the last line
            # first so the new entry is not glued onto it (which would
            # corrupt e.g. a trailing BOT_TOKEN=... line).
            if new_lines and not new_lines[-1].endswith("\n"):
                new_lines[-1] += "\n"
            new_lines.append(f"ALLOWED_IDS={ids_str}\n")

        with open(ENV_FILE, "w") as f:
            f.writelines(new_lines)

        logger.info(f"ALLOWED_IDS updated in .env: {ids_str or '(empty)'}")
    except OSError as e:
        logger.error(f"Failed to update .env: {e}")
|
||
|
||
|
||
# Populate ALLOWED_IDS / _WAITING_FOR_ADMIN once, when the module is imported.
_load_allowed_ids()
|
||
|
||
# Domains offered as Lite-mode choices. The inline buttons refer to entries
# by list index ("lite_dom_<index>"), so the order is significant.
LITE_DOMAINS = [
    "google.com", "microsoft.com", "cloudflare.com", "apple.com",
    "amazon.com", "github.com", "stackoverflow.com", "medium.com",
    "wikipedia.org", "coursera.org", "udemy.com", "habr.com",
    "stepik.org", "duolingo.com", "khanacademy.org", "bbc.com",
    "reuters.com", "nytimes.com", "ted.com", "zoom.us",
]
|
||
|
||
# ============================================================================
|
||
# UTILITY FUNCTIONS
|
||
# ============================================================================
|
||
|
||
|
||
async def sh(*args, timeout: int = 60) -> Tuple[int, str, str]:
    """Run a program (no shell involved) and capture its output.

    Args:
        *args: Executable followed by its arguments.
        timeout: Seconds to wait before the process is killed.

    Returns:
        (return_code, stdout, stderr). Output is decoded as UTF-8 with
        undecodable bytes replaced. On timeout or on any failure to run the
        program the return code is -1, stdout is empty, and stderr carries
        the reason.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Best-effort cleanup; the process may already be gone.
        try:
            proc.kill()
            await proc.wait()
        except Exception:
            pass
        return (-1, "", f"Command timeout after {timeout}s")
    except Exception as exc:
        return (-1, "", str(exc))
    else:
        return (
            proc.returncode,
            out.decode("utf-8", errors="replace"),
            err.decode("utf-8", errors="replace"),
        )
|
||
|
||
|
||
# Per-host mutex preventing concurrent install.sh --action invocations. Two
# admins hitting "change template" at the same second could race each other
# and corrupt /var/www/gotelegram-site. One global lock is fine — these are
# rare operations and should serialize cleanly.
_BOT_ACTION_LOCK = asyncio.Lock()

# Allowed template-id shape: catalog ids are [a-zA-Z0-9_-], never longer than 64.
# This is a defense-in-depth check before we hand the value to subprocess.
# The pattern ends in \Z rather than $: "$" also matches just before a
# trailing newline, so "id\n" would otherwise pass validation.
_TPL_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}\Z")

# Allowed Lite mask domain shape — simple DNS hostname, up to 253 chars total.
# Each label 1–63 chars (the final label at least 2), labels separated by
# dots, alphanumerics + hyphens, no leading/trailing hyphen in a label.
# \Z (not $) is used for the same trailing-newline reason as above.
_DOMAIN_RE = re.compile(
    r"^(?=.{1,253}\Z)(?:(?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+"
    r"(?!-)[A-Za-z0-9-]{2,63}(?<!-)\Z"
)
|
||
|
||
|
||
async def run_bot_action(action: str, timeout: int = 300, **params) -> Dict:
    """Run ``install.sh --action=<action> --json`` and return its JSON result.

    Args:
        action: action name (e.g. "change-template", "change-lite-domain").
        timeout: seconds to wait for install.sh to finish.
        **params: extra options; each non-None ``key=value`` is passed as
            ``--key=value`` with underscores in the key turned into hyphens.

    Returns:
        The dict parsed from install.sh's stdout when it contains a "status"
        key. Otherwise a synthesized error:
        {"status": "error", "message": ..., "code": "transport", "rc": "<rc>"}.
    """
    argv = ["bash", INSTALL_SH, f"--action={action}", "--json"]
    argv.extend(
        f"--{name.replace('_', '-')}={value}"
        for name, value in params.items()
        if value is not None
    )

    code, stdout, stderr = await sh(*argv, timeout=timeout)
    stdout = (stdout or "").strip()

    # Log lines go to stderr and the JSON result to stdout, but tolerate stray
    # stdout output: take the last line that looks like a JSON object.
    stripped = (ln.strip() for ln in reversed(stdout.splitlines()))
    json_line = next(
        (ln for ln in stripped if ln.startswith("{") and ln.endswith("}")),
        None,
    )

    if json_line:
        try:
            data = json.loads(json_line)
        except json.JSONDecodeError as e:
            logger.warning(f"run_bot_action: JSON parse failed: {e} | line={json_line!r}")
        else:
            if isinstance(data, dict) and "status" in data:
                return data

    # Nothing usable came back from install.sh — report a transport error.
    tail = (stderr or "")[-300:] if stderr else ""
    logger.error(
        f"run_bot_action({action}): no JSON output, rc={code}, "
        f"stdout={stdout[-300:]!r}, stderr={tail!r}"
    )
    return {
        "status": "error",
        "message": "install.sh did not return a JSON result",
        "code": "transport",
        "rc": str(code),
    }
|
||
|
||
|
||
def load_json(path: str) -> Optional[Dict]:
    """Parse the JSON file at `path`.

    The file is read as UTF-8 explicitly, so the result does not depend on
    the locale the bot process happens to run under.

    Returns:
        The parsed document, or None if the file cannot be read or parsed
        (the failure is logged as a warning).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        logger.warning(f"Failed to load {path}: {e}")
        return None
|
||
|
||
|
||
def load_toml(path: str) -> Optional[Dict]:
    """Parse the TOML file at `path`.

    TOML documents are UTF-8, so the file is opened with that encoding
    explicitly instead of the locale default.

    Returns:
        The parsed document, or None if the file cannot be read or parsed
        (the failure is logged as a warning).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return toml.load(f)
    except Exception as e:
        logger.warning(f"Failed to load {path}: {e}")
        return None
|
||
|
||
|
||
def save_json(path: str, data: Dict) -> bool:
    """Write `data` to `path` as indented JSON, creating parent directories.

    Returns:
        True on success, False if anything went wrong (the failure is logged).
    """
    try:
        parent = os.path.dirname(path)
        # A bare filename has no directory part; os.makedirs("") would raise
        # and make every such save fail.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        return True
    except Exception as e:
        logger.error(f"Failed to save {path}: {e}")
        return False
|
||
|
||
|
||
async def safe_edit_message(
    query,
    text: str,
    reply_markup=None,
    parse_mode=None,
    disable_web_page_preview: Optional[bool] = None,
) -> bool:
    """Edit the callback query's message, tolerating benign Telegram errors.

    `disable_web_page_preview` is passed to edit_message_text only when it is
    not None, so leaving it out keeps Telegram's default.

    Returns:
        True if the message was edited or was already identical; False if it
        no longer exists or cannot be edited.

    Raises:
        BadRequest: for any other rejection from Telegram.
    """
    options = dict(reply_markup=reply_markup, parse_mode=parse_mode)
    if disable_web_page_preview is not None:
        options["disable_web_page_preview"] = disable_web_page_preview

    try:
        await query.edit_message_text(text, **options)
    except BadRequest as exc:
        reason = str(exc).lower()
        if "message is not modified" in reason:
            # Content unchanged — nothing to do, not a failure.
            return True
        unreachable = ("message to edit not found", "message can't be edited")
        if any(marker in reason for marker in unreachable):
            logger.warning(f"Cannot edit message: {exc}")
            return False
        raise  # Unexpected BadRequest — let the caller see it.
    return True
|
||
|
||
|
||
async def _delete_message_after(message, delay: int = 30) -> None:
    """Sleep for `delay` seconds, then delete `message`.

    Meant for short-lived content such as promo blocks. Any exception (for
    example when the user has already removed the message) is only logged at
    debug level.
    """
    try:
        await asyncio.sleep(delay)
        await message.delete()
    except Exception as exc:
        logger.debug(f"_delete_message_after: {exc}")
|
||
|
||
|
||
async def check_service_status(service: str) -> bool:
    """Return True when ``systemctl is-active <service>`` exits with status 0."""
    result = await sh("systemctl", "is-active", service)
    return result[0] == 0
|
||
|
||
|
||
async def get_telemt_version() -> str:
    """Return the last whitespace-separated token printed by ``telemt -v``.

    Returns "unknown" when the command fails or prints nothing but
    whitespace (the latter used to raise IndexError).
    """
    code, stdout, _ = await sh("telemt", "-v")
    if code != 0:
        return "unknown"
    tokens = stdout.split()
    return tokens[-1] if tokens else "unknown"
|
||
|
||
|
||
def is_docker_running() -> bool:
    """Return True when ``docker ps`` succeeds within 5 seconds.

    False is returned when the docker binary is missing, the command times
    out, or it exits non-zero. The exit status used to be ignored, so any
    installed docker CLI was reported as "running" even when the command
    itself failed.

    NOTE(review): this call blocks for up to 5 seconds and is invoked from
    async code in this file (check_old_container).
    """
    try:
        result = subprocess.run(
            ["docker", "ps"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0
|
||
|
||
|
||
async def check_old_container() -> Optional[str]:
    """Look for a leftover v1 ``mtg`` Docker container.

    Returns:
        "mtg" if Docker is reachable and the output of
        ``docker ps -a --format {{.Names}}`` contains the substring "mtg";
        otherwise None.
    """
    if is_docker_running():
        code, names, _ = await sh("docker", "ps", "-a", "--format", "{{.Names}}")
        if code == 0 and "mtg" in names:
            return "mtg"
    return None
|
||
|
||
|
||
# ============================================================================
|
||
# ACCESS CONTROL
|
||
# ============================================================================
|
||
|
||
|
||
def is_user_allowed(user_id: int) -> bool:
    """Tell whether `user_id` is an admin listed in ALLOWED_IDS.

    While the bot is still waiting for its first admin, nobody is allowed.
    """
    return (not _WAITING_FOR_ADMIN) and user_id in ALLOWED_IDS
|
||
|
||
|
||
def add_admin(user_id: int) -> None:
    """Add `user_id` to ALLOWED_IDS and persist the list via _save_allowed_ids()."""
    ALLOWED_IDS.add(user_id)
    _save_allowed_ids()
    logger.info(f"Admin added: {user_id}")
|
||
|
||
|
||
def remove_admin(user_id: int) -> None:
    """Remove `user_id` from ALLOWED_IDS (no-op if absent) and persist the list.

    NOTE(review): removing the last id leaves the list empty, which
    _save_allowed_ids() turns into _WAITING_FOR_ADMIN = True — confirm the
    caller guards against that.
    """
    ALLOWED_IDS.discard(user_id)
    _save_allowed_ids()
    logger.info(f"Admin removed: {user_id}")
|
||
|
||
|
||
async def require_auth(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Return True when the update's sender is an allowed admin.

    While the bot is waiting for its first admin this returns False without
    replying (that mode is handled by cmd_start). Other unauthorized senders
    get an "access denied" reply when the update carries a regular message,
    and the attempt is logged. Updates without a sender (e.g. channel posts)
    are rejected instead of raising AttributeError.
    """
    # First-admin bootstrap mode is handled in cmd_start.
    if _WAITING_FOR_ADMIN:
        return False

    user = update.effective_user
    user_id = user.id if user else None
    if user_id is not None and is_user_allowed(user_id):
        return True

    if update.message:
        await update.message.reply_text(
            f"⛔ Доступ запрещён.\nВаш ID: <code>{user_id}</code>",
            parse_mode="HTML",
        )
    logger.warning(f"Unauthorized access attempt from user {user_id}")
    return False
|
||
|
||
|
||
# ============================================================================
|
||
# MAIN MENU
|
||
# ============================================================================
|
||
|
||
|
||
def get_main_menu(user_id: Optional[int] = None) -> InlineKeyboardMarkup:
    """Build the main-menu inline keyboard with labels localized for `user_id`."""
    # One inner tuple per keyboard row; each entry is (i18n key, callback_data).
    layout = (
        (("menu_install", "menu_install"), ("menu_status", "menu_status")),
        (("menu_link", "menu_link"), ("menu_share", "menu_share")),
        (("menu_restart", "menu_restart"), ("menu_logs", "menu_logs")),
        (("menu_change", "menu_change"), ("menu_backup", "menu_backup")),
        (("menu_restore", "menu_restore"), ("menu_update", "menu_update")),
        (("menu_website", "menu_website"), ("menu_promo", "menu_promo")),
        (("menu_stats", "menu_stats"), ("menu_remove", "menu_remove")),
        (("menu_admins", "menu_admins"), ("menu_credits", "menu_credits")),
        (("menu_language", "menu_lang"), ("menu_close", "close_menu")),
    )
    rows = [
        [
            InlineKeyboardButton(_t(user_id, label_key), callback_data=callback)
            for label_key, callback in row
        ]
        for row in layout
    ]
    return InlineKeyboardMarkup(rows)
|
||
|
||
|
||
# ============================================================================
|
||
# COMMANDS
|
||
# ============================================================================
|
||
|
||
|
||
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: show the main menu, plus the promo when it is due.

    If ALLOWED_IDS is empty, the bot is in first-admin auto-registration
    mode and the sender is asked to confirm becoming the administrator.
    """
    user = update.effective_user
    # effective_message also covers edited messages, for which update.message
    # is None (CommandHandler handles edited commands by default).
    message = update.effective_message
    if user is None or message is None:
        # e.g. a channel post: there is nobody to authorize or answer.
        return
    user_id = user.id

    # ── Waiting for the first admin ──
    if _WAITING_FOR_ADMIN:
        name = user.full_name or user.username or str(user_id)
        title = _tf(user_id, "waiting_admin_title", html.escape(name))
        body = _tf(user_id, "waiting_admin_body", user_id)
        text = f"<b>{title}</b>\n\n{body}"
        keyboard = InlineKeyboardMarkup([
            [
                InlineKeyboardButton(_t(user_id, "btn_yes"), callback_data=f"admin_confirm_{user_id}"),
                InlineKeyboardButton(_t(user_id, "btn_no"), callback_data="admin_cancel"),
            ]
        ])
        await message.reply_text(text, reply_markup=keyboard, parse_mode="HTML")
        return

    # ── Access check ──
    if not is_user_allowed(user_id):
        await message.reply_text(
            _tf(user_id, "access_denied", user_id),
            parse_mode="HTML",
        )
        return

    welcome = (
        f"<b>{_tf(user_id, 'welcome_title', GOTELEGRAM_VERSION)}</b>\n\n"
        f"{_t(user_id, 'welcome_subtitle')}\n"
        f"{_t(user_id, 'welcome_powered')}\n\n"
        f"{_t(user_id, 'welcome_prompt')}"
    )
    await message.reply_text(
        welcome, reply_markup=get_main_menu(user_id), parse_mode="HTML"
    )

    # Promo, shown when should_show_promo_bot() says it is due; the message
    # deletes itself after 30 seconds.
    if should_show_promo_bot():
        mark_promo_shown_bot()
        promo_msg = await message.reply_text(
            get_promo_text(), parse_mode="HTML", disable_web_page_preview=True
        )
        # Let the Application own the background task: a bare
        # asyncio.create_task() result that nobody references may be
        # garbage-collected before it finishes.
        context.application.create_task(_delete_message_after(promo_msg, 30))
|
||
|
||
|
||
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the help command: reply with the localized help text (admins only)."""
    if not await require_auth(update, context):
        return
    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    user_id = _uid(update)
    help_text = (
        f"<b>{_t(user_id, 'help_title')}</b>\n\n"
        f"{_t(user_id, 'help_lines')}"
    )
    await message.reply_text(help_text, parse_mode="HTML")
|
||
|
||
|
||
async def cmd_lang(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the language command: show the current language and a picker (admins only)."""
    if not await require_auth(update, context):
        return
    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    user_id = _uid(update)
    current = get_user_lang(user_id)
    title = _t(user_id, "lang_title")
    curr_line = _tf(user_id, "lang_current", get_language_name(current))
    prompt = _t(user_id, "lang_choose")
    text = f"<b>{title}</b>\n\n{curr_line}\n\n{prompt}"
    keyboard = InlineKeyboardMarkup([
        [
            InlineKeyboardButton("🇬🇧 English", callback_data="lang_set_en"),
            InlineKeyboardButton("🇷🇺 Русский", callback_data="lang_set_ru"),
        ],
        [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_main")],
    ])
    await message.reply_text(text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the status command: send a "checking" notice, then the status report."""
    if not await require_auth(update, context):
        return
    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    user_id = _uid(update)
    await message.reply_text(_t(user_id, "status_checking"), parse_mode="HTML")
    status_text = await get_status_text(user_id)
    await message.reply_text(status_text, parse_mode="HTML")
|
||
|
||
|
||
async def cmd_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the logs command: send the tail of the telemt journal (admins only).

    The last 20 journal entries are fetched and at most the final 1500
    characters are sent, HTML-escaped inside a <pre> block.
    """
    if not await require_auth(update, context):
        return
    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    user_id = _uid(update)

    code, stdout, _ = await sh(
        "journalctl", "-u", TELEMT_SERVICE, "-n", "20", "--no-pager"
    )
    if code != 0:
        await message.reply_text(_t(user_id, "logs_failed"))
        return

    log_text = stdout[-1500:]  # a no-op when the output is already shorter
    await message.reply_text(
        f"<pre>{html.escape(log_text)}</pre>",
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# STATUS
|
||
# ============================================================================
|
||
|
||
|
||
async def get_status_text(user_id: Optional[int] = None) -> str:
    """Assemble the localized, HTML-formatted status report.

    Sections, in order: service state, telemt version, values from the
    GoTelegram config (mode, template, domain, port), values from the telemt
    TOML config (listen port, TLS domain), the number of backup archives, and
    a warning when an old v1 container is present.
    """

    def esc(value) -> str:
        return html.escape(str(value))

    def field(label_key: str, value) -> str:
        return f"<b>{_t(user_id, label_key)}:</b> {value}"

    report = [f"<b>{_t(user_id, 'status_title')}</b>\n"]

    # Service state
    state_key = (
        "status_running"
        if await check_service_status(TELEMT_SERVICE)
        else "status_stopped"
    )
    state = _t(user_id, state_key)
    report.append(field("status_service", state))

    # Telemt version
    version = await get_telemt_version()
    report.append(field("status_telemt", f"v{version}"))

    # GoTelegram config
    config = load_json(GOTELEGRAM_CONFIG)
    if config:
        report.append(field("status_mode", esc(config.get("mode", "unknown"))))
        # install.sh/save_gotelegram_config uses "template_id" (not "template")
        template = config.get("template_id") or config.get("template")
        if template:
            report.append(field("status_template", esc(template)))
        for cfg_key, label_key in (("domain", "status_domain"), ("port", "status_port")):
            if config.get(cfg_key):
                report.append(field(label_key, esc(config[cfg_key])))

    # Telemt config (v3: [server] port = ..., [censorship] tls_domain = ...)
    telemt_cfg = load_toml(TELEMT_CONFIG)
    if telemt_cfg:
        server_cfg = telemt_cfg.get("server", {})
        if "port" in server_cfg:
            report.append(field("status_listen_port", server_cfg["port"]))
        censor_cfg = telemt_cfg.get("censorship", {})
        if "tls_domain" in censor_cfg:
            report.append(field("status_tls_domain", esc(censor_cfg["tls_domain"])))

    # Backups: count plain and encrypted archives; a name with one of these
    # suffixes can never end in ".sha256", so no extra exclusion is needed.
    backup_count = 0
    try:
        if os.path.exists(BACKUP_DIR):
            backup_count = sum(
                1
                for entry in os.listdir(BACKUP_DIR)
                if entry.endswith((".tar.gz", ".tar.gz.enc"))
            )
    except Exception:
        pass
    report.append(f"<b>Backups:</b> {backup_count}")

    # Old container check
    old_container = await check_old_container()
    if old_container:
        report.append(f"\n⚠️ <b>Found old container:</b> {html.escape(old_container)}")
        report.append("Run 'Install' to migrate")

    return "\n".join(report)
|
||
|
||
|
||
async def get_traffic_stats() -> str:
    """Build the HTML traffic-statistics report.

    Reads the current counters from /run/gotelegram/stats_current.json and
    historical snapshots (rows of ``ts,proxy,site``) from
    /opt/gotelegram/stats_history.csv. For each period, the newest snapshot
    at least that old is compared with the current counters to obtain the
    traffic volume and average rate; "—" is shown when no snapshot is old
    enough.

    Returns:
        The report, or an "unavailable" notice when the current snapshot
        cannot be read or is not a JSON object.
    """
    current_file = "/run/gotelegram/stats_current.json"
    history_file = "/opt/gotelegram/stats_history.csv"
    unavailable = (
        "📊 <b>Статистика</b>\n\n"
        "<i>Данные недоступны. Убедитесь что модуль статистики включён.</i>"
    )

    # Current snapshot
    try:
        with open(current_file, "r") as f:
            current = json.load(f)
    except Exception:
        return unavailable
    if not isinstance(current, dict):
        # Valid JSON but not an object: current.get() below would fail.
        return unavailable

    # History. A malformed row (e.g. a header line) is skipped on its own
    # instead of discarding every row that follows it.
    history = []
    try:
        with open(history_file, "r", newline="") as f:
            for row in csv.reader(f):
                if len(row) < 3:
                    continue
                try:
                    ts, proxy, site = (int(cell) for cell in row[:3])
                except ValueError:
                    continue
                history.append({"ts": ts, "proxy": proxy, "site": site})
    except (OSError, csv.Error, UnicodeDecodeError):
        # History is optional; periods without data are rendered as "—".
        pass

    now = int(time.time())

    def format_bytes(b):
        if b < 1024:
            return f"{b} B"
        if b < 1048576:
            return f"{b/1024:.1f} KB"
        if b < 1073741824:
            return f"{b/1048576:.1f} MB"
        return f"{b/1073741824:.1f} GB"

    def format_rate(bps):
        if bps < 1024:
            return f"{bps:.0f} B/s"
        if bps < 1048576:
            return f"{bps/1024:.1f} KB/s"
        return f"{bps/1048576:.1f} MB/s"

    def calc_for_period(secs, key):
        target_ts = now - secs
        # Newest snapshot that is at least `secs` old.
        candidates = [h for h in history if h["ts"] <= target_ts]
        if not candidates:
            return "—", "—"
        closest = max(candidates, key=lambda h: h["ts"])

        # A counter lower than the snapshot is clamped to zero traffic.
        diff = max(current.get(f"{key}_bytes", 0) - closest[key], 0)
        elapsed = now - closest["ts"]
        if elapsed <= 0:
            elapsed = 1
        return format_bytes(diff), format_rate(diff / elapsed)

    periods = [
        ("1 мин", 60),
        ("5 мин", 300),
        ("60 мин", 3600),
        ("1 день", 86400),
        ("7 дней", 604800),
        ("30 дней", 2592000),
        ("365 дней", 31536000),
    ]

    lines = ["📊 <b>Статистика трафика</b>\n"]

    for label, key in [("Proxy (telemt)", "proxy"), ("Сайт (nginx)", "site")]:
        lines.append(f"\n<b>{label}:</b>")
        lines.append("<pre>")
        lines.append(f"{'Период':<10} │ {'Трафик':>10} │ {'Скорость':>10}")
        lines.append("─" * 36)
        for name, secs in periods:
            total, rate = calc_for_period(secs, key)
            lines.append(f"{name:<10} │ {total:>10} │ {rate:>10}")
        lines.append("</pre>")

    return "\n".join(lines)
|
||
|
||
|
||
async def cb_menu_stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback: replace the menu message with the traffic statistics report."""
    # NOTE(review): unlike cb_menu_status there is no require_auth() call here;
    # presumably the callback dispatcher authorizes the user — confirm.
    query = update.callback_query
    await query.answer()

    report = await get_traffic_stats()
    markup = InlineKeyboardMarkup([
        [InlineKeyboardButton("🔄 Обновить", callback_data="menu_stats")],
        [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")],
    ])
    await safe_edit_message(query, report, reply_markup=markup, parse_mode="HTML")
|
||
|
||
|
||
async def cb_menu_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback: replace the menu message with the detailed proxy/server status."""
    query = update.callback_query
    await query.answer()

    authorized = await require_auth(update, context)
    if not authorized:
        return

    report = await get_status_text(_uid(update))
    refresh_row = [InlineKeyboardButton("🔄 Обновить", callback_data="menu_status")]
    back_row = [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]
    await safe_edit_message(
        query,
        report,
        reply_markup=InlineKeyboardMarkup([refresh_row, back_row]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# INSTALL
|
||
# ============================================================================
|
||
|
||
|
||
def get_install_mode_menu(user_id: Optional[int] = None) -> InlineKeyboardMarkup:
    """Build the install-mode selection keyboard (Lite / Pro / Back).

    Args:
        user_id: user whose language is used for the Back label. It is
            optional so existing zero-argument callers keep working.

    The previous body referenced an undefined name ``update`` and therefore
    raised NameError on every call.
    """
    buttons = [
        [InlineKeyboardButton("⚡ Lite", callback_data="install_mode_lite")],
        [InlineKeyboardButton("🛡 Pro", callback_data="install_mode_pro")],
        [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_main")],
    ]
    return InlineKeyboardMarkup(buttons)
|
||
|
||
|
||
async def cb_menu_install(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback: open the install menu.

    When an old v1 container is found, a migration prompt is shown;
    otherwise the install-mode selection menu is shown.
    """
    query = update.callback_query
    await query.answer()

    legacy = await check_old_container()
    if not legacy:
        text = "Select installation mode:"
        keyboard = get_install_mode_menu()
    else:
        text = (
            f"⚠️ <b>Migration from v1 detected</b>\n\n"
            f"Found Docker container: {html.escape(legacy)}\n\n"
            f"Would you like to:\n"
            f"1. Migrate from v1 (recommended)\n"
            f"2. Fresh install (will remove old container)\n\n"
            f"<i>Select below or choose install mode</i>"
        )
        keyboard = InlineKeyboardMarkup([
            [InlineKeyboardButton("🔄 Migrate from v1", callback_data="install_migrate")],
            [InlineKeyboardButton("✨ Fresh Install", callback_data="install_mode_lite")],
            [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")],
        ])

    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_install_mode_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback: show the Lite-mode domain picker.

    Every entry of LITE_DOMAINS becomes a button whose callback data carries
    the entry's list index ("lite_dom_<index>").
    """
    query = update.callback_query
    await query.answer()

    # Two domain buttons per row, followed by a Back row.
    per_row = 2
    total = len(LITE_DOMAINS)
    rows = [
        [
            InlineKeyboardButton(LITE_DOMAINS[idx], callback_data=f"lite_dom_{idx}")
            for idx in range(start, min(start + per_row, total))
        ]
        for start in range(0, total, per_row)
    ]
    rows.append([InlineKeyboardButton("« Back", callback_data="menu_install")])

    await safe_edit_message(
        query,
        "Select a domain for Lite mode:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
async def cb_lite_domain(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Lite domain selection callback — real implementation (v2.4.2+).

    The callback data ends in an index into LITE_DOMAINS. Behaviour depends
    on the mode stored in the GoTelegram config:

    * "lite": invoke ``install.sh --action=change-lite-domain`` under the
      global action lock and report success or failure. Per the original
      notes this regenerates the telemt TOML with a new fake-TLS mask domain,
      restarts the service and preserves secret/port.
    * anything else: explain that a fresh Lite install must be done from the
      CLI; nothing is changed.

    NOTE(review): there is no require_auth() call here; presumably the
    callback dispatcher authorizes the user before routing — confirm.
    """
    query = update.callback_query
    try:
        domain_idx = int((query.data or "").rsplit("_", 1)[-1])
        # Negative indices would silently wrap around the list ("-1" selects
        # the last domain), so treat them as invalid.
        if domain_idx < 0:
            raise IndexError(domain_idx)
        domain = LITE_DOMAINS[domain_idx]
    except (ValueError, IndexError):
        await query.answer("Invalid domain selection")
        return

    # Defense-in-depth: LITE_DOMAINS is trusted, but validate the shape anyway
    # in case someone extends the list with garbage later.
    if not _DOMAIN_RE.match(domain):
        logger.warning(f"cb_lite_domain: rejecting malformed domain {domain!r}")
        await query.answer("Invalid domain")
        return

    await query.answer()

    back_keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )

    config = load_json(GOTELEGRAM_CONFIG) or {}
    current_mode = config.get("mode", "")

    if current_mode != "lite":
        text = (
            "<b>⚠️ Установка Lite из бота пока не поддерживается</b>\n\n"
            f"Выбранный домен: <code>{html.escape(domain)}</code>\n\n"
            "Чтобы установить Lite, запустите на сервере:\n"
            "<code>gotelegram</code> → <b>1) Прокси → 1) Установить/Обновить → Lite</b>\n\n"
            "Существующая конфигурация <b>не была изменена</b>."
        )
        await safe_edit_message(query, text, reply_markup=back_keyboard, parse_mode="HTML")
        return

    # Lite active — switch fake-TLS mask domain in place
    if _BOT_ACTION_LOCK.locked():
        await safe_edit_message(
            query,
            "<b>⏳ Другая операция уже выполняется</b>\n\n"
            "Дождитесь завершения предыдущей смены шаблона/домена и повторите.",
            reply_markup=back_keyboard,
            parse_mode="HTML",
        )
        return

    progress_text = (
        "<b>⏳ Меняю маскировочный домен...</b>\n\n"
        f"Новый домен: <code>{html.escape(domain)}</code>\n\n"
        "Перегенерирую конфиг telemt и перезапускаю сервис."
    )
    await safe_edit_message(query, progress_text, parse_mode="HTML")

    async with _BOT_ACTION_LOCK:
        result = await run_bot_action("change-lite-domain", timeout=30, domain=domain)

    if result.get("status") == "success":
        text = (
            "<b>✅ Маскировочный домен обновлён</b>\n\n"
            f"Новый домен: <code>{html.escape(domain)}</code>\n\n"
            "telemt перезапущен. <b>Важно:</b> старые ссылки подключения больше "
            "не будут работать — нужно заново раздать новые."
        )
    else:
        # The JSON may carry null or non-string values; html.escape() needs str.
        err_msg = str(result.get("message") or "unknown error")
        err_code = str(result.get("code") or "")
        text = (
            "<b>❌ Не удалось сменить домен</b>\n\n"
            f"Домен: <code>{html.escape(domain)}</code>\n"
            f"Причина: <code>{html.escape(err_msg)}</code>"
            + (f" (<code>{html.escape(err_code)}</code>)" if err_code else "")
            + "\n\n"
            "Существующая конфигурация <b>не была изменена</b>."
        )

    await safe_edit_message(query, text, reply_markup=back_keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_install_mode_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the Pro-mode template category picker.

    Loads the template catalog and renders one button per category, preceded
    by the "custom git template" entry and followed by a Back button. When
    the catalog is missing or has no ``categories`` key, an error message
    with a Back button is shown instead.
    """
    query = update.callback_query
    await query.answer()

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog or "categories" not in catalog:
        fallback_markup = InlineKeyboardMarkup(
            [[InlineKeyboardButton("« Back", callback_data="menu_install")]]
        )
        await safe_edit_message(
            query,
            "❌ Template catalog not found",
            reply_markup=fallback_markup,
        )
        return

    user_id = _uid(update)

    # First row: custom git template (matches CLI behaviour).
    rows = [
        [InlineKeyboardButton(_t(user_id, "cg_title"), callback_data="pro_custom_git")]
    ]
    rows.extend(
        [InlineKeyboardButton(f"📁 {cat['name']}", callback_data=f"pro_cat_{cat['id']}")]
        for cat in catalog.get("categories", [])
    )
    rows.append(
        [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_install")]
    )

    await safe_edit_message(
        query,
        "Pro Mode — Select Template Category:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
|
||
|
||
|
||
# ── Custom git template input flow ──────────────────────────────────────────
|
||
# Per-user flag map for the custom-git URL input flow (int key -> bool flag).
_CUSTOM_GIT_WAITERS: Dict[int, bool] = dict()

# Accepts an https:// URL made of URL-safe characters, optionally followed by
# an "@<branch>" suffix.
_CUSTOM_GIT_URL_RE = re.compile(
    r'^https://[A-Za-z0-9._~:/\-?#\[\]@!$&\'()*+,;=%]+(@[A-Za-z0-9._\-/]+)?$'
)

# Upper bound, in megabytes, for a cloned custom template.
_CUSTOM_GIT_MAX_MB = 100

# Seconds to wait for `git clone` before giving up.
_CUSTOM_GIT_CLONE_TIMEOUT = 90
|
||
|
||
|
||
def _validate_custom_git_url(url: str) -> bool:
    """Return True when *url* is an acceptable custom-template git URL.

    The URL must be non-empty, at most 512 characters, use the ``https://``
    scheme, contain no whitespace, control characters or shell
    metacharacters, carry no userinfo (``user:pass@host``) and name a
    non-empty host. A trailing ``@branch`` on the path is allowed.

    Args:
        url: Candidate URL as typed by the user.

    Returns:
        True if every check passes, False otherwise.
    """
    if not url or len(url) > 512:
        return False

    # Reject every whitespace and ASCII control character (NUL, VT, FF, DEL, ...),
    # not only space/tab/CR/LF: none of them belongs in a URL and a NUL byte
    # cannot even be passed to a subprocess as an argument.
    if any(ch.isspace() or ord(ch) < 0x20 or ord(ch) == 0x7F for ch in url):
        return False

    # Block shell metacharacters explicitly.
    forbidden = ("`", "$", "(", ")", "<", ">", "|", "\\", ";", "&", "'", '"')
    if any(bad in url for bad in forbidden):
        return False

    scheme = "https://"
    if not url.lower().startswith(scheme):
        return False
    rest = url[len(scheme):]

    # Reject embedded userinfo (https://user:pass@host/...) to prevent
    # credential leakage. Everything up to the first '/' is inspected.
    netloc = rest.split("/", 1)[0]
    if "@" in netloc:
        return False

    # The authority also ends at '?' or '#', so "https://?x/y" has an empty
    # host even though the slash-delimited netloc is non-empty.
    authority = re.split(r"[/?#]", rest, maxsplit=1)[0]
    for part in (netloc, authority):
        if not part or part.startswith(":") or part.endswith(":"):
            return False
    return True
|
||
|
||
|
||
def _strip_symlinks_and_measure(tree_root: str) -> int:
    """Delete every symlink under *tree_root* and return the byte total of the rest.

    Raises OSError if a symlink cannot be removed, so the caller never stages
    a tree that still contains one.
    """
    total = 0
    for root, dirs, files in os.walk(tree_root):
        for name in list(dirs):
            path = os.path.join(root, name)
            if os.path.islink(path):
                dirs.remove(name)
                os.unlink(path)
        for name in files:
            path = os.path.join(root, name)
            if os.path.islink(path):
                os.unlink(path)
                continue
            try:
                total += os.path.getsize(path)
            except OSError:
                # Best effort: an unreadable file simply does not count.
                pass
    return total


async def _download_custom_git_template(url_with_branch: str) -> Tuple[bool, str, str]:
    """Clone a custom git repo and stage its static site as a custom template.

    The repository is shallow-cloned into a private temporary directory,
    stripped of ``.git`` and of every symlink, size-checked, searched for an
    ``index.html`` and finally copied to
    ``/opt/gotelegram/custom_templates/<tpl_id>``. A previously staged copy
    is replaced only after the new clone passed every check.

    Args:
        url_with_branch: ``https://`` repository URL, optionally suffixed
            with ``@branch`` on the last path segment.

    Returns:
        ``(ok, tpl_id, message_key_or_path)``: on success the third item is
        the staged directory; on failure it is one of ``cg_timeout``,
        ``cg_invalid``, ``cg_too_big`` or ``cg_no_index``.
    """
    import tempfile  # function-scope import: only this helper needs it

    # Parse @branch suffix (only when the branch appears on the last path segment)
    branch = None
    url = url_with_branch
    if "@" in url_with_branch.rsplit("/", 1)[-1]:
        base, _, maybe_branch = url_with_branch.rpartition("@")
        if (
            base.lower().startswith("https://")
            and maybe_branch  # reject empty branch after `@`
            and "/" not in maybe_branch
            and re.match(r'^[A-Za-z0-9._/\-]+$', maybe_branch)
        ):
            url = base
            branch = maybe_branch
        elif not maybe_branch and base.lower().startswith("https://"):
            # Trailing `@` with no branch — drop it so git doesn't treat it as userinfo
            url = base

    tpl_id = "custom_" + hashlib.md5(url_with_branch.encode("utf-8")).hexdigest()[:10]
    target_dir = f"/opt/gotelegram/custom_templates/{tpl_id}"

    # A private, unpredictable parent directory avoids pre-created paths or
    # symlinks in the shared temp dir. git creates the "repo" child itself, so
    # the clone keeps ordinary permissions instead of mkdtemp's 0700.
    try:
        tmp_parent = tempfile.mkdtemp(prefix=f"{tpl_id}_clone_")
    except OSError as e:
        logger.warning("custom git: cannot create temp dir: %s", e)
        return False, tpl_id, "cg_invalid"
    clone_dir = os.path.join(tmp_parent, "repo")

    cmd = ["git", "clone", "--depth", "1"]
    if branch:
        cmd += ["--branch", branch]
    # "--" ends option parsing so the URL can never be read as an option.
    cmd += ["--", url, clone_dir]

    try:
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                # Fail fast on private repos instead of waiting for a
                # credential prompt until the timeout expires.
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
            )
            try:
                await asyncio.wait_for(proc.communicate(), timeout=_CUSTOM_GIT_CLONE_TIMEOUT)
            except asyncio.TimeoutError:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()  # reap the killed child
                return False, tpl_id, "cg_timeout"
            if proc.returncode != 0:
                return False, tpl_id, "cg_invalid"
        except Exception as e:
            logger.warning("custom git clone failed: %s", e)
            return False, tpl_id, "cg_invalid"

        # Remove .git to enforce size guard and avoid leaking repo history
        shutil.rmtree(os.path.join(clone_dir, ".git"), ignore_errors=True)

        # Drop symlinks before anything is copied: copytree would otherwise
        # follow them and copy the *contents* of files outside the repository
        # into the staged site.
        try:
            total = _strip_symlinks_and_measure(clone_dir)
        except OSError as e:
            logger.warning("custom git: cannot sanitize clone: %s", e)
            return False, tpl_id, "cg_invalid"

        # Size guard
        if total > _CUSTOM_GIT_MAX_MB * 1024 * 1024:
            return False, tpl_id, "cg_too_big"

        # Locate index.html in priority order
        found_root = None
        for sub in ("", "dist", "public", "build", "_site", "site", "docs", "out", "www"):
            cand = os.path.join(clone_dir, sub) if sub else clone_dir
            if os.path.isfile(os.path.join(cand, "index.html")):
                found_root = cand
                break
        if not found_root:
            # Fallback: search maxdepth 4
            for root, _dirs, files in os.walk(clone_dir):
                depth = root[len(clone_dir):].count(os.sep)
                if depth > 4:
                    continue
                if "index.html" in files:
                    found_root = root
                    break
        if not found_root:
            return False, tpl_id, "cg_no_index"

        # Stage final template dir; the old copy is replaced only now that
        # the new one is known to be usable.
        try:
            os.makedirs(os.path.dirname(target_dir), exist_ok=True)
            if os.path.isdir(target_dir):
                shutil.rmtree(target_dir, ignore_errors=True)
            shutil.copytree(found_root, target_dir)
        except OSError as e:
            logger.warning("custom git: staging %s failed: %s", target_dir, e)
            shutil.rmtree(target_dir, ignore_errors=True)
            return False, tpl_id, "cg_invalid"

        try:
            with open(os.path.join(target_dir, ".custom_git_source"), "w", encoding="utf-8") as f:
                f.write(url_with_branch + "\n")
        except OSError:
            # The marker file is informational only.
            pass
        return True, tpl_id, target_dir
    finally:
        # Always remove the temporary clone, including on timeout/failure.
        shutil.rmtree(tmp_parent, ignore_errors=True)
|
||
|
||
|
||
async def cb_pro_category(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List the templates of the category encoded in the callback data.

    The category id is the callback data with its ``pro_cat_`` prefix
    removed. One button is rendered per template, plus a Back button that
    returns to the category list.
    """
    query = update.callback_query
    cat_id = query.data.removeprefix("pro_cat_")

    await query.answer()

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog:
        await safe_edit_message(query, "❌ Template catalog not found")
        return

    # First category whose id matches, or None.
    category = next(
        (cat for cat in catalog.get("categories", []) if cat["id"] == cat_id),
        None,
    )
    if not category:
        await safe_edit_message(query, "❌ Category not found")
        return

    rows = [
        [InlineKeyboardButton(f"🎨 {tpl['name']}", callback_data=f"pro_tpl_{tpl['id']}")]
        for tpl in category.get("templates", [])
    ]
    rows.append([InlineKeyboardButton("« Back", callback_data="install_mode_pro")])

    await safe_edit_message(
        query,
        f"Select template from <b>{html.escape(category['name'])}</b>:",
        reply_markup=InlineKeyboardMarkup(rows),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_pro_template(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show a template's name, description and optional preview link.

    The template id is the callback data with its ``pro_tpl_`` prefix
    removed; it is looked up across all catalog categories. The message
    offers an Install button and a Back button.
    """
    query = update.callback_query
    tpl_id = query.data.removeprefix("pro_tpl_")

    await query.answer()

    catalog = load_json(TEMPLATES_CATALOG)
    if not catalog:
        await safe_edit_message(query, "❌ Template catalog not found")
        return

    # First template with a matching id, searching categories in order.
    template = next(
        (
            tpl
            for cat in catalog.get("categories", [])
            for tpl in cat.get("templates", [])
            if tpl["id"] == tpl_id
        ),
        None,
    )
    if not template:
        await safe_edit_message(query, "❌ Template not found")
        return

    tpl_name = html.escape(template.get('name', 'Unknown'))
    tpl_desc = html.escape(template.get('description', 'N/A'))
    parts = [
        f"<b>🎨 Template Preview</b>\n\n",
        f"<b>Name:</b> {tpl_name}\n",
        f"<b>Description:</b> {tpl_desc}\n\n",
    ]
    if "preview_url" in template:
        preview_url = html.escape(template['preview_url'], quote=True)
        parts.append(f'<a href="{preview_url}">View Live Preview</a>\n\n')
    parts.append("Confirm installation?")

    install_row = [InlineKeyboardButton("✅ Install", callback_data=f"pro_confirm_{tpl_id}")]
    back_row = [InlineKeyboardButton("« Back", callback_data="install_mode_pro")]
    await safe_edit_message(
        query,
        "".join(parts),
        reply_markup=InlineKeyboardMarkup([install_row, back_row]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_pro_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Confirm Pro template selection — real implementation (v2.4.2+).

    Branches on current mode:
      * pro mode (active deployment): run the ``change-template`` bot action,
        which downloads the new template and redeploys it to nginx, reusing
        the existing domain + SSL cert.
      * any other mode (or no install at all): route to CLI. Fresh Pro install
        still requires interactive flow (domain, email, DNS check) — not safe
        to run headless from the bot.

    Historic context: v2.4.1 stub used to overwrite config.json with a fake
    blob; that was replaced with a safe message in v2.4.1 hotfix; now in
    v2.4.2 the real change-template path is wired through install.sh.
    """
    query = update.callback_query
    tpl_id = query.data.removeprefix("pro_confirm_")

    await query.answer()

    def back_markup() -> InlineKeyboardMarkup:
        return InlineKeyboardMarkup(
            [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
        )

    # Defense-in-depth: even though the subprocess is started with list args
    # (no shell), the catalog id shape is enforced before it reaches install.sh.
    if not _TPL_ID_RE.match(tpl_id):
        logger.warning("cb_pro_confirm: rejecting malformed tpl_id %r", tpl_id)
        await safe_edit_message(
            query,
            "<b>❌ Некорректный идентификатор шаблона</b>\n\n"
            "Выбран неподдерживаемый шаблон. Вернитесь в меню и попробуйте снова.",
            reply_markup=back_markup(),
            parse_mode="HTML",
        )
        return

    safe_tpl = html.escape(tpl_id)

    # Read current config to decide: in-place change-template vs fresh install
    config = load_json(GOTELEGRAM_CONFIG) or {}
    current_mode = config.get("mode", "")

    if current_mode != "pro":
        # Fresh install / mode switch — still routes to CLI (needs domain, SSL)
        text = (
            "<b>⚠️ Установка Pro из бота пока не поддерживается</b>\n\n"
            f"Выбранный шаблон: <code>{safe_tpl}</code>\n\n"
            "Pro-режим требует ввода домена, email и проверки DNS. "
            "Чтобы установить Pro, запустите на сервере:\n"
            "<code>gotelegram</code> → <b>1) Прокси → 1) Установить/Обновить → Pro</b>\n\n"
            "Существующая конфигурация <b>не была изменена</b>."
        )
        await safe_edit_message(query, text, reply_markup=back_markup(), parse_mode="HTML")
        return

    progress_text = (
        "<b>⏳ Меняю шаблон сайта...</b>\n\n"
        f"Шаблон: <code>{safe_tpl}</code>\n\n"
        "Скачиваю репозиторий и разворачиваю в nginx. "
        "Это может занять 30–90 секунд."
    )

    # Pro mode is active — perform change-template in place
    if _BOT_ACTION_LOCK.locked():
        await safe_edit_message(
            query,
            "<b>⏳ Другая операция уже выполняется</b>\n\n"
            "Дождитесь завершения предыдущей смены шаблона/домена и повторите.",
            reply_markup=back_markup(),
            parse_mode="HTML",
        )
        return

    # Template download + git clone can be slow — generous timeout.
    # Mutex serializes with any concurrent change-lite-domain/change-template.
    # The progress message is sent while the lock is already held: with no
    # await between the locked() check above and the acquisition, a second
    # request cannot slip in and queue behind this one.
    async with _BOT_ACTION_LOCK:
        await safe_edit_message(query, progress_text, parse_mode="HTML")
        result = await run_bot_action("change-template", timeout=180, template=tpl_id)

    if result.get("status") == "success":
        # str() guards html.escape against a non-string value from the helper.
        domain = html.escape(str(result.get("domain") or config.get("domain") or ""))
        text = (
            "<b>✅ Шаблон обновлён</b>\n\n"
            f"Новый шаблон: <code>{safe_tpl}</code>\n"
            f'Сайт: <a href="https://{domain}">https://{domain}</a>\n\n'
            "Прокси продолжает работать без перерыва."
        )
    else:
        err_msg = str(result.get("message") or "unknown error")
        err_code = str(result.get("code") or "")
        code_suffix = f" (<code>{html.escape(err_code)}</code>)" if err_code else ""
        text = (
            "<b>❌ Не удалось сменить шаблон</b>\n\n"
            f"Шаблон: <code>{safe_tpl}</code>\n"
            f"Причина: <code>{html.escape(err_msg)}</code>"
            + code_suffix
            + "\n\n"
            "Существующая конфигурация <b>не была изменена</b>. "
            "Попробуйте другой шаблон или запустите <code>gotelegram</code> из консоли."
        )

    await safe_edit_message(
        query,
        text,
        reply_markup=back_markup(),
        parse_mode="HTML",
        disable_web_page_preview=True,
    )
|
||
|
||
|
||
# ============================================================================
|
||
# PROXY LINK & SHARE
|
||
# ============================================================================
|
||
|
||
|
||
async def get_proxy_link() -> Optional[str]:
    """Build the ``tg://proxy`` link from the stored configuration.

    Returns None when the GoTelegram config cannot be loaded or no secret is
    available. In pro mode with a domain the link points at the domain and
    uses a fake-TLS secret; otherwise it points at the public IPv4 address
    reported by api.ipify.org (``0.0.0.0`` if the lookup fails).
    """
    config = load_json(GOTELEGRAM_CONFIG)
    if not config:
        return None

    # Secret comes from the config, falling back to the telemt TOML
    # (v3 format: [access.users] main = "...").
    secret = config.get("secret", "")
    if not secret:
        telemt_cfg = load_toml(TELEMT_CONFIG)
        if telemt_cfg:
            users = telemt_cfg.get("access", {}).get("users", {})
            if isinstance(users, dict):
                secret = users.get("main", "")
    if not secret:
        return None

    mode = config.get("mode", "lite")
    domain = config.get("domain", "")
    port = config.get("port", 443)

    def faketls(host: str) -> str:
        # Fake-TLS secret: "ee" + secret + hex-encoded host name.
        return f"ee{secret}{host.encode().hex()}"

    # Pro mode: link with the domain and a fake-TLS secret.
    if mode == "pro" and domain:
        return f"tg://proxy?server={domain}&port={port}&secret={faketls(domain)}"

    # Lite mode: public IP, plus fake-TLS when a mask_host is configured.
    code, stdout, _ = await sh("curl", "-s", "-4", "--max-time", "5", "https://api.ipify.org")
    detected_ip = stdout.strip()
    server = detected_ip if code == 0 and detected_ip else "0.0.0.0"

    mask_host = config.get("mask_host", "")
    link_secret = faketls(mask_host) if mask_host else secret
    return f"tg://proxy?server={server}&port={port}&secret={link_secret}"
|
||
|
||
|
||
async def cb_menu_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the proxy link, or a notice when no link can be generated."""
    query = update.callback_query
    await query.answer()

    link = await get_proxy_link()
    if link:
        text = (
            f"<b>🔗 Proxy Link</b>\n\n"
            f"<code>{html.escape(link)}</code>\n\n"
            f"Open in Telegram to connect."
        )
    else:
        text = "❌ Proxy not installed yet. Run install first."

    back_button = InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_menu_share(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Share the proxy link as a QR code, falling back to plain text.

    When ``qrencode`` is available the link is rendered to a private
    temporary PNG and sent as a photo. If the tool is missing, rendering
    fails or the upload fails, the link is shown as text with a Back button.
    """
    import tempfile  # function-scope import: only the QR path needs it

    query = update.callback_query
    await query.answer()

    back_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )

    link = await get_proxy_link()
    if not link:
        await safe_edit_message(query, "❌ Proxy not installed yet.", reply_markup=back_markup)
        return

    def discard(path: str) -> None:
        try:
            os.remove(path)
        except OSError:
            pass

    # Try to generate QR code. A unique temp file replaces the former fixed
    # /tmp/proxy_qr.png, which two concurrent requests would overwrite and
    # which any local user could pre-create.
    qr_file = None
    code, _, _ = await sh("which", "qrencode")
    if code == 0:
        try:
            fd, candidate = tempfile.mkstemp(prefix="proxy_qr_", suffix=".png")
            os.close(fd)
        except OSError as e:
            logger.warning("cb_menu_share: cannot create temp file: %s", e)
        else:
            code, _, _ = await sh("qrencode", "-o", candidate, link)
            # mkstemp already created the file, so "exists" alone proves
            # nothing — require actual image data.
            if code == 0 and os.path.exists(candidate) and os.path.getsize(candidate) > 0:
                qr_file = candidate
            else:
                discard(candidate)

    photo_sent = False
    if qr_file:
        try:
            with open(qr_file, "rb") as f:
                await query.message.reply_photo(
                    photo=f,
                    caption=f"<b>📤 Proxy QR Code</b>\n\n{html.escape(link)}",
                    parse_mode="HTML",
                )
            photo_sent = True
        except Exception as e:
            logger.error("Failed to send QR code: %s", e)
        finally:
            discard(qr_file)

    if not photo_sent:
        # Same text fallback for "no qrencode" and "upload failed", always
        # with a Back button so the user is not left on a dead-end message.
        await safe_edit_message(
            query,
            f"<b>🔗 Proxy Link</b>\n\n<code>{html.escape(link)}</code>",
            reply_markup=back_markup,
            parse_mode="HTML",
        )
|
||
|
||
|
||
# ============================================================================
|
||
# RESTART & LOGS
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_restart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restart the telemt service via systemctl and report the outcome."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Restarting telemt service...")

    code, _, stderr = await sh("systemctl", "restart", TELEMT_SERVICE)
    text = (
        "✅ Service restarted successfully"
        if code == 0
        else f"❌ Failed to restart:\n<code>{html.escape(stderr[:500])}</code>"
    )

    back_button = InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_menu_logs(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the tail of the telemt journal (last 30 entries, at most 1000 characters)."""
    query = update.callback_query
    await query.answer()

    code, stdout, _ = await sh(
        "journalctl", "-u", TELEMT_SERVICE, "-n", "30", "--no-pager"
    )

    if code != 0:
        text = "❌ Failed to retrieve logs"
    else:
        # Keep only the newest 1000 characters of the output.
        log_tail = stdout[-1000:]
        text = f"<b>📋 Recent Logs</b>\n\n<pre>{html.escape(log_tail)}</pre>"

    back_button = InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# BACKUP & RESTORE
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the backup menu together with the number of existing backups.

    The "List Backups" button is offered only when at least one archive
    (``.tar.gz`` or ``.tar.gz.enc``) exists in ``BACKUP_DIR``.
    """
    query = update.callback_query
    await query.answer()

    # Collect existing backups, newest name first; any error leaves the list empty.
    backups = []
    try:
        if os.path.exists(BACKUP_DIR):
            names = [
                name
                for name in os.listdir(BACKUP_DIR)
                if name.endswith((".tar.gz", ".tar.gz.enc")) and not name.endswith(".sha256")
            ]
            backups = sorted(names, reverse=True)
    except Exception:
        pass

    rows = [[InlineKeyboardButton("💾 Create Backup", callback_data="backup_create")]]
    if backups:
        rows.append([InlineKeyboardButton("📋 List Backups", callback_data="backup_list")])
    rows.append([InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")])

    await safe_edit_message(
        query,
        f"<b>💾 Backup Management</b>\n\nExisting backups: {len(backups)}",
        reply_markup=InlineKeyboardMarkup(rows),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_backup_create(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Archive the GoTelegram and telemt config files into a timestamped tarball."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Creating backup...")

    # Archive name carries the local creation time.
    archive_name = f"backup_{datetime.now():%Y%m%d_%H%M%S}.tar.gz"
    backup_file = os.path.join(BACKUP_DIR, archive_name)

    os.makedirs(BACKUP_DIR, exist_ok=True)
    code, _, stderr = await sh(
        "tar", "-czf", backup_file, GOTELEGRAM_CONFIG, TELEMT_CONFIG
    )

    if code != 0:
        text = f"❌ Backup failed:\n<code>{html.escape(stderr[:500])}</code>"
    else:
        text = f"✅ Backup created:\n<code>{html.escape(backup_file)}</code>"

    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("« Back", callback_data="menu_backup")]]
        ),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_backup_list(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List up to ten backup archives from ``BACKUP_DIR`` with their sizes.

    Archives are files ending in ``.tar.gz`` or ``.tar.gz.enc``, sorted by
    name in descending order. A file that disappears or cannot be read while
    the list is built is shown without a size instead of aborting the handler.
    """
    query = update.callback_query
    await query.answer()

    backups = []
    try:
        if os.path.exists(BACKUP_DIR):
            backups = sorted(
                (f for f in os.listdir(BACKUP_DIR) if f.endswith((".tar.gz", ".tar.gz.enc"))),
                reverse=True,
            )
    except OSError as e:
        # An unreadable backup directory is treated as "no backups", but no
        # longer silently.
        logger.warning("cb_backup_list: cannot list %s: %s", BACKUP_DIR, e)

    if not backups:
        text = "No backups found"
    else:
        lines = ["<b>📋 Available Backups</b>\n\n"]
        for backup in backups[:10]:
            path = os.path.join(BACKUP_DIR, backup)
            try:
                size_mb = os.path.getsize(path) / (1024 * 1024)
            except OSError:
                # File vanished (or became unreadable) after listdir().
                lines.append(f"<code>{html.escape(backup)}</code> (size unknown)\n")
            else:
                lines.append(f"<code>{html.escape(backup)}</code> ({size_mb:.2f} MB)\n")
        text = "".join(lines)

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton("« Back", callback_data="menu_backup")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_menu_restore(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Offer up to ten backups for restoration.

    Each button carries only the index of the backup (``restore_idx_<i>``);
    the matching file names are stored in ``context.user_data["backup_list"]``.
    """
    query = update.callback_query
    await query.answer()

    # Newest name first; any error while listing leaves the list empty.
    backups = []
    try:
        if os.path.exists(BACKUP_DIR):
            names = [
                name
                for name in os.listdir(BACKUP_DIR)
                if name.endswith((".tar.gz", ".tar.gz.enc")) and not name.endswith(".sha256")
            ]
            backups = sorted(names, reverse=True)
    except Exception:
        pass

    if backups:
        offered = backups[:10]
        text = "Select backup to restore:"
        rows = [
            [InlineKeyboardButton(name, callback_data=f"restore_idx_{idx}")]
            for idx, name in enumerate(offered)
        ]
        rows.append([InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")])
        keyboard = InlineKeyboardMarkup(rows)
        # Store backup list in user_data for retrieval
        context.user_data["backup_list"] = backups[:10]
    else:
        text = "❌ No backups available"
        keyboard = InlineKeyboardMarkup(
            [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
        )

    await safe_edit_message(query, text, reply_markup=keyboard)
|
||
|
||
|
||
async def cb_restore_backup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restore the backup selected in the restore menu.

    The callback data carries an index (``restore_idx_<i>``) into the list
    stored in ``context.user_data["backup_list"]``. The archive is extracted
    with ``tar -xzf`` relative to ``/`` and the telemt service is restarted.
    A failed restart is reported instead of being shown as a success.
    """
    query = update.callback_query
    data = query.data

    try:
        idx = int(data.removeprefix("restore_idx_"))
    except ValueError:
        await query.answer("Invalid backup selection")
        return

    backup_list = context.user_data.get("backup_list", [])
    if idx < 0 or idx >= len(backup_list):
        await query.answer("Backup not found")
        return

    backup_name = backup_list[idx]
    backup_path = os.path.join(BACKUP_DIR, backup_name)
    safe_name = html.escape(backup_name)

    await query.answer()
    # parse_mode is required here: the name is HTML-escaped, and without it
    # the entities would be shown literally.
    await safe_edit_message(query, f"⏳ Restoring from {safe_name}...", parse_mode="HTML")

    if not os.path.exists(backup_path):
        text = "❌ Backup file not found"
    else:
        # Simple restore: extract tar to overwrite configs
        code, _, stderr = await sh(
            "tar", "-xzf", backup_path, "-C", "/", timeout=60
        )
        if code != 0:
            text = f"❌ Restore failed:\n<code>{html.escape(stderr[:500])}</code>"
        else:
            # Restart services and report when the restart itself fails.
            restart_code, _, restart_err = await sh("systemctl", "restart", TELEMT_SERVICE)
            if restart_code == 0:
                text = f"✅ Restored from {safe_name}"
            else:
                text = (
                    f"⚠️ Restored from {safe_name}, but the service restart failed:\n"
                    f"<code>{html.escape(restart_err[:500])}</code>"
                )

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
# ============================================================================
|
||
# UPDATE & MODE/TEMPLATE CHANGE
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Compare the installed telemt version with the latest GitHub release.

    The handler only reports; the update itself is performed through the CLI
    installer. A response without a usable ``tag_name`` (for example an API
    error payload) is reported as a failed check rather than as an update.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Checking for telemt updates...")

    # Get current version
    cur_code, cur_out, _ = await sh("telemt", "--version")
    current = cur_out.strip() if cur_code == 0 else "unknown"

    # Check latest release from GitHub
    code, stdout, _ = await sh(
        "curl", "-s", "--max-time", "10",
        "https://api.github.com/repos/telemt/telemt/releases/latest",
    )

    if code != 0 or not stdout.strip():
        text = "❌ Failed to check for updates"
    else:
        try:
            release = json.loads(stdout)
        except json.JSONDecodeError:
            text = "❌ Failed to parse release info"
        else:
            # Error payloads are valid JSON without "tag_name"; do not present
            # them as an available update.
            latest = release.get("tag_name") if isinstance(release, dict) else None
            if not isinstance(latest, str) or not latest:
                text = "❌ Failed to check for updates"
            elif latest == current:
                # NOTE(review): plain string equality between the
                # `telemt --version` output and the release tag — confirm
                # that both use the same format.
                text = f"✅ telemt is already up to date ({html.escape(current)})"
            else:
                text = (
                    f"ℹ️ Update available: {html.escape(current)} → {html.escape(latest)}\n\n"
                    f"Run the CLI installer to update:\n"
                    f"<code>sudo bash install.sh</code> → menu item 10"
                )

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
async def cb_menu_change(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the menu for switching between Lite and Pro mode."""
    query = update.callback_query
    await query.answer()

    lite_row = [InlineKeyboardButton("⚡ Switch to Lite Mode", callback_data="change_lite")]
    pro_row = [InlineKeyboardButton("🛡 Switch to Pro Mode", callback_data="change_pro")]
    back_row = [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]

    await safe_edit_message(
        query,
        "Change mode or template:",
        reply_markup=InlineKeyboardMarkup([lite_row, pro_row, back_row]),
    )
|
||
|
||
|
||
async def cb_change_lite(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch to Lite mode by delegating to the Lite domain-selection flow."""
    callback = update.callback_query
    await callback.answer()
    # The install flow already renders the domain picker; reuse it as is.
    await cb_install_mode_lite(update, context)
|
||
|
||
|
||
async def cb_change_pro(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch to Pro mode by delegating to the Pro template-category flow."""
    callback = update.callback_query
    await callback.answer()
    # The install flow already renders the category picker; reuse it as is.
    await cb_install_mode_pro(update, context)
|
||
|
||
|
||
async def cb_install_migrate(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Migrate from v1 (mtg Docker) to v2 (telemt).

    Stops and removes the old Docker container under either of its known
    names, then shows the v2 installation-mode menu. The results of the
    docker commands are not inspected beyond choosing which name to stop.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Migrating from v1...")

    # Stop old mtg container: try the primary name, then the alternative one.
    stop_code, _, _ = await sh("docker", "stop", "mtproto-proxy", timeout=30)
    if stop_code != 0:
        await sh("docker", "stop", "mtg", timeout=30)

    # Remove old container under both names.
    for container in ("mtproto-proxy", "mtg"):
        await sh("docker", "rm", container, timeout=15)

    text = (
        "✅ <b>v1 container stopped and removed</b>\n\n"
        "Now select installation mode for v2:"
    )
    await safe_edit_message(
        query,
        text,
        reply_markup=get_install_mode_menu(),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# WEBSITE & SSL
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_website(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the website / SSL management menu."""
    query = update.callback_query
    await query.answer()

    renew_row = [InlineKeyboardButton("🔄 Renew SSL Certificate", callback_data="ssl_renew")]
    status_row = [InlineKeyboardButton("📊 SSL Status", callback_data="ssl_status")]
    back_row = [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]

    await safe_edit_message(
        query,
        "Website & SSL Management:",
        reply_markup=InlineKeyboardMarkup([renew_row, status_row, back_row]),
    )
|
||
|
||
|
||
async def cb_ssl_renew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Run ``certbot renew`` (120 s timeout) and report the result."""
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Renewing SSL certificate...")

    code, _stdout, stderr = await sh("certbot", "renew", timeout=120)

    if code != 0:
        text = f"❌ Renewal failed:\n<code>{html.escape(stderr[:500])}</code>"
    else:
        text = "✅ SSL certificate renewed successfully"

    back_button = InlineKeyboardButton("« Back", callback_data="menu_website")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_ssl_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the first 1000 characters of ``certbot certificates`` output."""
    query = update.callback_query
    await query.answer()

    code, stdout, _ = await sh("certbot", "certificates")

    if code != 0:
        text = "❌ Failed to get SSL status"
    else:
        report = html.escape(stdout[:1000])
        text = f"<b>📊 SSL Certificates</b>\n\n<pre>{report}</pre>"

    back_button = InlineKeyboardButton("« Back", callback_data="menu_website")
    await safe_edit_message(
        query,
        text,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
# ============================================================================
|
||
# ADMIN MANAGEMENT
|
||
# ============================================================================
|
||
|
||
|
||
async def cb_menu_admins(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the administrator list together with add/remove instructions.

    The message lists every ID in ``ALLOWED_IDS`` (sorted), or a note that
    the list is empty, followed by the total count and usage hints for
    ``/addadmin`` and ``/deladmin``.
    """
    query = update.callback_query
    await query.answer()

    header = "<b>👤 Администраторы</b>\n\n"
    if not ALLOWED_IDS:
        # The user-facing text says an empty list means open access;
        # that policy lives in is_user_allowed() — not visible here.
        roster = "<i>Список пуст — доступ для всех</i>\n"
    else:
        bullet_lines = [f" • <code>{uid}</code>" for uid in sorted(ALLOWED_IDS)]
        roster = "\n".join(bullet_lines) + "\n"

    instructions = (
        f"\nВсего: {len(ALLOWED_IDS)}\n\n"
        "Чтобы <b>добавить</b> — перешлите любое сообщение от нового админа, "
        "или отправьте команду:\n"
        "<code>/addadmin 123456789</code>\n\n"
        "Чтобы <b>удалить</b>:\n"
        "<code>/deladmin 123456789</code>"
    )

    back_button = InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")
    await safe_edit_message(
        query,
        header + roster + instructions,
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cmd_addadmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/addadmin ID [ID2 ID3 ...]``: add administrators by numeric ID.

    Only users accepted by ``is_user_allowed`` may use the command. IDs may
    be separated by spaces and/or commas. Each token that parses as an
    integer is passed to ``add_admin``; the rest are reported back as errors.
    """
    # effective_message also covers edited commands, where update.message
    # is None.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    if not is_user_allowed(user.id):
        await message.reply_text(
            f"⛔ Доступ запрещён.\nВаш ID: <code>{user.id}</code>",
            parse_mode="HTML",
        )
        return

    # Treat commas as separators. Simply deleting them would silently merge
    # "123,456" into the single ID 123456.
    tokens = " ".join(context.args or []).replace(",", " ").split()
    if not tokens:
        await message.reply_text(
            "Использование: <code>/addadmin ID [ID2 ID3 ...]</code>\n"
            "Пример: <code>/addadmin 123456789 987654321</code>",
            parse_mode="HTML",
        )
        return

    added = []
    errors = []
    for token in tokens:
        try:
            uid = int(token)
            # Kept inside the try so a ValueError raised by add_admin is
            # still reported as a rejected token, as before.
            add_admin(uid)
        except ValueError:
            # The reply uses HTML parse mode, so user input must be escaped.
            errors.append(html.escape(token))
        else:
            added.append(str(uid))

    parts = []
    if added:
        parts.append(f"✅ Добавлены: {', '.join(added)}")
    if errors:
        parts.append(f"❌ Ошибки: {', '.join(errors)}")

    # tokens is non-empty here, so parts always holds at least one line.
    await message.reply_text("\n".join(parts), parse_mode="HTML")
|
||
|
||
|
||
async def cmd_deladmin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/deladmin ID [ID2 ...]``: remove administrators by numeric ID.

    Only users accepted by ``is_user_allowed`` may use the command. IDs may
    be separated by spaces and/or commas. The caller cannot remove their own
    ID. Unknown or malformed IDs are reported one by one, and successfully
    removed IDs are summarized in a final message.
    """
    # effective_message also covers edited commands, where update.message
    # is None.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    if not is_user_allowed(user.id):
        await message.reply_text(
            f"⛔ Доступ запрещён.\nВаш ID: <code>{user.id}</code>",
            parse_mode="HTML",
        )
        return

    # Treat commas as separators. Simply deleting them would silently merge
    # "123,456" into the single ID 123456.
    tokens = " ".join(context.args or []).replace(",", " ").split()
    if not tokens:
        await message.reply_text(
            "Использование: <code>/deladmin ID</code>",
            parse_mode="HTML",
        )
        return

    removed = []
    for token in tokens:
        try:
            uid = int(token)
        except ValueError:
            # The token is HTML-escaped, so the reply must be parsed as HTML;
            # otherwise the entities would be displayed literally.
            await message.reply_text(
                f"❌ Некорректный ID: {html.escape(token)}",
                parse_mode="HTML",
            )
            continue

        if uid == user.id:
            # Refuse self-removal.
            await message.reply_text("⚠️ Нельзя удалить себя!")
            continue

        if uid in ALLOWED_IDS:
            remove_admin(uid)
            removed.append(str(uid))
        else:
            await message.reply_text(f"ID {uid} не найден в списке")

    if removed:
        await message.reply_text(f"✅ Удалены: {', '.join(removed)}")
|
||
|
||
|
||
# ============================================================================
# PROMO & CREDITS
# ============================================================================
|
||
|
||
|
||
def get_promo_text() -> str:
    """Build the HTML promo message: two hosting offers and a tip link.

    The three sections are separated by a horizontal rule. Links come from
    the module constants ``PROMO_LINK_1``, ``PROMO_LINK_2`` and ``TIP_LINK``.
    """
    divider = "━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"

    first_hoster = "".join([
        "<b>💰 Хостинг #1 — скидка до 60%</b>\n",
        f"<a href='{PROMO_LINK_1}'>{PROMO_LINK_1}</a>\n\n",
        "<b>Промокоды:</b>\n",
        " <code>OFF60</code> — 60% на первый месяц\n",
        " <code>antenka20</code> — 20% + 3% за 3 мес\n",
        " <code>antenka6</code> — 15% + 5% за 6 мес\n\n",
    ])

    second_hoster = "".join([
        "<b>💰 Хостинг #2 — скидка до 60%</b>\n",
        f"<a href='{PROMO_LINK_2}'>{PROMO_LINK_2}</a>\n\n",
        "<b>Промокод:</b>\n",
        " <code>OFF60</code> — 60% на первый месяц\n\n",
    ])

    donate = "".join([
        "<b>☕ Донат / Чаевые</b>\n",
        f"<a href='{TIP_LINK}'>{TIP_LINK}</a>",
    ])

    return divider.join((first_hoster, second_hoster, donate))
|
||
|
||
|
||
def should_show_promo_bot() -> bool:
    """Tell whether the promo is due, i.e. not shown within the last 24 hours.

    Returns True when ``PROMO_STAMP_FILE`` is missing, unreadable or does not
    contain an integer timestamp; otherwise True only if at least 86400
    seconds have passed since the stored timestamp.
    """
    day_in_seconds = 86400
    try:
        # A missing file raises FileNotFoundError (an OSError), which has the
        # same outcome as an explicit existence check.
        with open(PROMO_STAMP_FILE, "r") as stamp:
            shown_at = int(stamp.read().strip())
    except (ValueError, OSError):
        return True

    elapsed = int(time.time()) - shown_at
    return elapsed >= day_in_seconds
|
||
|
||
|
||
def mark_promo_shown_bot() -> None:
    """Record the current Unix time in ``PROMO_STAMP_FILE`` (best effort).

    The parent directory is created when needed. Filesystem errors never
    propagate to the caller; they are logged so a stamp that can never be
    written does not go unnoticed.
    """
    try:
        stamp_dir = os.path.dirname(PROMO_STAMP_FILE)
        # dirname is "" for a bare filename; os.makedirs("") would raise and
        # the stamp would then never be written.
        if stamp_dir:
            os.makedirs(stamp_dir, exist_ok=True)
        with open(PROMO_STAMP_FILE, "w") as f:
            f.write(str(int(time.time())))
    except OSError as exc:
        # Still best effort, but no longer silent.
        logger.warning("Could not write promo stamp %s: %s", PROMO_STAMP_FILE, exc)
|
||
|
||
|
||
# Strong references to pending promo auto-delete tasks. The event loop only
# keeps weak references to tasks, so a fire-and-forget task needs an owner
# until it has finished.
_PROMO_CLEANUP_TASKS: set = set()


async def cb_menu_promo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the promo text as a separate, self-deleting message.

    The promo is posted as a reply rather than edited into the menu message,
    so the main menu stays intact. The new message is handed to
    ``_delete_message_after`` with a delay argument of 30 so that it does
    not clutter the chat.
    """
    query = update.callback_query
    await query.answer()

    promo_msg = await query.message.reply_text(
        get_promo_text(), parse_mode="HTML", disable_web_page_preview=True
    )

    # Keep a reference until the task completes, then drop it automatically.
    cleanup = asyncio.create_task(_delete_message_after(promo_msg, 30))
    _PROMO_CLEANUP_TASKS.add(cleanup)
    cleanup.add_done_callback(_PROMO_CLEANUP_TASKS.discard)
|
||
|
||
|
||
async def cb_menu_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replace the menu message with the credits / acknowledgements page.

    The text is sent in HTML parse mode, so literal ampersands are written
    as ``&amp;`` as the Bot API formatting rules require.
    """
    query = update.callback_query
    await query.answer()

    text = (
        "<b>ℹ️ Credits &amp; Acknowledgements</b>\n\n"
        f"<b>GoTelegram v{GOTELEGRAM_VERSION}</b>\n\n"
        "Built with love for the Telegram community\n\n"
        "<b>Special thanks to:</b>\n\n"
        "🙏 <b>telemt</b> - MTProxy engine\n"
        " High-performance proxy core\n\n"
        "🎨 <b>HTML5UP</b> - Beautiful web templates\n"
        " Responsive design &amp; themes\n\n"
        "📚 <b>Learning Zone</b> - Educational resources\n"
        " Community learning support\n\n"
        "🚀 <b>Start Bootstrap</b> - Bootstrap templates\n"
        " Professional design framework\n\n"
        "💬 <b>Community</b> - Your feedback &amp; support\n\n"
        "<i>GoTelegram is open-source and community-driven</i>"
    )

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
# ============================================================================
# REMOVE
# ============================================================================
|
||
|
||
|
||
async def cb_menu_remove(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Ask for confirmation before the installation is removed.

    Nothing is deleted here: the message only offers a confirm button
    (``remove_confirm``) and a way back to the main menu.
    """
    query = update.callback_query
    await query.answer()

    warning = "".join((
        "<b>⚠️ Remove GoTelegram</b>\n\n",
        "This will completely remove the installation.\n",
        "Are you sure?",
    ))

    confirm_row = [InlineKeyboardButton("❌ Yes, Remove", callback_data="remove_confirm")]
    back_row = [InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]

    await safe_edit_message(
        query,
        warning,
        reply_markup=InlineKeyboardMarkup([confirm_row, back_row]),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def cb_remove_confirm(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Stop the service, delete the installation directories and report.

    Steps: ``systemctl stop TELEMT_SERVICE``, then ``rm -rf`` on
    ``/opt/gotelegram`` and ``WEBSITE_ROOT``. Exit codes are checked, and the
    final message says success only when every step succeeded; otherwise the
    collected problems are shown. Empty, relative or root paths are never
    passed to ``rm -rf``.
    """
    query = update.callback_query
    await query.answer()

    await safe_edit_message(query, "⏳ Removing GoTelegram...")

    problems = []

    # Stop service
    code, _, stderr = await sh("systemctl", "stop", TELEMT_SERVICE)
    if code != 0:
        problems.append(f"systemctl stop {TELEMT_SERVICE}: {(stderr or '').strip()[:200]}")

    # Remove directories
    for path in ["/opt/gotelegram", WEBSITE_ROOT]:
        target = str(path or "").strip()
        # Safety net: a misconfigured WEBSITE_ROOT ("", "/", a relative path)
        # must never reach a recursive delete.
        if not os.path.isabs(target) or not os.path.normpath(target).strip(os.sep):
            problems.append(f"refusing to remove unsafe path: {target!r}")
            continue
        code, _, stderr = await sh("rm", "-rf", target)
        if code != 0:
            problems.append(f"rm -rf {target}: {(stderr or '').strip()[:200]}")

    if problems:
        details = html.escape("\n".join(problems))
        text = f"⚠️ GoTelegram removal finished with errors:\n<code>{details}</code>"
    else:
        text = "✅ GoTelegram removed successfully"

    keyboard = InlineKeyboardMarkup(
        [[InlineKeyboardButton(_t(_uid(update), "btn_back"), callback_data="menu_main")]]
    )
    await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
|
||
|
||
|
||
# ============================================================================
# CALLBACK ROUTING
# ============================================================================
|
||
|
||
|
||
async def _edit_to_main_menu(query, user_id: int) -> None:
    """Replace the message behind *query* with the localized main menu."""
    text = (
        f"<b>{_tf(user_id, 'welcome_title', GOTELEGRAM_VERSION)}</b>\n\n"
        f"{_t(user_id, 'welcome_subtitle')}\n"
        f"{_t(user_id, 'welcome_prompt')}"
    )
    await safe_edit_message(
        query, text, reply_markup=get_main_menu(user_id), parse_mode="HTML"
    )


async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route every inline-button press to its handler.

    Order of checks:

    1. ``admin_confirm_<id>`` / ``admin_cancel`` — first-admin registration,
       deliberately handled *before* the access check.
    2. Access control via ``is_user_allowed``.
    3. Built-in screens: main menu, close, language picker / language set,
       custom git URL prompt.
    4. Prefix-based handlers (``lite_dom_``, ``pro_cat_``, ...), then the
       exact-match handler table; anything else gets "Unknown action".

    Each callback query is answered exactly once on every path handled here.
    Delegated ``cb_*`` handlers answer the query themselves.
    """
    query = update.callback_query
    # query.data can be None (e.g. game callbacks); normalize for startswith().
    data = query.data or ""

    # ── Admin self-registration (before the access check!) ──
    if data.startswith("admin_confirm_"):
        try:
            new_admin_id = int(data.rsplit("_", 1)[-1])
        except ValueError:
            await query.answer()
            await safe_edit_message(query, "❌ Ошибка: некорректный ID")
            return
        # Security: only the user the button was issued for may become admin.
        # The query must not have been answered yet, otherwise this alert is
        # a second answer to the same query and never reaches the user.
        if update.effective_user.id != new_admin_id:
            await query.answer("Эта кнопка не для вас", show_alert=True)
            return
        await query.answer()
        # Race condition: somebody else may already have become admin.
        if not _WAITING_FOR_ADMIN:
            await safe_edit_message(query, "ℹ️ Администратор уже назначен.")
            return
        add_admin(new_admin_id)
        await safe_edit_message(
            query,
            f"✅ <b>Вы назначены администратором!</b>\n\n"
            f"ID: <code>{new_admin_id}</code>\n\n"
            f"Нажмите /start чтобы открыть меню.",
            parse_mode="HTML",
        )
        return

    if data == "admin_cancel":
        await query.answer()
        await safe_edit_message(
            query,
            "👋 Ок. Напишите /start когда будете готовы.",
        )
        return

    # Access control
    if not is_user_allowed(update.effective_user.id):
        await query.answer("Доступ запрещён")
        return

    user_id = update.effective_user.id

    # Pressing any other button means the user left the "send me a git URL"
    # prompt (its Cancel button points at menu_main). Disarm the waiter so
    # the next ordinary text message is not taken as a URL.
    if data != "pro_custom_git":
        _CUSTOM_GIT_WAITERS.pop(user_id, None)

    # Main menu
    if data == "menu_main":
        await query.answer()
        await _edit_to_main_menu(query, user_id)
        return

    if data == "close_menu":
        await query.answer()
        await query.delete_message()
        return

    # Language picker
    if data == "menu_lang":
        await query.answer()
        current = get_user_lang(user_id)
        title = _t(user_id, "lang_title")
        curr_line = _tf(user_id, "lang_current", get_language_name(current))
        prompt = _t(user_id, "lang_choose")
        text = f"<b>{title}</b>\n\n{curr_line}\n\n{prompt}"
        keyboard = InlineKeyboardMarkup([
            [
                InlineKeyboardButton("🇬🇧 English", callback_data="lang_set_en"),
                InlineKeyboardButton("🇷🇺 Русский", callback_data="lang_set_ru"),
            ],
            [InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_main")],
        ])
        await safe_edit_message(query, text, reply_markup=keyboard, parse_mode="HTML")
        return

    if data.startswith("lang_set_"):
        code = data[len("lang_set_"):]
        if code in SUPPORTED_LANGS:
            set_user_lang(user_id, code)
            await query.answer(_tf(user_id, "lang_saved", get_language_name(code)))
            # Re-render main menu in the new language
            await _edit_to_main_menu(query, user_id)
        else:
            await query.answer("Unsupported language")
        return

    # Custom git template URL prompt
    if data == "pro_custom_git":
        await query.answer()
        # Arm the waiter; handle_text_message consumes it on the next text.
        _CUSTOM_GIT_WAITERS[user_id] = True
        title = _t(user_id, "cg_title")
        body = _t(user_id, "cg_ask_url")
        await safe_edit_message(
            query,
            f"<b>{title}</b>\n\n{body}",
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton(_t(user_id, "btn_cancel"), callback_data="menu_main")]]
            ),
            parse_mode="HTML",
        )
        return

    # Pattern-based handlers (callback data carries a payload after the prefix)
    prefix_routes = (
        ("lite_dom_", cb_lite_domain),
        ("pro_cat_", cb_pro_category),
        ("pro_tpl_", cb_pro_template),
        ("pro_confirm_", cb_pro_confirm),
        ("restore_idx_", cb_restore_backup),
    )
    for prefix, prefix_handler in prefix_routes:
        if data.startswith(prefix):
            await prefix_handler(update, context)
            return

    # Exact-match handlers
    handlers = {
        "menu_install": cb_menu_install,
        "menu_status": cb_menu_status,
        "menu_link": cb_menu_link,
        "menu_share": cb_menu_share,
        "menu_restart": cb_menu_restart,
        "menu_logs": cb_menu_logs,
        "menu_backup": cb_menu_backup,
        "menu_restore": cb_menu_restore,
        "menu_update": cb_menu_update,
        "menu_change": cb_menu_change,
        "menu_website": cb_menu_website,
        "menu_promo": cb_menu_promo,
        "menu_credits": cb_menu_credits,
        "menu_admins": cb_menu_admins,
        "menu_remove": cb_menu_remove,
        "install_mode_lite": cb_install_mode_lite,
        "install_mode_pro": cb_install_mode_pro,
        "backup_create": cb_backup_create,
        "backup_list": cb_backup_list,
        "ssl_renew": cb_ssl_renew,
        "ssl_status": cb_ssl_status,
        "remove_confirm": cb_remove_confirm,
        "change_lite": cb_change_lite,
        "change_pro": cb_change_pro,
        "install_migrate": cb_install_migrate,
        "menu_stats": cb_menu_stats,
    }
    exact_handler = handlers.get(data)
    if exact_handler is not None:
        await exact_handler(update, context)
    else:
        await query.answer("Unknown action")
|
||
|
||
|
||
# ============================================================================
# TEXT INPUT & ERROR HANDLERS
# ============================================================================
|
||
|
||
|
||
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Process free-text input; currently only custom git template URLs.

    The message is ignored unless the sender is allowed and has a pending
    entry in ``_CUSTOM_GIT_WAITERS``. That entry is consumed here, so each
    prompt accepts a single message. A valid URL is downloaded as a
    template and recorded in the GoTelegram config.
    """
    message = update.message
    if message is None or message.text is None:
        return

    user_id = update.effective_user.id
    if not is_user_allowed(user_id):
        return

    # Only act when we're explicitly waiting for a custom-git URL. pop()
    # reads and clears the flag in one step.
    awaiting_url = _CUSTOM_GIT_WAITERS.pop(user_id, False)
    if not awaiting_url:
        return

    url = message.text.strip()
    if not _validate_custom_git_url(url):
        await message.reply_text(_t(user_id, "cg_invalid"), parse_mode="HTML")
        return

    cloning_notice = _tf(user_id, "cg_cloning", html.escape(url))
    await message.reply_text(cloning_notice, parse_mode="HTML")

    ok, tpl_id, info = await _download_custom_git_template(url)
    if not ok:
        # On failure `info` is looked up as an i18n key.
        await message.reply_text(_t(user_id, info), parse_mode="HTML")
        return

    # Success — record in GoTelegram config. Use "template_id" (canonical
    # field name written by install.sh/save_gotelegram_config).
    config = load_json(GOTELEGRAM_CONFIG) or {}
    config["template_id"] = tpl_id
    config["template_source"] = url
    save_json(GOTELEGRAM_CONFIG, config)

    success_text = _tf(user_id, "cg_ok_fmt", html.escape(tpl_id))
    await message.reply_text(
        success_text,
        reply_markup=get_main_menu(user_id),
        parse_mode="HTML",
    )
|
||
|
||
|
||
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log the exception raised while an update was being processed."""
    failure = context.error
    logger.error("Exception while handling an update:", exc_info=failure)
|
||
|
||
|
||
# ============================================================================
# MAIN APPLICATION
# ============================================================================
|
||
|
||
|
||
def main() -> None:
    """Build the Telegram application, register all handlers and poll.

    Returns immediately, after logging an error, when ``BOT_TOKEN`` is not
    set. Handlers are registered in a fixed order: slash commands, the
    inline-button router, the free-text handler, then the error handler.
    """
    if not BOT_TOKEN:
        logger.error("BOT_TOKEN not set in .env file")
        return

    application = Application.builder().token(BOT_TOKEN).build()

    # Slash commands, registered in this order.
    command_table = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("status", cmd_status),
        ("logs", cmd_logs),
        ("lang", cmd_lang),
        ("addadmin", cmd_addadmin),
        ("deladmin", cmd_deladmin),
    )
    for command_name, command_callback in command_table:
        application.add_handler(CommandHandler(command_name, command_callback))

    # Inline keyboard buttons all go through one router.
    application.add_handler(CallbackQueryHandler(handle_callback))

    # Plain text that is not a command (used for custom git URL input).
    plain_text = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(plain_text, handle_text_message))

    application.add_error_handler(error_handler)

    logger.info(f"GoTelegram v{GOTELEGRAM_VERSION} bot starting...")
    application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||
|
||
|
||
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|