#!/usr/bin/env python3
"""SwiftGram MTProxy — a Telegram bot for managing MTProxy on a server.

Features: BBR status, IPv6 check, call fix (UDP), container management.
No ads and no promo codes.
"""
|
||
|
||
import asyncio
|
||
import html
|
||
import json
|
||
import os
|
||
import re
|
||
from pathlib import Path
|
||
|
||
# ── Загрузка настроек из .env ────────────────────────────────────────────────
|
||
# Locate the settings file: prefer a .env next to this script, otherwise
# fall back to the system-wide install location.
_env_path = Path(__file__).resolve().parent / ".env"
if not _env_path.exists():
    _env_path = Path("/opt/swiftgram/.env")

# Copy KEY=VALUE pairs into the process environment. Variables that are
# already set keep their value (setdefault). Blank lines, "#" comments and
# lines without "=" are ignored; surrounding quotes are stripped from values.
if _env_path.exists():
    with open(_env_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            k, v = line.split("=", 1)
            os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))
|
||
|
||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
CallbackQueryHandler,
|
||
ContextTypes,
|
||
MessageHandler,
|
||
filters,
|
||
)
|
||
|
||
# ── Конфигурация ─────────────────────────────────────────────────────────────
|
||
BOT_TOKEN = os.environ.get("BOT_TOKEN")
|
||
_allowed = os.environ.get("ALLOWED_IDS", "").strip()
|
||
try:
|
||
ALLOWED_IDS = set(int(x) for x in _allowed.split(",") if x.strip()) if _allowed else None
|
||
except ValueError:
|
||
ALLOWED_IDS = None
|
||
|
||
CONTAINER_NAME = os.environ.get("CONTAINER_NAME", "swiftgram-proxy")
|
||
CONFIG_FILE = Path(os.environ.get("CONFIG_PATH", "/opt/swiftgram/proxy.json"))
|
||
|
||
DOMAINS = [
|
||
"google.com", "wikipedia.org", "habr.com", "github.com",
|
||
"coursera.org", "udemy.com", "medium.com", "stackoverflow.com",
|
||
"bbc.com", "cnn.com", "reuters.com", "nytimes.com",
|
||
"lenta.ru", "rbc.ru", "ria.ru", "kommersant.ru",
|
||
"stepik.org", "duolingo.com", "khanacademy.org", "ted.com",
|
||
]
|
||
|
||
# ── Утилиты ──────────────────────────────────────────────────────────────────
|
||
def _ok(uid: int) -> bool:
    """Return True if the user with ID *uid* may use the bot.

    When ALLOWED_IDS is None no allow-list is in effect and everyone passes.
    """
    if ALLOWED_IDS is None:
        return True
    return uid in ALLOWED_IDS
|
||
|
||
def _decode(data: bytes) -> str:
    """Decode *data* as UTF-8 and strip surrounding whitespace.

    A falsy value (None or empty bytes) yields an empty string; undecodable
    bytes are replaced rather than raising.
    """
    raw = data if data else b""
    text = raw.decode("utf-8", errors="replace")
    return text.strip()
|
||
|
||
async def sh(*args: str, timeout: int = 60) -> tuple[int, str, str]:
    """Run a command (no shell) and capture its output.

    Args:
        *args: Program name followed by its arguments.
        timeout: Seconds to wait for the process before killing it.

    Returns:
        A ``(exit_code, stdout, stderr)`` tuple with decoded, stripped text.
        The exit code is -1 (stderr "Timeout") when the process was killed
        after *timeout* seconds, and 127 when it could not be started at all.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # A missing or non-executable binary raises instead of producing an
        # exit code. Report it as a failed command so callers' "code != 0"
        # fallbacks (e.g. ss -> netstat) still work.
        return 127, "", str(exc)
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the killed process
        return -1, "", "Timeout"
    return proc.returncode or 0, _decode(out), _decode(err)
|
||
|
||
async def get_ip4() -> str:
    """Return the server's public IPv4 address.

    Queries several lookup services with curl in turn and returns the first
    dotted-quad found in a successful response, or "0.0.0.0" if none answer.
    """
    services = ("https://api.ipify.org", "https://icanhazip.com", "https://ifconfig.me")
    for endpoint in services:
        status, body, _ = await sh("curl", "-s", "-4", "--max-time", "5", endpoint, timeout=8)
        if status != 0 or not body:
            continue
        found = re.search(r"(\d{1,3}\.){3}\d{1,3}", body)
        if found:
            return found.group(0)
    return "0.0.0.0"
|
||
|
||
async def get_ip6() -> str:
    """Return the server's public IPv6 address, or "" if the lookup fails."""
    status, body, _ = await sh(
        "curl", "-s", "-6", "--max-time", "5", "https://api6.ipify.org", timeout=8,
    )
    return body.strip() if status == 0 and body else ""
