Files
gotelegram_pro/admin-web/server.py
2026-04-24 22:30:09 +03:00

612 lines
22 KiB
Python

#!/usr/bin/env python3
"""
GoTelegram local web admin.
The service is intentionally bound to 127.0.0.1:1984. Operators reach it
through an SSH tunnel; it must never be exposed directly on the public network.
"""
from __future__ import annotations
import csv
import hashlib
import json
import mimetypes
import os
import re
import secrets
import socket
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
def _env_path(variable: str, default: Path | str) -> Path:
    """Return the path held in environment ``variable``, or ``default`` when unset."""
    return Path(os.environ.get(variable, str(default)))


# Filesystem locations; every one of them can be overridden via the environment.
ADMIN_DIR = _env_path("GOTELEGRAM_ADMIN_DIR", "/opt/gotelegram-admin")
STATIC_DIR = _env_path("GOTELEGRAM_ADMIN_STATIC", ADMIN_DIR / "static")
GOTELEGRAM_CONFIG = _env_path("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json")
TELEMT_CONFIG = _env_path("TELEMT_CONFIG", "/etc/telemt/config.toml")
HISTORY_FILE = _env_path("GOTELEGRAM_STATS_HISTORY", "/opt/gotelegram/stats_history.csv")
CURRENT_STATS = _env_path("GOTELEGRAM_STATS_CURRENT", "/run/gotelegram/stats_current.json")
BACKUP_DIR = _env_path("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups")
INSTALL_DIR = _env_path("GOTELEGRAM_DIR", "/opt/gotelegram")

