mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 11:36:17 +00:00
571 lines
20 KiB
Python
571 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
GoTelegram local web admin.
|
|
|
|
The service is intentionally bound to 127.0.0.1:1984. Operators reach it
|
|
through an SSH tunnel; it must never be exposed directly on the public network.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import hashlib
|
|
import http.cookies
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import secrets
|
|
import socket
|
|
import subprocess
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# --- Admin service files (each location can be overridden via the environment) ---
ADMIN_DIR = Path(os.environ.get("GOTELEGRAM_ADMIN_DIR", "/opt/gotelegram-admin"))
STATIC_DIR = Path(os.environ.get("GOTELEGRAM_ADMIN_STATIC", str(ADMIN_DIR / "static")))
TOKEN_FILE = Path(os.environ.get("GOTELEGRAM_ADMIN_TOKEN_FILE", str(ADMIN_DIR / "token")))

# --- Files belonging to the managed GoTelegram / telemt installation ---
GOTELEGRAM_CONFIG = Path(os.environ.get("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json"))
TELEMT_CONFIG = Path(os.environ.get("TELEMT_CONFIG", "/etc/telemt/config.toml"))
HISTORY_FILE = Path(os.environ.get("GOTELEGRAM_STATS_HISTORY", "/opt/gotelegram/stats_history.csv"))
CURRENT_STATS = Path(os.environ.get("GOTELEGRAM_STATS_CURRENT", "/run/gotelegram/stats_current.json"))
BACKUP_DIR = Path(os.environ.get("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups"))
INSTALL_DIR = Path(os.environ.get("GOTELEGRAM_DIR", "/opt/gotelegram"))