|
||
|
||
async def check_bbr() -> bool:
    """Return True if sysctl reports "bbr" as the TCP congestion control."""
    _, reported, _ = await sh("sysctl", "net.ipv4.tcp_congestion_control", timeout=5)
    return reported.lower().find("bbr") != -1
|
||
|
||
async def proxy_running() -> bool:
    """Return True if a container named CONTAINER_NAME is currently running.

    `docker ps --format {{.Names}}` prints one name per line, so the names
    are compared exactly. A substring test would also match containers such
    as "<name>-old".
    """
    code, out, _ = await sh("docker", "ps", "--format", "{{.Names}}", timeout=10)
    if code != 0:
        return False
    running = {name.strip() for name in out.splitlines()}
    return CONTAINER_NAME in running
|
||
|
||
async def docker_val(fmt: str) -> str:
    """Render Go template *fmt* with `docker inspect` for the proxy container.

    Returns the stripped output, or "" when the command fails.
    """
    status, rendered, _ = await sh(
        "docker", "inspect", CONTAINER_NAME, "--format", fmt, timeout=10,
    )
    if status != 0:
        return ""
    return rendered.strip()
|
||
|
||
async def check_port(port: int) -> str | None:
    """Check whether TCP *port* is taken by something other than the proxy.

    Returns None when the port is free or already bound by the running proxy
    container; otherwise the matching line from the `ss` (or, if that fails,
    `netstat`) listening-socket listing.
    """
    if await proxy_running():
        bound = await docker_val("{{range $p,$c := .HostConfig.PortBindings}}{{(index $c 0).HostPort}} {{end}}")
        if str(port) in bound.split():
            return None

    status, listing, _ = await sh("ss", "-tlnp", timeout=5)
    if status != 0:
        status, listing, _ = await sh("netstat", "-tlnp", timeout=5)

    # The port must be followed by a space or tab so ":443" does not match ":4430".
    needles = (f":{port} ", f":{port}\t")
    return next(
        (row for row in listing.splitlines() if any(n in row for n in needles)),
        None,
    )
|
||
|
||
def save_config(data: dict) -> None:
    """Write *data* to CONFIG_FILE as indented UTF-8 JSON.

    The parent directory is created first if it does not exist.
    """
    target = CONFIG_FILE
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding="utf-8")
|
||
|
||
def load_config() -> dict:
    """Load the saved proxy configuration from CONFIG_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    UTF-8/JSON, or when its top-level value is not a JSON object (callers
    use ``.get`` on the result).
    """
    try:
        parsed = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return parsed if isinstance(parsed, dict) else {}
|
||
|
||
# ── Получение данных прокси ──────────────────────────────────────────────────
|
||
async def proxy_info() -> dict | None:
    """Collect connection details of the running proxy.

    Returns:
        None when the proxy container is not running, otherwise a dict with
        keys ``ip4``, ``ip6``, ``port``, ``secret``, ``domain``, ``link4``
        and ``link6`` (``link6`` is None when no IPv6 address was found).
    """
    if not await proxy_running():
        return None

    # The secret is the last argument of the container command line.
    cmd_str = await docker_val("{{range .Config.Cmd}}{{.}} {{end}}")
    secret = cmd_str.split()[-1] if cmd_str else ""

    # The container publishes the same host port for tcp and udp, so the
    # template yields one value per binding. Separate them with a space and
    # take the first; without the separator they concatenate into "443443".
    hp = await docker_val("{{range $p,$c := .HostConfig.PortBindings}}{{(index $c 0).HostPort}} {{end}}")
    host_ports = hp.split()
    port = host_ports[0] if host_ports else "443"

    ip4 = await get_ip4()
    ip6 = await get_ip6()
    cfg = load_config()
    return {
        "ip4": ip4, "ip6": ip6, "port": port, "secret": secret,
        "domain": cfg.get("domain", "—"),
        "link4": f"tg://proxy?server={ip4}&port={port}&secret={secret}",
        "link6": f"tg://proxy?server={ip6}&port={port}&secret={secret}" if ip6 else None,
    }