# Listening address of the admin HTTP server (loopback by default).
HOST = os.environ.get("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.environ.get("GOTELEGRAM_ADMIN_PORT", "1984"))

VERSION = "2.5.0"

# Accepted user names: 1-48 characters out of letters, digits, "_", "." and "-".
USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
def utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
    """Execute ``cmd`` without a shell and report ``(returncode, stdout, stderr)``.

    Any failure to run the command (missing binary, timeout, ...) is reported
    as return code 125 with empty stdout and the exception text as stderr.
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout,
        )
    except Exception as exc:  # pragma: no cover - system dependent
        return 125, "", str(exc)
    return completed.returncode, completed.stdout, completed.stderr
def load_json(path: Path, fallback: Any = None) -> Any:
    """Parse the JSON document at ``path``; return ``fallback`` on any failure."""
    try:
        with path.open(encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception:
        # Missing file, unreadable file and malformed JSON all yield the fallback.
        return fallback
    return parsed
def read_language(config: dict[str, Any] | None = None) -> str:
    """Return the UI language code, either ``"en"`` or ``"ru"``.

    The ``language``/``lang`` key of ``config`` (loaded from GOTELEGRAM_CONFIG
    when not supplied or empty) wins; otherwise the first two characters of
    the ``.language`` marker file in INSTALL_DIR are used. Anything else
    resolves to ``"en"``.
    """
    supported = {"en", "ru"}
    settings = config or load_json(GOTELEGRAM_CONFIG, {}) or {}
    chosen = str(settings.get("language") or settings.get("lang") or "").strip().lower()
    if chosen in supported:
        return chosen

    marker_file = INSTALL_DIR / ".language"
    if not marker_file.exists():
        return "en"
    try:
        chosen = marker_file.read_text(encoding="utf-8", errors="ignore").strip().lower()[:2]
    except OSError:
        return "en"
    return chosen if chosen in supported else "en"
def read_telemt_users() -> dict[str, str]:
    """Return ``{user name: secret}`` parsed from ``[access.users]`` in TELEMT_CONFIG.

    Parsing is line based: it stops at the next ``[`` header after the section,
    cuts values at the first ``#``, unwraps a quoted value, and keeps only
    entries whose name matches USER_RE and whose value is non-empty.
    """
    if not TELEMT_CONFIG.exists():
        return {}

    found: dict[str, str] = {}
    remaining = iter(TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines())

    # Phase 1: consume everything up to and including the section header.
    for raw in remaining:
        if raw.strip() == "[access.users]":
            break
    else:
        return found

    # Phase 2: read key/value pairs until another table header starts.
    for raw in remaining:
        entry = raw.strip()
        if entry == "[access.users]":
            continue
        if entry.startswith("["):
            break
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, rest = entry.partition("=")
        key = key.strip()
        secret = rest.strip().split("#", 1)[0].strip()
        for quote in ('"', "'"):
            if secret.startswith(quote) and quote in secret[1:]:
                secret = secret[1:].split(quote, 1)[0]
                break
        if USER_RE.match(key) and secret:
            found[key] = secret
    return found
def _ordered_user_lines(users: dict[str, str]) -> list[str]:
    """Render ``users`` as ``name = "secret"`` lines: ``main`` first, the rest sorted."""
    leading = ["main"] if "main" in users else []
    ordered = leading + sorted(users.keys() - {"main"})
    return [f'{user} = "{users[user]}"' for user in ordered]
def write_telemt_users(users: dict[str, str]) -> None:
    """Rewrite the ``[access.users]`` section of TELEMT_CONFIG with ``users``.

    Every line previously inside the section (comments included) is replaced by
    the freshly rendered user lines; all other lines are kept as they are. When
    the section does not exist yet it is appended at the end of the file. The
    parent directory is created when missing.

    The file holds proxy secrets, so it is opened with mode 0600 and tightened
    to 0600 *before* the new content is written; it is never readable with
    looser permissions while the secrets are in it.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    TELEMT_CONFIG.parent.mkdir(parents=True, exist_ok=True)
    if TELEMT_CONFIG.exists():
        lines = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines()
    else:
        lines = []
    rendered = _ordered_user_lines(users)

    out: list[str] = []
    in_users = False
    found = False
    for raw in lines:
        stripped = raw.strip()
        if stripped == "[access.users]":
            found = True
            in_users = True
            out.append(raw)
            out.extend(rendered)
            continue
        if in_users and stripped.startswith("["):
            # The next table header ends the users section.
            in_users = False
        if in_users:
            # Old section content is dropped; it has been replaced above.
            continue
        out.append(raw)

    if not found:
        if out and out[-1].strip():
            out.append("")  # blank separator before the new section
        out.append("[access.users]")
        out.extend(rendered)

    text = "\n".join(out).rstrip() + "\n"
    # O_CREAT with 0o600 covers a brand-new file; fchmod covers an existing one.
    fd = os.open(TELEMT_CONFIG, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        os.fchmod(fd, 0o600)
        fh.write(text)
def restart_service(name: str) -> bool:
    """Restart systemd unit ``name`` and report whether it succeeded.

    For ``telemt`` success additionally requires its configured port to start
    accepting connections within 90 seconds.
    """
    exit_code = run(["systemctl", "restart", name], timeout=25)[0]
    if exit_code != 0:
        return False
    if name != "telemt":
        return True
    return wait_tcp_port(read_telemt_port(), timeout=90)
def service_status(name: str) -> str:
    """Classify systemd unit ``name``.

    Returns ``"running"``, ``"not_installed"``, one of systemd's own
    ``failed``/``inactive``/``activating``/``deactivating`` states, or
    ``"stopped"`` for anything else.
    """
    active_code, active_out, _ = run(["systemctl", "is-active", name], timeout=3)
    state = active_out.strip()
    if active_code == 0 and state == "active":
        return "running"

    listed_code, listed_out, _ = run(
        ["systemctl", "list-unit-files", f"{name}.service", "--no-legend"],
        timeout=3,
    )
    installed = listed_code == 0 and bool(listed_out.strip())
    if not installed:
        return "not_installed"

    passthrough = {"failed", "inactive", "activating", "deactivating"}
    return state if state in passthrough else "stopped"
def read_telemt_port() -> int:
    """Return the ``port`` value of the ``[server]`` section in TELEMT_CONFIG.

    Falls back to 443 when the file, the section or the key is missing, or
    when the value is not an integer in the range 1..65535.

    Only the key that is exactly ``port`` is considered; keys that merely
    start with "port" are skipped instead of being mistaken for the port.
    """
    default_port = 443
    if not TELEMT_CONFIG.exists():
        return default_port

    in_server = False
    for raw in TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = raw.strip()
        if line == "[server]":
            in_server = True
            continue
        if in_server and line.startswith("["):
            break  # next table header: the server section is over
        if not in_server:
            continue
        key, sep, value = line.partition("=")
        if not sep or key.strip() != "port":
            continue
        try:
            port = int(value.split("#", 1)[0].strip())  # drop a trailing comment
        except ValueError:
            return default_port
        # An out-of-range port cannot be connected to; treat it as unusable.
        return port if 0 < port < 65536 else default_port
    return default_port
def wait_tcp_port(port: int, timeout: int = 90) -> bool:
    """Poll until 127.0.0.1:``port`` accepts a TCP connection.

    Returns True on the first successful connection. Returns False when
    ``timeout`` seconds elapse, or as soon as the ``telemt`` service is
    neither running nor activating.
    """
    stop_at = time.monotonic() + timeout
    while True:
        if time.monotonic() >= stop_at:
            return False
        if service_status("telemt") not in {"running", "activating"}:
            return False
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.6):
                return True
        except OSError:
            # Not listening yet; back off briefly before the next probe.
            time.sleep(1)
# Most recent successful lookup as ``(ip, monotonic expiry)``; None until the first one.
_PUBLIC_IP_CACHE = {"entry": None}
# Seconds a resolved address is reused before it is looked up again.
_PUBLIC_IP_TTL = 60.0


def public_ip() -> str:
    """Return this host's IPv4 address for use in proxy links.

    Asks https://api.ipify.org through ``curl`` first and falls back to the
    first address printed by ``hostname -I``. Returns ``"0.0.0.0"`` when both
    fail.

    A successful result is cached for ``_PUBLIC_IP_TTL`` seconds, so building
    links for many users does not spawn one network lookup per user. Failures
    are not cached.
    """
    entry = _PUBLIC_IP_CACHE["entry"]
    if entry is not None and time.monotonic() < entry[1]:
        return entry[0]

    code, stdout, _ = run(["curl", "-s", "-4", "--max-time", "3", "https://api.ipify.org"], timeout=5)
    ip = stdout.strip()
    if not (code == 0 and re.match(r"^\d{1,3}(\.\d{1,3}){3}$", ip)):
        code, stdout, _ = run(["hostname", "-I"], timeout=3)
        addresses = stdout.split()
        ip = addresses[0] if code == 0 and addresses else ""

    if not ip:
        return "0.0.0.0"
    # Single assignment of a tuple keeps address and expiry consistent for readers.
    _PUBLIC_IP_CACHE["entry"] = (ip, time.monotonic() + _PUBLIC_IP_TTL)
    return ip
def proxy_link(secret: str) -> str:
    """Build the ``tg://proxy`` link for ``secret`` from GOTELEGRAM_CONFIG.

    In ``pro`` mode with a domain, the domain is both the server and the host
    appended (hex encoded) to the ``ee``-prefixed secret. Otherwise the server
    is the public IP, and ``mask_host`` - when set - is the appended host.
    Without any host the plain secret is used.
    """
    settings = load_json(GOTELEGRAM_CONFIG, {}) or {}
    mode = str(settings.get("mode", "lite"))
    port = int(settings.get("port", 443) or 443)
    domain = str(settings.get("domain", "") or "")
    mask_host = str(settings.get("mask_host", "") or "")

    if mode == "pro" and domain:
        server, fronting_host = domain, domain
    else:
        server, fronting_host = public_ip(), mask_host

    if fronting_host:
        host_hex = fronting_host.encode().hex()
        return f"tg://proxy?server={server}&port={port}&secret=ee{secret}{host_hex}"
    return f"tg://proxy?server={server}&port={port}&secret={secret}"
def telemt_api(path: str) -> Any:
    """GET ``path`` from the API at 127.0.0.1:9091 and return the decoded JSON.

    At most 256 KiB of the response is read. Returns None on any error
    (connection failure, timeout, invalid JSON, ...).
    """
    url = f"http://127.0.0.1:9091{path}"
    try:
        with urllib.request.urlopen(url, timeout=1.8) as response:
            raw = response.read(256 * 1024)
        return json.loads(raw.decode("utf-8"))
    except Exception:
        return None
def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
    """Return the last ``limit`` samples of HISTORY_FILE with per-sample deltas.

    Each item carries ``epoch``, ``proxy_bytes`` and ``site_bytes`` from the
    CSV plus ``proxy_delta`` and ``site_delta``: the increase over the previous
    returned sample, clamped at 0 (and 0 for the first sample).

    Rows with non-integer fields are skipped. Undecodable bytes are ignored,
    and a CSV parse error stops reading but keeps the rows parsed so far, so
    a damaged history file cannot crash the caller. Only ``limit`` rows are
    held in memory while reading.

    Args:
        limit: maximum number of samples to return; values <= 0 yield ``[]``.
    """
    from collections import deque

    if limit <= 0 or not HISTORY_FILE.exists():
        return []

    # A bounded deque keeps just the newest ``limit`` rows while streaming.
    rows: deque[dict[str, int]] = deque(maxlen=limit)
    try:
        with HISTORY_FILE.open("r", encoding="utf-8", errors="ignore", newline="") as fh:
            try:
                for row in csv.DictReader(fh):
                    try:
                        rows.append({
                            "epoch": int(row.get("epoch") or 0),
                            "proxy_bytes": int(row.get("proxy_bytes") or 0),
                            "site_bytes": int(row.get("site_bytes") or 0),
                        })
                    except ValueError:
                        continue
            except csv.Error:
                # Malformed CSV from here on; serve what was read successfully.
                pass
    except OSError:
        return []

    previous = None
    enriched: list[dict[str, int]] = []
    for row in rows:
        item = dict(row)
        if previous is None:
            item["proxy_delta"] = 0
            item["site_delta"] = 0
        else:
            # Counters that went backwards are reported as no traffic.
            item["proxy_delta"] = max(0, row["proxy_bytes"] - previous["proxy_bytes"])
            item["site_delta"] = max(0, row["site_bytes"] - previous["site_bytes"])
        enriched.append(item)
        previous = row
    return enriched
def count_history_rows() -> int:
    """Count the lines of HISTORY_FILE that begin with a digit.

    Returns 0 when the file is missing or cannot be read.
    """
    if not HISTORY_FILE.exists():
        return 0
    total = 0
    try:
        with HISTORY_FILE.open("r", encoding="utf-8", errors="ignore") as handle:
            for text in handle:
                # Slicing instead of indexing keeps an empty string safe.
                if text[:1].isdigit():
                    total += 1
    except OSError:
        return 0
    return total
def stats_status(current: dict[str, Any] | None = None, history: list[dict[str, int]] | None = None) -> dict[str, Any]:
    """Summarise the health of the statistics collector.

    ``current`` and ``history`` are loaded from disk when not supplied.
    ``health`` is ``"error"`` when the snapshot carries an error, ``"ok"``
    when the service runs and the snapshot is at most 180 seconds old,
    ``"stale"`` when it runs without a fresh snapshot, and otherwise
    ``"not_installed"`` or ``"stopped"``.
    """
    if current is None:
        current = load_json(CURRENT_STATS, {}) or {}
    if history is None:
        history = load_stats_history(limit=2)

    service = service_status("gotelegram-stats")
    now = int(time.time())
    is_mapping = isinstance(current, dict)
    ts = int(current.get("ts") or 0) if is_mapping else 0
    age = max(0, now - ts) if ts else None
    error = str(current.get("error") or "") if is_mapping else ""
    history_rows = count_history_rows()

    if error:
        health = "error"
    elif service != "running":
        health = "not_installed" if service == "not_installed" else "stopped"
    elif current and age is not None and age <= 180:
        health = "ok"
    else:
        health = "stale"

    return {
        "health": health,
        "service": service,
        "current_exists": CURRENT_STATS.exists(),
        "history_exists": HISTORY_FILE.exists(),
        "history_rows": history_rows,
        "history_points": len(history or []),
        "last_ts": ts,
        "age_seconds": age,
        "error": error,
    }
def run_stats_action(action: str) -> tuple[bool, str, dict[str, Any]]:
    """Run the stats shell helpers and return ``(ok, message, payload)``.

    ``"repair"`` reinstalls the collector and then collects (180 s limit);
    any other action only initialises and collects (30 s limit). ``message``
    is the last line of stdout, or of stderr when stdout is empty. ``payload``
    holds the reloaded snapshot, history and status.
    """
    repair_script = (
        "source /opt/gotelegram/lib/common.sh; "
        "source /opt/gotelegram/lib/i18n.sh; "
        "source /opt/gotelegram/lib/stats.sh; "
        "load_language \"$(detect_language 2>/dev/null || echo en)\"; "
        "install_stats_collector; "
        "stats_collect"
    )
    collect_script = (
        "source /opt/gotelegram/lib/common.sh; "
        "source /opt/gotelegram/lib/stats.sh; "
        "stats_init >/dev/null 2>&1 || true; "
        "stats_collect"
    )
    body, timeout = (repair_script, 180) if action == "repair" else (collect_script, 30)

    code, stdout, stderr = run(["bash", "-lc", body], timeout=timeout)
    tail = stdout.strip().splitlines()[-1:] or stderr.strip().splitlines()[-1:]
    message = tail[0] if tail else ""

    current = load_json(CURRENT_STATS, {}) or {}
    history = load_stats_history()
    payload = {"current": current, "history": history, "status": stats_status(current, history)}
    return code == 0, message, payload
def list_backups() -> list[dict[str, Any]]:
    """Describe the 30 most recently modified backup archives in BACKUP_DIR.

    Files matching ``*.tar.gz*`` are considered, except ``.sha256`` checksum
    files. Each item holds ``name``, ``path``, ``size``, ``mtime`` (integer)
    and ``encrypted`` (true for names ending in ``.enc``).

    Every file is stat'ed exactly once inside a guard, so an archive that
    disappears while the listing is built is skipped rather than raising.
    """
    if not BACKUP_DIR.exists():
        return []

    dated: list[tuple[float, dict[str, Any]]] = []
    for path in BACKUP_DIR.glob("*.tar.gz*"):
        if path.name.endswith(".sha256"):
            continue
        try:
            st = path.stat()
        except OSError:
            continue  # removed or unreadable since the directory was listed
        dated.append((st.st_mtime, {
            "name": path.name,
            "path": str(path),
            "size": st.st_size,
            "mtime": int(st.st_mtime),
            "encrypted": path.name.endswith(".enc"),
        }))

    # Newest first; the key avoids comparing the dicts on equal timestamps.
    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in dated[:30]]
def create_backup() -> tuple[bool, str]:
    """Run the shell ``create_backup`` helper and return ``(ok, last output line)``.

    The line comes from stdout, or from stderr when stdout is empty.
    """
    script = (
        "source /opt/gotelegram/lib/common.sh; "
        "source /opt/gotelegram/lib/i18n.sh; "
        "source /opt/gotelegram/lib/telemt.sh; "
        "source /opt/gotelegram/lib/website.sh; "
        "source /opt/gotelegram/lib/backup.sh; "
        "load_language \"$(detect_language 2>/dev/null || echo en)\"; "
        "create_backup \"\""
    )
    exit_code, stdout, stderr = run(["bash", "-lc", script], timeout=180)
    tail = stdout.strip().splitlines()[-1:] or stderr.strip().splitlines()[-1:]
    last_line = tail[0] if tail else ""
    return exit_code == 0, last_line
def user_payload(name: str, secret: str, include_runtime: bool = False) -> dict[str, Any]:
    """Build the API representation of one proxy user.

    With ``include_runtime`` the result also carries ``runtime``: whatever the
    telemt API returns for ``/v1/users/<name>`` (None when unavailable).
    """
    payload: dict[str, Any] = {
        "name": name,
        "secret": secret,
        "link": proxy_link(secret),
        "main": name == "main",
    }
    if not include_runtime:
        return payload
    encoded = urllib.parse.quote(name, safe="")
    payload["runtime"] = telemt_api(f"/v1/users/{encoded}")
    return payload
def overview_payload() -> dict[str, Any]:
    """Collect everything returned by ``/api/overview`` in a single dict.

    Covers version, time, language, bind address, config, user count,
    service states, statistics, the telemt runtime summary and backups.
    """
    config = load_json(GOTELEGRAM_CONFIG, {}) or {}
    language = read_language(config)
    users = read_telemt_users()
    current = load_json(CURRENT_STATS, {}) or {}
    history = load_stats_history()
    summary = telemt_api("/v1/stats/summary")

    # Label shown to the client -> systemd unit that backs it.
    units = {
        "telemt": "telemt",
        "nginx": "nginx",
        "bot": "gotelegram-bot",
        "stats": "gotelegram-stats",
        "admin": "gotelegram-admin",
    }
    services = {label: service_status(unit) for label, unit in units.items()}

    return {
        "version": VERSION,
        "time": utc_now(),
        "language": language,
        "admin_bind": {"host": HOST, "port": PORT},
        "config": config,
        "users_count": len(users),
        "services": services,
        "stats_current": current,
        "stats_history": history,
        "stats_status": stats_status(current, history),
        "runtime_summary": summary,
        "backups": list_backups(),
    }
class AdminHandler(BaseHTTPRequestHandler):
    """Request handler: JSON API under ``/api/``, static files everywhere else."""

    server_version = "GoTelegramAdmin/2.5.0"

    def log_message(self, fmt: str, *args: Any) -> None:
        """Print one request log line as ``<client address> - <message>``."""
        print("%s - %s" % (self.address_string(), fmt % args))

    def send_json(self, payload: Any, status: int = 200) -> None:
        """Send ``payload`` as a UTF-8 JSON response that must not be cached."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, status: int, message: str) -> None:
        """Send ``{"ok": false, "error": message}`` with HTTP status ``status``."""
        self.send_json({"ok": False, "error": message}, status)

    def read_json_body(self) -> dict[str, Any]:
        """Read and decode the request body as a JSON object.

        Returns ``{}`` when there is no body.

        Raises:
            ValueError: if Content-Length is not a number, the body exceeds
                1 MiB, is not valid UTF-8 JSON, or is not a JSON object.
        """
        length = int(self.headers.get("Content-Length", "0") or 0)
        if length > 1024 * 1024:
            raise ValueError("request body too large")
        if length <= 0:
            return {}
        payload = json.loads(self.rfile.read(length).decode("utf-8"))
        # Routes call ``body.get``; refuse arrays/scalars here so the caller
        # answers 400 instead of failing with AttributeError.
        if not isinstance(payload, dict):
            raise ValueError("request body must be a JSON object")
        return payload

    def require_write_guard(self) -> bool:
        """Check the ``X-GoTelegram-Admin: 1`` header on mutating requests.

        Sends a 403 response and returns False when the header is missing or
        different; returns True otherwise.
        """
        if self.command in {"POST", "PUT", "PATCH", "DELETE"} and self.headers.get("X-GoTelegram-Admin") != "1":
            self.send_error_json(403, "missing write guard")
            return False
        return True

    def route_get_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve the GET endpoints: overview, users, backups, stats and logs."""
        path = parsed.path
        if path == "/api/overview":
            self.send_json({"ok": True, "data": overview_payload()})
        elif path == "/api/users":
            users = read_telemt_users()
            self.send_json({"ok": True, "data": [user_payload(k, v) for k, v in sorted(users.items())]})
        elif path.startswith("/api/users/"):
            name = urllib.parse.unquote(path[len("/api/users/"):])
            users = read_telemt_users()
            if name not in users:
                self.send_error_json(404, "user not found")
                return
            self.send_json({"ok": True, "data": user_payload(name, users[name], include_runtime=True)})
        elif path == "/api/backups":
            self.send_json({"ok": True, "data": list_backups()})
        elif path == "/api/stats":
            current = load_json(CURRENT_STATS, {}) or {}
            history = load_stats_history()
            self.send_json({"ok": True, "data": {"current": current, "history": history, "status": stats_status(current, history)}})
        elif path == "/api/logs":
            qs = urllib.parse.parse_qs(parsed.query)
            service = qs.get("service", ["telemt"])[0]
            # The unit name goes to journalctl, so only known units are accepted.
            allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats", "gotelegram-admin"}
            if service not in allowed:
                self.send_error_json(400, "unsupported service")
                return
            code, stdout, stderr = run(["journalctl", "-u", service, "-n", "120", "--no-pager"], timeout=8)
            self.send_json({"ok": code == 0, "data": stdout if code == 0 else stderr})
        else:
            self.send_error_json(404, "not found")

    def route_post_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve the POST endpoints: create user, create backup, stats actions, restarts."""
        if not self.require_write_guard():
            return
        path = parsed.path
        try:
            body = self.read_json_body()
        except Exception as exc:
            self.send_error_json(400, str(exc))
            return
        if path == "/api/users":
            name = str(body.get("name", "")).strip()
            if not USER_RE.match(name):
                self.send_error_json(400, "invalid user name")
                return
            users = read_telemt_users()
            if name in users:
                self.send_error_json(409, "user already exists")
                return
            # The secret is the first 32 hex digits of a SHA-256 over a random seed.
            seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode()
            secret = hashlib.sha256(seed).hexdigest()[:32]
            users[name] = secret
            try:
                write_telemt_users(users)
            except Exception as exc:
                self.send_error_json(500, f"failed to save config: {exc}")
                return
            # The user is saved even if the restart fails; that is reported separately.
            restarted = restart_service("telemt")
            self.send_json({"ok": True, "data": user_payload(name, secret), "restarted": restarted})
        elif path == "/api/backups":
            ok, result = create_backup()
            self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
        elif path == "/api/stats/collect":
            ok, message, payload = run_stats_action("collect")
            payload["message"] = message
            self.send_json({"ok": ok, "data": payload}, 200 if ok else 500)
        elif path == "/api/stats/repair":
            ok, message, payload = run_stats_action("repair")
            payload["message"] = message
            self.send_json({"ok": ok, "data": payload}, 200 if ok else 500)
        elif path.startswith("/api/services/") and path.endswith("/restart"):
            service = path[len("/api/services/"):-len("/restart")]
            # gotelegram-admin is not in this set, so it cannot be restarted from here.
            allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats"}
            if service not in allowed:
                self.send_error_json(400, "unsupported service")
                return
            ok = restart_service(service)
            self.send_json({"ok": ok, "status": service_status(service)}, 200 if ok else 500)
        else:
            self.send_error_json(404, "not found")

    def route_delete_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve ``DELETE /api/users/<name>``; the ``main`` user cannot be deleted."""
        if not self.require_write_guard():
            return
        path = parsed.path
        if not path.startswith("/api/users/"):
            self.send_error_json(404, "not found")
            return
        name = urllib.parse.unquote(path[len("/api/users/"):])
        if name == "main":
            self.send_error_json(400, "main user cannot be deleted")
            return
        users = read_telemt_users()
        if name not in users:
            self.send_error_json(404, "user not found")
            return
        users.pop(name, None)
        try:
            write_telemt_users(users)
        except Exception as exc:
            self.send_error_json(500, f"failed to save config: {exc}")
            return
        restarted = restart_service("telemt")
        self.send_json({"ok": True, "restarted": restarted})

    def send_static(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve a file from STATIC_DIR, falling back to its ``index.html``.

        Paths with a ``..`` segment are answered with 404. Directories resolve
        to their ``index.html``, and any path that does not exist is served the
        top-level ``index.html`` instead.
        """
        rel = parsed.path.lstrip("/") or "index.html"
        if rel.startswith("api/") or ".." in rel.split("/"):
            self.send_error(404)
            return
        path = STATIC_DIR / rel
        if path.is_dir():
            path = path / "index.html"
        if not path.exists():
            path = STATIC_DIR / "index.html"
        try:
            body = path.read_bytes()
        except OSError:
            self.send_error(404)
            return
        mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
        self.send_response(200)
        self.send_header("Content-Type", mime)
        # index.html is never cached; other assets are cacheable for an hour.
        self.send_header("Cache-Control", "no-store" if path.name == "index.html" else "public, max-age=3600")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """Dispatch GET requests to the API router or the static file server."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_get_api(parsed)
        else:
            self.send_static(parsed)

    def do_POST(self) -> None:
        """Dispatch POST requests; only ``/api/`` paths are accepted."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_post_api(parsed)
        else:
            self.send_error(404)

    def do_DELETE(self) -> None:
        """Dispatch DELETE requests; only ``/api/`` paths are accepted."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_delete_api(parsed)
        else:
            self.send_error(404)
def main() -> None:
    """Start the admin HTTP server on HOST:PORT and serve until interrupted.

    Exits with an error message when STATIC_DIR does not exist.
    """
    if not STATIC_DIR.exists():
        raise SystemExit(f"static dir not found: {STATIC_DIR}")
    bind_address = (HOST, PORT)
    server = ThreadingHTTPServer(bind_address, AdminHandler)
    print(f"GoTelegram admin listening on http://{HOST}:{PORT}")
    server.serve_forever()


if __name__ == "__main__":
    main()