# --- Listener address; defaults to loopback only (see the module docstring) ---
HOST = os.environ.get("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.environ.get("GOTELEGRAM_ADMIN_PORT", "1984"))

VERSION = "2.5.0"

# Accepted user names: 1-48 characters out of letters, digits, "_", "." and "-".
USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
|
|
|
|
|
|
def utc_now() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
    """Execute *cmd* without a shell and report ``(returncode, stdout, stderr)``.

    The call never raises: any failure to start or finish the process
    (missing binary, timeout, ...) is reported as exit code 125 with the
    exception text in the stderr slot.
    """
    try:
        completed = subprocess.run(
            cmd,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception as exc:  # pragma: no cover - system dependent
        return 125, "", str(exc)
    return completed.returncode, completed.stdout, completed.stderr
|
|
|
|
|
|
def ensure_token() -> str:
    """Return the admin token, generating and persisting one when needed.

    A new token is generated when the token file is missing or shorter than
    24 bytes. The file is created with mode 0600 from the start, so the
    secret is never readable by other users, not even briefly between the
    write and a later chmod.

    Raises:
        OSError: if the admin directory or the token file cannot be
            created, written or read.
    """
    ADMIN_DIR.mkdir(parents=True, exist_ok=True)
    if not TOKEN_FILE.exists() or TOKEN_FILE.stat().st_size < 24:
        # The mode passed to os.open() only applies when the file is created.
        fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            # A pre-existing (too short) file keeps its old mode, so tighten
            # it explicitly while it is still empty, before the secret lands.
            os.chmod(TOKEN_FILE, 0o600)
            fh.write(secrets.token_urlsafe(36) + "\n")
    return TOKEN_FILE.read_text(encoding="utf-8").strip()
|
|
|
|
|
|
def current_token() -> str:
    """Return the admin token, or an empty string when it cannot be obtained.

    Any ``OSError`` from ``ensure_token()`` is turned into ``""``.
    """
    try:
        token = ensure_token()
    except OSError:
        token = ""
    return token
|
|
|
|
|
|
def load_json(path: Path, fallback: Any = None) -> Any:
    """Parse the UTF-8 JSON file at *path*; return *fallback* on any failure.

    Best-effort by design: a missing file, unreadable file or invalid JSON
    all yield *fallback* instead of raising.
    """
    try:
        text = path.read_text(encoding="utf-8")
        return json.loads(text)
    except Exception:
        return fallback
|
|
|
|
|
|
def read_telemt_users() -> dict[str, str]:
    """Read the ``[access.users]`` table of the telemt config as ``{name: secret}``.

    Only ``name = value`` lines inside that section are considered; parsing
    stops at the next section header. Trailing ``#`` comments are dropped,
    one pair of surrounding single or double quotes is removed, and entries
    whose name does not match ``USER_RE`` or whose value is empty are
    skipped. A missing config file yields an empty dict.
    """
    if not TELEMT_CONFIG.exists():
        return {}

    text = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore")
    found: dict[str, str] = {}
    inside = False
    for entry in (raw.strip() for raw in text.splitlines()):
        if entry == "[access.users]":
            inside = True
        elif inside and entry.startswith("["):
            # Next TOML table begins: the users section is over.
            break
        elif inside and entry and not entry.startswith("#") and "=" in entry:
            key, _, rest = entry.partition("=")
            key = key.strip()
            secret = rest.strip().split("#", 1)[0].strip()
            for quote in ('"', "'"):
                # Unquote only when a closing quote of the same kind exists.
                if secret.startswith(quote) and quote in secret[1:]:
                    secret = secret[1:].split(quote, 1)[0]
                    break
            if USER_RE.match(key) and secret:
                found[key] = secret
    return found
|
|
|
|
|
|
def _ordered_user_lines(users: dict[str, str]) -> list[str]:
    """Render *users* as ``name = "secret"`` lines: ``main`` first, the rest sorted by name."""
    others = sorted(users.keys() - {"main"})
    ordered = (["main"] if "main" in users else []) + others
    return [f'{user} = "{users[user]}"' for user in ordered]
|
|
|
|
|
|
def write_telemt_users(users: dict[str, str]) -> None:
    """Rewrite the ``[access.users]`` section of the telemt config with *users*.

    Everything outside that section is kept as is. Every line previously
    inside the section (including comments and blank lines) is replaced by
    the freshly rendered user lines. If the section does not exist it is
    appended at the end of the file. The file is left with mode 0600.
    """
    TELEMT_CONFIG.parent.mkdir(parents=True, exist_ok=True)
    existing: list[str] = []
    if TELEMT_CONFIG.exists():
        existing = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines()
    user_lines = _ordered_user_lines(users)

    result: list[str] = []
    skipping = False  # True while walking over the old section body
    section_seen = False
    for line in existing:
        stripped = line.strip()
        if stripped == "[access.users]":
            section_seen = skipping = True
            result.append(line)
            result.extend(user_lines)
            continue
        if skipping and stripped.startswith("["):
            # The next table header ends the old users section.
            skipping = False
        if not skipping:
            result.append(line)

    if not section_seen:
        # Separate the new section from preceding content with one blank line.
        if result and result[-1].strip():
            result.append("")
        result += ["[access.users]", *user_lines]

    TELEMT_CONFIG.write_text("\n".join(result).rstrip() + "\n", encoding="utf-8")
    os.chmod(TELEMT_CONFIG, 0o600)
|
|
|
|
|
|
def restart_service(name: str) -> bool:
    """Restart systemd unit *name* and report whether it succeeded.

    For ``telemt`` success additionally requires that its configured TCP
    port starts accepting connections within 90 seconds.
    """
    exit_code = run(["systemctl", "restart", name], timeout=25)[0]
    if exit_code != 0:
        return False
    if name != "telemt":
        return True
    return wait_tcp_port(read_telemt_port(), timeout=90)
|
|
|
|
|
|
def service_status(name: str) -> str:
    """Map the systemd state of unit *name* to a short status string.

    Returns ``"running"`` for an active unit, ``"not_installed"`` when no
    unit file is listed, one of ``"failed"``, ``"inactive"``,
    ``"activating"`` or ``"deactivating"`` when systemd reports exactly
    that, and ``"stopped"`` for anything else.
    """
    rc, out, _ = run(["systemctl", "is-active", name], timeout=3)
    state = out.strip()
    if rc == 0 and state == "active":
        return "running"

    rc, out, _ = run(["systemctl", "list-unit-files", f"{name}.service", "--no-legend"], timeout=3)
    installed = rc == 0 and bool(out.strip())
    if not installed:
        return "not_installed"

    passthrough = ("failed", "inactive", "activating", "deactivating")
    return state if state in passthrough else "stopped"
|
|
|
|
|
|
def read_telemt_port() -> int:
    """Return the ``port`` value of the ``[server]`` section of the telemt config.

    Falls back to 443 when the config file is missing, the section has no
    ``port`` key, or the value is not an integer. Only a key named exactly
    ``port`` is used, so other keys that merely start with "port" (for
    example ``port_reuse``) are ignored instead of being parsed as the port.
    """
    if not TELEMT_CONFIG.exists():
        return 443
    in_server = False
    for raw in TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = raw.strip()
        if line == "[server]":
            in_server = True
            continue
        if in_server and line.startswith("["):
            # The next section begins: [server] had no port key.
            break
        if not in_server or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() != "port":
            continue
        try:
            # Drop a trailing "# comment" before converting.
            return int(value.split("#", 1)[0].strip())
        except ValueError:
            return 443
    return 443
|
|
|
|
|
|
def wait_tcp_port(port: int, timeout: int = 90) -> bool:
    """Wait until 127.0.0.1:*port* accepts a TCP connection.

    Returns ``True`` as soon as a connection succeeds. Returns ``False``
    when *timeout* seconds pass first, or immediately when the ``telemt``
    unit is neither running nor activating.
    """
    give_up_at = time.monotonic() + timeout
    while True:
        if time.monotonic() >= give_up_at:
            return False
        if service_status("telemt") not in ("running", "activating"):
            return False
        try:
            probe = socket.create_connection(("127.0.0.1", port), timeout=0.6)
        except OSError:
            # Not listening yet; retry after a short pause.
            time.sleep(1)
            continue
        probe.close()
        return True
|
|
|
|
|
|
# How long (seconds) a resolved address is reused before looking it up again.
_PUBLIC_IP_TTL = 300.0
# Shorter lifetime for the local `hostname -I` fallback, so a temporary
# failure of the external lookup is retried soon.
_PUBLIC_IP_FALLBACK_TTL = 30.0
# Holds {"entry": (address, monotonic_expiry)} once an address was resolved.
_PUBLIC_IP_CACHE: dict = {}


def public_ip() -> str:
    """Return the server's IPv4 address for use in proxy links.

    The address is asked from api.ipify.org via ``curl``; if that fails, the
    first address printed by ``hostname -I`` is used; if that fails too,
    ``"0.0.0.0"`` is returned.

    Results are cached for a short time, because this function is called
    once per user when a user list is rendered and every uncached call
    spawns an external process that may block for several seconds. The
    ``"0.0.0.0"`` placeholder is never cached.
    """
    entry = _PUBLIC_IP_CACHE.get("entry")
    if entry is not None and time.monotonic() < entry[1]:
        return entry[0]

    ttl = _PUBLIC_IP_TTL
    code, stdout, _ = run(["curl", "-s", "-4", "--max-time", "3", "https://api.ipify.org"], timeout=5)
    ip = stdout.strip()
    if not (code == 0 and re.match(r"^\d{1,3}(\.\d{1,3}){3}$", ip)):
        ttl = _PUBLIC_IP_FALLBACK_TTL
        code, stdout, _ = run(["hostname", "-I"], timeout=3)
        addresses = stdout.split()
        ip = addresses[0] if code == 0 and addresses else "0.0.0.0"

    if ip != "0.0.0.0":
        # Stored as one tuple so concurrent request threads never observe a
        # half-updated address/expiry pair.
        _PUBLIC_IP_CACHE["entry"] = (ip, time.monotonic() + ttl)
    return ip
|
|
|
|
|
|
def proxy_link(secret: str) -> str:
    """Build the ``tg://proxy`` link for *secret* from the GoTelegram config.

    * ``mode == "pro"`` with a domain: the domain is the server, and the
      secret is ``ee`` + secret + hex of the domain.
    * otherwise with ``mask_host`` set: the public IP is the server, and the
      secret is ``ee`` + secret + hex of the mask host.
    * otherwise: the public IP is the server with the plain secret.
    """
    cfg = load_json(GOTELEGRAM_CONFIG, {}) or {}
    mode = str(cfg.get("mode", "lite"))
    port = int(cfg.get("port", 443) or 443)
    domain = str(cfg.get("domain", "") or "")
    mask_host = str(cfg.get("mask_host", "") or "")

    def with_host_suffix(server: str, host: str) -> str:
        # "ee"-prefixed secret carrying the hex-encoded host name.
        return f"tg://proxy?server={server}&port={port}&secret=ee{secret}{host.encode().hex()}"

    if mode == "pro" and domain:
        return with_host_suffix(domain, domain)

    server = public_ip()
    if mask_host:
        return with_host_suffix(server, mask_host)
    return f"tg://proxy?server={server}&port={port}&secret={secret}"
|
|
|
|
|
|
def telemt_api(path: str) -> Any:
    """GET *path* from the HTTP API on 127.0.0.1:9091 and return the decoded JSON.

    At most 256 KiB of the response are read. Any failure (connection
    error, timeout, non-JSON or truncated payload) yields ``None``.
    """
    url = f"http://127.0.0.1:9091{path}"
    try:
        with urllib.request.urlopen(url, timeout=1.8) as resp:
            raw = resp.read(256 * 1024)
        return json.loads(raw.decode("utf-8"))
    except Exception:
        return None
|
|
|
|
|
|
def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
    """Return the last *limit* rows of the stats history CSV with per-row deltas.

    Each returned row has ``epoch``, ``proxy_bytes``, ``site_bytes`` plus
    ``proxy_delta`` / ``site_delta``: the non-negative growth relative to
    the previous returned row (0 for the first row and whenever a counter
    went backwards).

    Rows whose numeric fields cannot be parsed are skipped. A missing or
    unreadable file, a file the CSV parser rejects, or a non-positive
    *limit* yields an empty list.
    """
    # rows[-0:] would be the whole list, so "no rows" must be handled up front.
    if limit <= 0 or not HISTORY_FILE.exists():
        return []

    rows: list[dict[str, int]] = []
    try:
        # errors="replace": a stray undecodable byte should only spoil its
        # own row (int() then fails and the row is skipped), not the read.
        with HISTORY_FILE.open("r", encoding="utf-8", errors="replace", newline="") as fh:
            for row in csv.DictReader(fh):
                try:
                    rows.append({
                        "epoch": int(row.get("epoch") or 0),
                        "proxy_bytes": int(row.get("proxy_bytes") or 0),
                        "site_bytes": int(row.get("site_bytes") or 0),
                    })
                except ValueError:
                    continue
    except (OSError, csv.Error):
        return []

    enriched: list[dict[str, int]] = []
    previous = None
    for row in rows[-limit:]:
        item = dict(row)
        if previous is None:
            item["proxy_delta"] = 0
            item["site_delta"] = 0
        else:
            # Clamp at 0: the counters may restart from zero.
            item["proxy_delta"] = max(0, row["proxy_bytes"] - previous["proxy_bytes"])
            item["site_delta"] = max(0, row["site_bytes"] - previous["site_bytes"])
        enriched.append(item)
        previous = row
    return enriched
|
|
|
|
|
|
def list_backups() -> list[dict[str, Any]]:
    """Describe the 30 most recently modified backup archives in ``BACKUP_DIR``.

    Files matching ``*.tar.gz*`` are considered, except ``.sha256`` checksum
    files. Each entry carries ``name``, ``path``, ``size``, ``mtime``
    (integer seconds) and ``encrypted`` (name ends with ``.enc``), newest
    first. A file that disappears or cannot be stat'ed while listing is
    skipped. A missing backup directory yields an empty list.
    """
    if not BACKUP_DIR.exists():
        return []

    found: list[tuple[float, dict[str, Any]]] = []
    for path in BACKUP_DIR.glob("*.tar.gz*"):
        if path.name.endswith(".sha256"):
            continue
        try:
            # Single stat per file, inside the guard: a backup removed
            # between globbing and sorting must not break the listing.
            st = path.stat()
        except OSError:
            continue
        found.append((st.st_mtime, {
            "name": path.name,
            "path": str(path),
            "size": st.st_size,
            "mtime": int(st.st_mtime),
            "encrypted": path.name.endswith(".enc"),
        }))

    found.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in found[:30]]
|
|
|
|
|
|
def create_backup() -> tuple[bool, str]:
    """Run the shell ``create_backup`` function and return ``(ok, message)``.

    The GoTelegram shell libraries are sourced in a login bash and
    ``create_backup ""`` is invoked with a 180 second timeout. ``ok`` is
    ``True`` for exit code 0. ``message`` is the last line of stdout, or the
    last line of stderr when stdout is empty, or ``""`` when both are empty.
    """
    libraries = ("common", "i18n", "telemt", "website", "backup")
    sourcing = "".join(f"source /opt/gotelegram/lib/{lib}.sh; " for lib in libraries)
    script = (
        sourcing
        + "load_language \"$(detect_language 2>/dev/null || echo en)\"; "
        + "create_backup \"\""
    )
    code, stdout, stderr = run(["bash", "-lc", script], timeout=180)

    out_lines = stdout.strip().splitlines()
    err_lines = stderr.strip().splitlines()
    if out_lines:
        message = out_lines[-1]
    elif err_lines:
        message = err_lines[-1]
    else:
        message = ""
    return code == 0, message
|
|
|
|
|
|
def user_payload(name: str, secret: str, include_runtime: bool = False) -> dict[str, Any]:
    """Build the API representation of one proxy user.

    The result holds ``name``, ``secret``, the ready-to-use proxy ``link``
    and a ``main`` flag (true for the user named "main"). With
    *include_runtime* the response of ``/v1/users/<name>`` from the telemt
    API is added under ``runtime`` (``None`` if that call fails).
    """
    payload: dict[str, Any] = dict(
        name=name,
        secret=secret,
        link=proxy_link(secret),
        main=(name == "main"),
    )
    if include_runtime:
        quoted = urllib.parse.quote(name, safe="")
        payload["runtime"] = telemt_api(f"/v1/users/{quoted}")
    return payload
|
|
|
|
|
|
def overview_payload() -> dict[str, Any]:
    """Collect everything the dashboard overview shows in one dict.

    Combines the GoTelegram config, the number of telemt users, the state
    of the managed systemd units, current and historical traffic stats, the
    telemt runtime summary and the list of backups.
    """
    config = load_json(GOTELEGRAM_CONFIG, {}) or {}
    users = read_telemt_users()
    current = load_json(CURRENT_STATS, {}) or {}
    summary = telemt_api("/v1/stats/summary")

    # Short label shown in the UI -> systemd unit name.
    units = {
        "telemt": "telemt",
        "nginx": "nginx",
        "bot": "gotelegram-bot",
        "stats": "gotelegram-stats",
        "admin": "gotelegram-admin",
    }
    services = {label: service_status(unit) for label, unit in units.items()}

    overview: dict[str, Any] = {"version": VERSION}
    overview["time"] = utc_now()
    overview["config"] = config
    overview["users_count"] = len(users)
    overview["services"] = services
    overview["stats_current"] = current
    overview["stats_history"] = load_stats_history()
    overview["runtime_summary"] = summary
    overview["backups"] = list_backups()
    return overview
|
|
|
|
|
|
class AdminHandler(BaseHTTPRequestHandler):
    """Request handler of the admin service.

    Serves the static single-page UI and a JSON API under ``/api/``. Every
    API call requires the admin token (query parameter ``token``, bearer
    header or ``gtauth`` cookie); modifying calls additionally require the
    ``X-GoTelegram-Admin: 1`` header.
    """

    server_version = "GoTelegramAdmin/2.5.0"

    @staticmethod
    def _redact_token(text: str) -> str:
        """Replace the value of any ``token=`` query parameter in *text* with ``***``."""
        return re.sub(r"([?&]token=)[^&\s\"']+", r"\1***", text)

    @staticmethod
    def _tokens_match(expected: str, candidate: str) -> bool:
        """Compare two tokens in constant time; empty tokens never match.

        Both sides are compared as bytes, because ``compare_digest`` raises
        ``TypeError`` for ``str`` arguments containing non-ASCII characters,
        which a client can send at will.
        """
        if not expected or not candidate:
            return False
        return secrets.compare_digest(
            expected.encode("utf-8", "surrogatepass"),
            candidate.encode("utf-8", "surrogatepass"),
        )

    def log_message(self, fmt: str, *args: Any) -> None:
        """Print one access-log line to stdout, with the admin token masked.

        The request line is part of the logged text, so a login URL such as
        ``/?token=...`` would otherwise put the secret into the log.
        """
        print("%s - %s" % (self.address_string(), self._redact_token(fmt % args)))

    def token_from_request(self) -> str:
        """Return the token presented by the client, or ``""`` if there is none.

        Sources are tried in this order: ``token`` query parameter,
        ``Authorization: Bearer`` header, ``gtauth`` cookie.
        """
        parsed = urllib.parse.urlparse(self.path)
        qs = urllib.parse.parse_qs(parsed.query)
        if qs.get("token", [""])[0]:
            return qs["token"][0]
        auth = self.headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            return auth[len("Bearer "):].strip()
        cookie_header = self.headers.get("Cookie", "")
        if cookie_header:
            cookie = http.cookies.SimpleCookie()
            try:
                cookie.load(cookie_header)
            except http.cookies.CookieError:
                # A malformed Cookie header is simply "no token", not a crash.
                return ""
            if "gtauth" in cookie:
                return cookie["gtauth"].value
        return ""

    def is_authorized(self) -> bool:
        """Return whether the request carries the current admin token."""
        return self._tokens_match(current_token(), self.token_from_request())

    def send_json(self, payload: Any, status: int = 200) -> None:
        """Send *payload* as a non-cacheable UTF-8 JSON response with *status*."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, status: int, message: str) -> None:
        """Send ``{"ok": false, "error": message}`` with HTTP status *status*."""
        self.send_json({"ok": False, "error": message}, status)

    def read_json_body(self) -> Any:
        """Read and decode the JSON request body.

        Returns ``{}`` when there is no body.

        Raises:
            ValueError: if ``Content-Length`` is not a number, exceeds
                1 MiB, or the body is not valid UTF-8 JSON.
        """
        length = int(self.headers.get("Content-Length", "0") or 0)
        if length > 1024 * 1024:
            raise ValueError("request body too large")
        if length <= 0:
            return {}
        return json.loads(self.rfile.read(length).decode("utf-8"))

    def require_auth(self) -> bool:
        """Return ``True`` for an authorized request; otherwise answer 401 and return ``False``."""
        if self.is_authorized():
            return True
        self.send_error_json(401, "unauthorized")
        return False

    def require_write_guard(self) -> bool:
        """Check the ``X-GoTelegram-Admin: 1`` header on modifying methods.

        Answers 403 and returns ``False`` when a POST/PUT/PATCH/DELETE
        request lacks the header; returns ``True`` otherwise.
        """
        if self.command in {"POST", "PUT", "PATCH", "DELETE"} and self.headers.get("X-GoTelegram-Admin") != "1":
            self.send_error_json(403, "missing write guard")
            return False
        return True

    def route_get_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Handle ``GET /api/...``: overview, users, one user, backups, logs."""
        if not self.require_auth():
            return
        path = parsed.path
        if path == "/api/overview":
            self.send_json({"ok": True, "data": overview_payload()})
        elif path == "/api/users":
            users = read_telemt_users()
            self.send_json({"ok": True, "data": [user_payload(k, v) for k, v in sorted(users.items())]})
        elif path.startswith("/api/users/"):
            name = urllib.parse.unquote(path[len("/api/users/"):])
            users = read_telemt_users()
            if name not in users:
                self.send_error_json(404, "user not found")
                return
            self.send_json({"ok": True, "data": user_payload(name, users[name], include_runtime=True)})
        elif path == "/api/backups":
            self.send_json({"ok": True, "data": list_backups()})
        elif path == "/api/logs":
            qs = urllib.parse.parse_qs(parsed.query)
            service = qs.get("service", ["telemt"])[0]
            # Fixed allow-list: the unit name goes onto a journalctl command line.
            allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats", "gotelegram-admin"}
            if service not in allowed:
                self.send_error_json(400, "unsupported service")
                return
            code, stdout, stderr = run(["journalctl", "-u", service, "-n", "120", "--no-pager"], timeout=8)
            self.send_json({"ok": code == 0, "data": stdout if code == 0 else stderr})
        else:
            self.send_error_json(404, "not found")

    def route_post_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Handle ``POST /api/...``: create a user, create a backup, restart a service."""
        if not self.require_auth() or not self.require_write_guard():
            return
        path = parsed.path
        try:
            body = self.read_json_body()
        except Exception as exc:
            self.send_error_json(400, str(exc))
            return

        if path == "/api/users":
            # Valid JSON need not be an object; anything else has no "name".
            if not isinstance(body, dict):
                self.send_error_json(400, "invalid request body")
                return
            name = str(body.get("name", "")).strip()
            if not USER_RE.match(name):
                self.send_error_json(400, "invalid user name")
                return
            users = read_telemt_users()
            if name in users:
                self.send_error_json(409, "user already exists")
                return
            # 32 hex characters derived from fresh randomness.
            seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode()
            secret = hashlib.sha256(seed).hexdigest()[:32]
            users[name] = secret
            try:
                write_telemt_users(users)
            except Exception as exc:
                self.send_error_json(500, f"failed to save config: {exc}")
                return
            restarted = restart_service("telemt")
            self.send_json({"ok": True, "data": user_payload(name, secret), "restarted": restarted})
        elif path == "/api/backups":
            ok, result = create_backup()
            self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
        elif path.startswith("/api/services/") and path.endswith("/restart"):
            service = path[len("/api/services/"):-len("/restart")]
            # The admin service itself is deliberately not restartable from here.
            allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats"}
            if service not in allowed:
                self.send_error_json(400, "unsupported service")
                return
            ok = restart_service(service)
            self.send_json({"ok": ok, "status": service_status(service)}, 200 if ok else 500)
        else:
            self.send_error_json(404, "not found")

    def route_delete_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Handle ``DELETE /api/users/<name>``; the ``main`` user cannot be deleted."""
        if not self.require_auth() or not self.require_write_guard():
            return
        path = parsed.path
        if not path.startswith("/api/users/"):
            self.send_error_json(404, "not found")
            return
        name = urllib.parse.unquote(path[len("/api/users/"):])
        if name == "main":
            self.send_error_json(400, "main user cannot be deleted")
            return
        users = read_telemt_users()
        if name not in users:
            self.send_error_json(404, "user not found")
            return
        users.pop(name, None)
        try:
            write_telemt_users(users)
        except Exception as exc:
            self.send_error_json(500, f"failed to save config: {exc}")
            return
        restarted = restart_service("telemt")
        self.send_json({"ok": True, "restarted": restarted})

    def send_static(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve a file from ``STATIC_DIR``, or complete a token login.

        A request carrying a valid ``?token=`` is answered with a redirect
        to ``/`` that stores the token in the ``gtauth`` cookie. Otherwise
        the requested file is served; unknown paths fall back to
        ``index.html``. Paths under ``api/`` or containing a ``..`` segment
        are rejected with 404.
        """
        qs = urllib.parse.parse_qs(parsed.query)
        if qs.get("token", [""])[0] and self.is_authorized():
            self.send_response(302)
            self.send_header("Location", "/")
            self.send_header("Set-Cookie", "gtauth=%s; Path=/; HttpOnly; SameSite=Strict" % qs["token"][0])
            self.send_header("Cache-Control", "no-store")
            self.end_headers()
            return

        rel = parsed.path.lstrip("/") or "index.html"
        if rel.startswith("api/") or ".." in rel.split("/"):
            self.send_error(404)
            return
        path = STATIC_DIR / rel
        if path.is_dir():
            path = path / "index.html"
        if not path.exists():
            path = STATIC_DIR / "index.html"
        try:
            body = path.read_bytes()
        except OSError:
            self.send_error(404)
            return
        mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
        self.send_response(200)
        self.send_header("Content-Type", mime)
        self.send_header("Cache-Control", "no-store" if path.name == "index.html" else "public, max-age=3600")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """Dispatch GET requests to the API router or the static file server."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_get_api(parsed)
        else:
            self.send_static(parsed)

    def do_POST(self) -> None:
        """Dispatch POST requests; only ``/api/`` paths exist."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_post_api(parsed)
        else:
            self.send_error(404)

    def do_DELETE(self) -> None:
        """Dispatch DELETE requests; only ``/api/`` paths exist."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_delete_api(parsed)
        else:
            self.send_error(404)
|
|
|
|
|
|
def main() -> None:
    """Start the admin HTTP server and serve requests until interrupted.

    Makes sure the admin token exists first and exits with an error message
    when the static directory is missing.
    """
    ensure_token()
    if not STATIC_DIR.exists():
        raise SystemExit(f"static dir not found: {STATIC_DIR}")
    address = (HOST, PORT)
    server = ThreadingHTTPServer(address, AdminHandler)
    print(f"GoTelegram admin listening on http://{HOST}:{PORT}")
    server.serve_forever()


if __name__ == "__main__":
    main()
|