|
||
|
||
# ── Меню ─────────────────────────────────────────────────────────────────────
|
||
def main_menu_kb() -> InlineKeyboardMarkup:
    """Build the inline keyboard of the main menu."""
    # Each inner list is one keyboard row of (label, callback_data) pairs.
    layout = [
        [("🔧 Установить / Обновить", "menu_install")],
        [("📊 Статус", "menu_status"), ("🔗 Ссылка", "menu_link")],
        [("📤 Поделиться", "menu_share")],
        [("🔄 Рестарт", "menu_restart"), ("📋 Логи", "menu_logs")],
        [("🗑 Удалить прокси", "menu_remove")],
    ]
    return InlineKeyboardMarkup([
        [InlineKeyboardButton(label, callback_data=action) for label, action in row]
        for row in layout
    ])
|
||
|
||
# Greeting shown with the main menu (HTML parse mode).
HELP_TEXT = "\n".join([
    "🚀 <b>SwiftGram MTProxy Manager</b>",
    "",
    "Управление прокси на сервере.",
    "• TCP + UDP (звонки) активны",
    "• IPv6 поддержка включена",
    "• BBR оптимизация стека",
    "",
    "Используйте кнопки ниже или команды:",
    "/install /status /link /share /restart /logs /remove",
])
|
||
|
||
async def start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the help text with the main menu.

    Replies with a new message for a command, or edits the existing message
    when invoked from an inline button. Unauthorized users are ignored.
    """
    user = update.effective_user
    if not user or not _ok(user.id):
        return
    if update.message:
        await update.message.reply_text(HELP_TEXT, parse_mode="HTML", reply_markup=main_menu_kb())
    elif update.callback_query:
        await update.callback_query.edit_message_text(HELP_TEXT, parse_mode="HTML", reply_markup=main_menu_kb())
|
||
|
||
# ── Команды ──────────────────────────────────────────────────────────────────
|
||
async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Report the proxy status: addresses, port, domain, BBR state and secret.

    Edits the current message when invoked from a button, otherwise replies.
    Unauthorized users are ignored.
    """
    user = update.effective_user
    if not user or not _ok(user.id):
        return

    details = await proxy_info()
    if details:
        bbr_state = "✅ Активен" if await check_bbr() else "❌ Выключен"
        ipv6_shown = details['ip6'] or 'не найден'
        report = (
            "✅ <b>SwiftGram работает</b>\n\n"
            f"🌐 IPv4: <code>{details['ip4']}</code>\n"
            f"🌐 IPv6: <code>{ipv6_shown}</code>\n"
            f"🔌 Порт: <code>{details['port']}</code>\n"
            f"🎭 Домен: <code>{html.escape(details['domain'])}</code>\n"
            f"🚀 BBR: <code>{bbr_state}</code>\n"
            "📞 Звонки: <code>✅ UDP открыт</code>\n\n"
            f"Secret: <code>{details['secret']}</code>"
        )
    else:
        report = "❌ <b>Прокси не запущен.</b>\nИспользуйте команду /install"

    back = InlineKeyboardMarkup([[InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]])
    query = update.callback_query
    if query:
        await query.edit_message_text(report, parse_mode="HTML", reply_markup=back)
    else:
        await update.message.reply_text(report, parse_mode="HTML", reply_markup=back)
