#!/usr/bin/env python3 """ GoTelegram local web admin. The service is intentionally bound to 127.0.0.1:1984. Operators reach it through an SSH tunnel; it must never be exposed directly on the public network. """ from __future__ import annotations import csv import hashlib import http.cookies import json import mimetypes import os import re import secrets import subprocess import time import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any ADMIN_DIR = Path(os.getenv("GOTELEGRAM_ADMIN_DIR", "/opt/gotelegram-admin")) STATIC_DIR = Path(os.getenv("GOTELEGRAM_ADMIN_STATIC", str(ADMIN_DIR / "static"))) TOKEN_FILE = Path(os.getenv("GOTELEGRAM_ADMIN_TOKEN_FILE", str(ADMIN_DIR / "token"))) GOTELEGRAM_CONFIG = Path(os.getenv("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json")) TELEMT_CONFIG = Path(os.getenv("TELEMT_CONFIG", "/etc/telemt/config.toml")) HISTORY_FILE = Path(os.getenv("GOTELEGRAM_STATS_HISTORY", "/opt/gotelegram/stats_history.csv")) CURRENT_STATS = Path(os.getenv("GOTELEGRAM_STATS_CURRENT", "/run/gotelegram/stats_current.json")) BACKUP_DIR = Path(os.getenv("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups")) INSTALL_DIR = Path(os.getenv("GOTELEGRAM_DIR", "/opt/gotelegram")) HOST = os.getenv("GOTELEGRAM_ADMIN_HOST", "127.0.0.1") PORT = int(os.getenv("GOTELEGRAM_ADMIN_PORT", "1984")) VERSION = "2.5.0" USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$") def utc_now() -> str: return datetime.now(timezone.utc).isoformat() def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]: try: proc = subprocess.run( cmd, text=True, capture_output=True, timeout=timeout, check=False, ) return proc.returncode, proc.stdout, proc.stderr except Exception as exc: # pragma: no cover - system dependent return 125, "", str(exc) def ensure_token() -> str: ADMIN_DIR.mkdir(parents=True, exist_ok=True) if not TOKEN_FILE.exists() or TOKEN_FILE.stat().st_size < 24: TOKEN_FILE.write_text(secrets.token_urlsafe(36) + "\n", encoding="utf-8") os.chmod(TOKEN_FILE, 0o600) return TOKEN_FILE.read_text(encoding="utf-8").strip() def current_token() -> str: try: return ensure_token() except OSError: return "" def load_json(path: Path, fallback: Any = None) -> Any: try: with path.open("r", encoding="utf-8") as fh: return json.load(fh) except Exception: return fallback def read_telemt_users() -> dict[str, str]: if not TELEMT_CONFIG.exists(): return {} users: dict[str, str] = {} in_users = False for raw in TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines(): line = raw.strip() if line == "[access.users]": in_users = True continue if in_users and line.startswith("["): break if not in_users or not line or line.startswith("#") or "=" not in line: continue name, value = line.split("=", 1) name = name.strip() value = value.strip().split("#", 1)[0].strip() if value.startswith('"') and '"' in value[1:]: value = value[1:].split('"', 1)[0] elif value.startswith("'") and "'" in value[1:]: value = value[1:].split("'", 1)[0] if USER_RE.match(name) and value: users[name] = value return users def _ordered_user_lines(users: dict[str, str]) -> list[str]: names = [] if "main" in users: names.append("main") names.extend(sorted(n for n in users if n != "main")) return [f'{name} = "{users[name]}"' for name in names] def write_telemt_users(users: dict[str, str]) -> None: TELEMT_CONFIG.parent.mkdir(parents=True, exist_ok=True) lines = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines() if TELEMT_CONFIG.exists() else [] rendered = _ordered_user_lines(users) out: list[str] = [] in_users = False found = False for raw in lines: if raw.strip() == "[access.users]": found = True in_users = True out.append(raw) out.extend(rendered) continue if in_users and raw.strip().startswith("["): in_users = False if in_users: continue out.append(raw) if not found: if out and out[-1].strip(): out.append("") out.append("[access.users]") out.extend(rendered) TELEMT_CONFIG.write_text("\n".join(out).rstrip() + "\n", encoding="utf-8") os.chmod(TELEMT_CONFIG, 0o600) def restart_service(name: str) -> bool: code, _, _ = run(["systemctl", "restart", name], timeout=25) return code == 0 def service_status(name: str) -> str: code, stdout, _ = run(["systemctl", "is-active", name], timeout=3) value = stdout.strip() if code == 0 and value == "active": return "running" code, stdout, _ = run(["systemctl", "list-unit-files", f"{name}.service", "--no-legend"], timeout=3) if code != 0 or not stdout.strip(): return "not_installed" if value in {"failed", "inactive", "activating", "deactivating"}: return value return "stopped" def public_ip() -> str: code, stdout, _ = run(["curl", "-s", "-4", "--max-time", "3", "https://api.ipify.org"], timeout=5) ip = stdout.strip() if code == 0 and re.match(r"^\d{1,3}(\.\d{1,3}){3}$", ip): return ip code, stdout, _ = run(["hostname", "-I"], timeout=3) return stdout.split()[0] if code == 0 and stdout.split() else "0.0.0.0" def proxy_link(secret: str) -> str: config = load_json(GOTELEGRAM_CONFIG, {}) or {} mode = str(config.get("mode", "lite")) port = int(config.get("port", 443) or 443) domain = str(config.get("domain", "") or "") mask_host = str(config.get("mask_host", "") or "") if mode == "pro" and domain: host_hex = domain.encode().hex() return f"tg://proxy?server={domain}&port={port}&secret=ee{secret}{host_hex}" server = public_ip() if mask_host: host_hex = mask_host.encode().hex() return f"tg://proxy?server={server}&port={port}&secret=ee{secret}{host_hex}" return f"tg://proxy?server={server}&port={port}&secret={secret}" def telemt_api(path: str) -> Any: try: with urllib.request.urlopen(f"http://127.0.0.1:9091{path}", timeout=1.8) as resp: payload = resp.read(256 * 1024) return json.loads(payload.decode("utf-8")) except Exception: return None def load_stats_history(limit: int = 240) -> list[dict[str, int]]: if not HISTORY_FILE.exists(): return [] rows: list[dict[str, int]] = [] try: with HISTORY_FILE.open("r", encoding="utf-8", newline="") as fh: for row in csv.DictReader(fh): try: rows.append({ "epoch": int(row.get("epoch") or 0), "proxy_bytes": int(row.get("proxy_bytes") or 0), "site_bytes": int(row.get("site_bytes") or 0), }) except ValueError: continue except OSError: return [] rows = rows[-limit:] previous = None enriched: list[dict[str, int]] = [] for row in rows: item = dict(row) if previous: item["proxy_delta"] = max(0, row["proxy_bytes"] - previous["proxy_bytes"]) item["site_delta"] = max(0, row["site_bytes"] - previous["site_bytes"]) else: item["proxy_delta"] = 0 item["site_delta"] = 0 enriched.append(item) previous = row return enriched def list_backups() -> list[dict[str, Any]]: if not BACKUP_DIR.exists(): return [] items = [] for path in sorted(BACKUP_DIR.glob("*.tar.gz*"), key=lambda p: p.stat().st_mtime, reverse=True): if path.name.endswith(".sha256"): continue try: st = path.stat() except OSError: continue items.append({ "name": path.name, "path": str(path), "size": st.st_size, "mtime": int(st.st_mtime), "encrypted": path.name.endswith(".enc"), }) return items[:30] def create_backup() -> tuple[bool, str]: script = ( "source /opt/gotelegram/lib/common.sh; " "source /opt/gotelegram/lib/i18n.sh; " "source /opt/gotelegram/lib/telemt.sh; " "source /opt/gotelegram/lib/website.sh; " "source /opt/gotelegram/lib/backup.sh; " "load_language \"$(detect_language 2>/dev/null || echo en)\"; " "create_backup \"\"" ) code, stdout, stderr = run(["bash", "-lc", script], timeout=180) text = (stdout.strip().splitlines()[-1:] or stderr.strip().splitlines()[-1:] or [""])[0] return code == 0, text def user_payload(name: str, secret: str, include_runtime: bool = False) -> dict[str, Any]: item: dict[str, Any] = { "name": name, "secret": secret, "link": proxy_link(secret), "main": name == "main", } if include_runtime: item["runtime"] = telemt_api(f"/v1/users/{urllib.parse.quote(name, safe='')}") return item def overview_payload() -> dict[str, Any]: config = load_json(GOTELEGRAM_CONFIG, {}) or {} users = read_telemt_users() current = load_json(CURRENT_STATS, {}) or {} summary = telemt_api("/v1/stats/summary") services = { "telemt": service_status("telemt"), "nginx": service_status("nginx"), "bot": service_status("gotelegram-bot"), "stats": service_status("gotelegram-stats"), "admin": service_status("gotelegram-admin"), } return { "version": VERSION, "time": utc_now(), "config": config, "users_count": len(users), "services": services, "stats_current": current, "stats_history": load_stats_history(), "runtime_summary": summary, "backups": list_backups(), } class AdminHandler(BaseHTTPRequestHandler): server_version = "GoTelegramAdmin/2.5.0" def log_message(self, fmt: str, *args: Any) -> None: print("%s - %s" % (self.address_string(), fmt % args)) def token_from_request(self) -> str: parsed = urllib.parse.urlparse(self.path) qs = urllib.parse.parse_qs(parsed.query) if qs.get("token", [""])[0]: return qs["token"][0] auth = self.headers.get("Authorization", "") if auth.startswith("Bearer "): return auth[len("Bearer "):].strip() cookie_header = self.headers.get("Cookie", "") if cookie_header: cookie = http.cookies.SimpleCookie() cookie.load(cookie_header) if "gtauth" in cookie: return cookie["gtauth"].value return "" def is_authorized(self) -> bool: token = current_token() candidate = self.token_from_request() return bool(token and candidate and secrets.compare_digest(token, candidate)) def send_json(self, payload: Any, status: int = 200) -> None: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def send_error_json(self, status: int, message: str) -> None: self.send_json({"ok": False, "error": message}, status) def read_json_body(self) -> Any: length = int(self.headers.get("Content-Length", "0") or 0) if length > 1024 * 1024: raise ValueError("request body too large") if length <= 0: return {} return json.loads(self.rfile.read(length).decode("utf-8")) def require_auth(self) -> bool: if self.is_authorized(): return True self.send_error_json(401, "unauthorized") return False def require_write_guard(self) -> bool: if self.command in {"POST", "PUT", "PATCH", "DELETE"} and self.headers.get("X-GoTelegram-Admin") != "1": self.send_error_json(403, "missing write guard") return False return True def route_get_api(self, parsed: urllib.parse.ParseResult) -> None: if not self.require_auth(): return path = parsed.path if path == "/api/overview": self.send_json({"ok": True, "data": overview_payload()}) elif path == "/api/users": users = read_telemt_users() self.send_json({"ok": True, "data": [user_payload(k, v) for k, v in sorted(users.items())]}) elif path.startswith("/api/users/"): name = urllib.parse.unquote(path[len("/api/users/"):]) users = read_telemt_users() if name not in users: self.send_error_json(404, "user not found") return self.send_json({"ok": True, "data": user_payload(name, users[name], include_runtime=True)}) elif path == "/api/backups": self.send_json({"ok": True, "data": list_backups()}) elif path == "/api/logs": qs = urllib.parse.parse_qs(parsed.query) service = qs.get("service", ["telemt"])[0] allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats", "gotelegram-admin"} if service not in allowed: self.send_error_json(400, "unsupported service") return code, stdout, stderr = run(["journalctl", "-u", service, "-n", "120", "--no-pager"], timeout=8) self.send_json({"ok": code == 0, "data": stdout if code == 0 else stderr}) else: self.send_error_json(404, "not found") def route_post_api(self, parsed: urllib.parse.ParseResult) -> None: if not self.require_auth() or not self.require_write_guard(): return path = parsed.path try: body = self.read_json_body() except Exception as exc: self.send_error_json(400, str(exc)) return if path == "/api/users": name = str(body.get("name", "")).strip() if not USER_RE.match(name): self.send_error_json(400, "invalid user name") return users = read_telemt_users() if name in users: self.send_error_json(409, "user already exists") return seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode() secret = hashlib.sha256(seed).hexdigest()[:32] users[name] = secret try: write_telemt_users(users) except Exception as exc: self.send_error_json(500, f"failed to save config: {exc}") return restarted = restart_service("telemt") self.send_json({"ok": True, "data": user_payload(name, secret), "restarted": restarted}) elif path == "/api/backups": ok, result = create_backup() self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500) elif path.startswith("/api/services/") and path.endswith("/restart"): service = path[len("/api/services/"):-len("/restart")] allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats"} if service not in allowed: self.send_error_json(400, "unsupported service") return ok = restart_service(service) self.send_json({"ok": ok, "status": service_status(service)}, 200 if ok else 500) else: self.send_error_json(404, "not found") def route_delete_api(self, parsed: urllib.parse.ParseResult) -> None: if not self.require_auth() or not self.require_write_guard(): return path = parsed.path if not path.startswith("/api/users/"): self.send_error_json(404, "not found") return name = urllib.parse.unquote(path[len("/api/users/"):]) if name == "main": self.send_error_json(400, "main user cannot be deleted") return users = read_telemt_users() if name not in users: self.send_error_json(404, "user not found") return users.pop(name, None) try: write_telemt_users(users) except Exception as exc: self.send_error_json(500, f"failed to save config: {exc}") return restarted = restart_service("telemt") self.send_json({"ok": True, "restarted": restarted}) def send_static(self, parsed: urllib.parse.ParseResult) -> None: qs = urllib.parse.parse_qs(parsed.query) if qs.get("token", [""])[0] and self.is_authorized(): self.send_response(302) self.send_header("Location", "/") self.send_header("Set-Cookie", "gtauth=%s; Path=/; HttpOnly; SameSite=Strict" % qs["token"][0]) self.send_header("Cache-Control", "no-store") self.end_headers() return rel = parsed.path.lstrip("/") or "index.html" if rel.startswith("api/") or ".." in rel.split("/"): self.send_error(404) return path = STATIC_DIR / rel if path.is_dir(): path = path / "index.html" if not path.exists(): path = STATIC_DIR / "index.html" try: body = path.read_bytes() except OSError: self.send_error(404) return mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream" self.send_response(200) self.send_header("Content-Type", mime) self.send_header("Cache-Control", "no-store" if path.name == "index.html" else "public, max-age=3600") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path.startswith("/api/"): self.route_get_api(parsed) else: self.send_static(parsed) def do_POST(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path.startswith("/api/"): self.route_post_api(parsed) else: self.send_error(404) def do_DELETE(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path.startswith("/api/"): self.route_delete_api(parsed) else: self.send_error(404) def main() -> None: ensure_token() if not STATIC_DIR.exists(): raise SystemExit(f"static dir not found: {STATIC_DIR}") httpd = ThreadingHTTPServer((HOST, PORT), AdminHandler) print(f"GoTelegram admin listening on http://{HOST}:{PORT}") httpd.serve_forever() if __name__ == "__main__": main()