|
||
|
||
async def cmd_link(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the tg:// connection link(s) for the running proxy.

    Edits the current message when invoked from a button, otherwise replies.
    Unauthorized users are ignored.
    """
    if not update.effective_user or not _ok(update.effective_user.id):
        return
    info = await proxy_info()
    if not info:
        text = "❌ Прокси не запущен."
    else:
        # The links contain "&", which must be escaped in HTML parse mode.
        text = f"<b>Ваша ссылка:</b>\n<code>{html.escape(info['link4'])}</code>"
        if info['link6']:
            text += f"\n\n<b>IPv6 ссылка:</b>\n<code>{html.escape(info['link6'])}</code>"
    kb = InlineKeyboardMarkup([[InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]])
    if update.callback_query:
        await update.callback_query.edit_message_text(text, parse_mode="HTML", reply_markup=kb)
    else:
        await update.message.reply_text(text, parse_mode="HTML", reply_markup=kb)
|
||
|
||
async def cmd_share(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a shareable card with the proxy parameters and a forward button.

    When the proxy is not running the user is told so instead of getting no
    response. Unauthorized users are ignored.
    """
    if not update.effective_user or not _ok(update.effective_user.id):
        return
    menu_row = [InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]
    info = await proxy_info()
    if not info:
        share_text = "❌ Прокси не запущен."
        kb = InlineKeyboardMarkup([menu_row])
    else:
        share_text = (
            f"🔐 <b>MTProxy для Telegram</b>\n\n"
            f"🌍 Сервер: <code>{info['ip4']}</code>\n"
            f"🔌 Порт: <code>{info['port']}</code>\n"
            f"🔑 Secret: <code>{info['secret']}</code>\n\n"
            # The link contains "&", which must be escaped in HTML parse mode.
            f"👉 <b>Подключиться:</b>\n{html.escape(info['link4'])}"
        )
        kb = InlineKeyboardMarkup([
            # The inline query is plain text, so the link goes in unescaped.
            [InlineKeyboardButton("📤 Переслать другу", switch_inline_query=info['link4'])],
            menu_row,
        ])
    if update.callback_query:
        await update.callback_query.edit_message_text(share_text, parse_mode="HTML", reply_markup=kb)
    else:
        await update.message.reply_text(share_text, parse_mode="HTML", reply_markup=kb)
|
||
|
||
async def cmd_restart(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Restart the proxy container and report the outcome.

    Unauthorized users are ignored.
    """
    user = update.effective_user
    if not user or not _ok(user.id):
        return

    query = update.callback_query
    if query:
        await query.answer("Перезапуск...")

    status, _, stderr_text = await sh("docker", "restart", CONTAINER_NAME, timeout=30)
    if status == 0:
        outcome = "✅ Контейнер перезапущен."
    else:
        outcome = f"❌ Ошибка: {stderr_text}"

    back = InlineKeyboardMarkup([[InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]])
    if update.callback_query:
        await update.callback_query.edit_message_text(outcome, reply_markup=back)
    else:
        await update.message.reply_text(outcome, reply_markup=back)
|
||
|
||
async def cmd_logs(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the last 30 log lines of the proxy container.

    Falls back to stderr, then to a placeholder, when stdout is empty.
    Unauthorized users are ignored.
    """
    if not update.effective_user or not _ok(update.effective_user.id):
        return
    code, out, err = await sh("docker", "logs", "--tail", "30", CONTAINER_NAME, timeout=15)
    raw = out or err or 'Логов нет.'
    # Telegram rejects messages longer than 4096 characters; keep the most
    # recent part of the log and leave some headroom.
    max_chars = 3900
    if len(raw) > max_chars:
        raw = raw[-max_chars:]
    text = f"<pre>{html.escape(raw)}</pre>"
    kb = InlineKeyboardMarkup([[InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]])
    if update.callback_query:
        await update.callback_query.edit_message_text(text, parse_mode="HTML", reply_markup=kb)
    else:
        await update.message.reply_text(text, parse_mode="HTML", reply_markup=kb)
|
||
|
||
async def cmd_remove(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Stop and remove the proxy container, reporting the real outcome.

    Unauthorized users are ignored.
    """
    if not update.effective_user or not _ok(update.effective_user.id):
        return
    if update.callback_query:
        await update.callback_query.edit_message_text("⏳ Удаление контейнера...")
    # Stopping is best effort: the container may already be stopped.
    await sh("docker", "stop", CONTAINER_NAME)
    # The result of `rm` decides the message, so a failure is not reported
    # as success.
    code, _, err = await sh("docker", "rm", CONTAINER_NAME)
    text = "✅ Прокси успешно удален с сервера." if code == 0 else f"❌ Ошибка: {err}"
    kb = InlineKeyboardMarkup([[InlineKeyboardButton("◀️ Меню", callback_data="menu_main")]])
    if update.callback_query:
        await update.callback_query.edit_message_text(text, reply_markup=kb)
    else:
        await update.message.reply_text(text, reply_markup=kb)
|
||
|
||
# ── Установка ────────────────────────────────────────────────────────────────
|
||
async def install_step_domain(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Ask the user to pick a Fake TLS masking domain.

    Shows the first 10 entries of DOMAINS, two buttons per row; each button
    carries the domain's index as ``dom_<index>``. Unauthorized users are
    ignored.
    """
    if not update.effective_user or not _ok(update.effective_user.id):
        return
    # Only the first 10 domains are shown to keep the keyboard compact.
    choices = [
        InlineKeyboardButton(d, callback_data=f"dom_{i}")
        for i, d in enumerate(DOMAINS[:10])
    ]
    # Slice into rows of two; a trailing single button keeps its own row
    # instead of being dropped.
    buttons = [choices[i:i + 2] for i in range(0, len(choices), 2)]
    buttons.append([InlineKeyboardButton("◀️ Меню", callback_data="menu_main")])
    text = "🌐 <b>Выберите домен маскировки (Fake TLS):</b>"
    markup = InlineKeyboardMarkup(buttons)
    if update.callback_query:
        await update.callback_query.edit_message_text(text, parse_mode="HTML", reply_markup=markup)
    else:
        await update.message.reply_text(text, parse_mode="HTML", reply_markup=markup)
|
||
|
||
async def do_install(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Install (or reinstall) the proxy container for the chosen domain.

    Steps: pick a port (443, or 8443 when 443 is taken by something else),
    generate a secret with the mtg image, replace any existing container,
    start the new one, and save the configuration on success. Progress and
    errors are sent as replies. Unauthorized users are ignored.
    """
    # This action replaces a container on the host, so it performs the same
    # authorization check as every other handler.
    if not update.effective_user or not _ok(update.effective_user.id):
        return

    domain = ctx.user_data.get("install_domain", "google.com")
    port = "443"
    if await check_port(443):
        port = "8443"

    msg = update.callback_query.message if update.callback_query else update.message
    await msg.reply_text(f"⏳ Начинаю установку SwiftGram на порт {port}...")

    # Generate the secret; the last whitespace-separated token of stdout is used.
    gen_code, s_out, _ = await sh("docker", "run", "--rm", "nineseconds/mtg:2", "generate-secret", "--hex", domain)
    tokens = s_out.split()
    # A failed run must not leave a stray stdout token to be taken as the secret.
    secret = tokens[-1] if gen_code == 0 and tokens else ""

    if not secret:
        await msg.reply_text("❌ Ошибка генерации секрета.")
        return

    # Remove a previous container with the same name; failures are ignored
    # because there may be none.
    await sh("docker", "stop", CONTAINER_NAME)
    await sh("docker", "rm", CONTAINER_NAME)

    code, _, err = await sh(
        "docker", "run", "-d", "--name", CONTAINER_NAME, "--restart", "always",
        "-p", f"{port}:{port}/tcp", "-p", f"{port}:{port}/udp",
        "nineseconds/mtg:2", "simple-run", "-n", "1.1.1.1", "-i", "prefer-ipv4",
        f"0.0.0.0:{port}", secret,
    )

    if code == 0:
        save_config({"domain": domain, "port": port, "secret": secret})
        await msg.reply_text(f"✅ <b>SwiftGram установлен!</b>\nПорт: {port}\nДомен: {domain}", parse_mode="HTML")
        await cmd_status(update, ctx)
    else:
        await msg.reply_text(f"❌ Ошибка: {err}")
|
||
|
||
# ── Обработчики ──────────────────────────────────────────────────────────────
|
||
async def callback_handler(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Dispatch inline-button presses to the matching handler.

    ``menu_*`` values map to menu actions; ``dom_<index>`` selects a masking
    domain from DOMAINS and starts the installation. Unknown or malformed
    data is ignored. Unauthorized users are ignored.
    """
    query = update.callback_query
    # Answer first so the client stops showing the button's loading state.
    await query.answer()

    # Authorize once here so no action is reachable without the check.
    if not update.effective_user or not _ok(update.effective_user.id):
        return

    data = query.data or ""
    routes = {
        "menu_main": start,
        "menu_install": install_step_domain,
        "menu_status": cmd_status,
        "menu_link": cmd_link,
        "menu_share": cmd_share,
        "menu_restart": cmd_restart,
        "menu_logs": cmd_logs,
        "menu_remove": cmd_remove,
    }
    handler = routes.get(data)
    if handler is not None:
        await handler(update, ctx)
        return

    if data.startswith("dom_"):
        try:
            idx = int(data[4:])
        except ValueError:
            return
        # Reject out-of-range values, including negative ones that would
        # otherwise index from the end of the list.
        if not 0 <= idx < len(DOMAINS):
            return
        ctx.user_data["install_domain"] = DOMAINS[idx]
        await do_install(update, ctx)
|
||
|
||
def main() -> None:
    """Build the bot application, register the handlers and start polling.

    Raises:
        SystemExit: if BOT_TOKEN is not configured.
    """
    if not BOT_TOKEN:
        raise SystemExit("Задайте BOT_TOKEN в .env")
    app = Application.builder().token(BOT_TOKEN).build()
    # Every command advertised in HELP_TEXT gets a handler, including
    # /link, /share and /logs.
    commands = {
        "start": start,
        "install": install_step_domain,
        "status": cmd_status,
        "link": cmd_link,
        "share": cmd_share,
        "restart": cmd_restart,
        "logs": cmd_logs,
        "remove": cmd_remove,
    }
    for name, handler in commands.items():
        app.add_handler(CommandHandler(name, handler))
    app.add_handler(CallbackQueryHandler(callback_handler))
    app.run_polling()


if __name__ == "__main__":
    main()