mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 14:26:02 +00:00
706 lines
25 KiB
Python
706 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
GoTelegram local web admin.
|
|
|
|
The service is intentionally bound to 127.0.0.1:1984. Operators reach it
|
|
through an SSH tunnel; it must never be exposed directly on the public network.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import hashlib
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import secrets
|
|
import socket
|
|
import subprocess
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def _env_path(variable: str, default: str) -> Path:
    """Return the path stored in environment *variable*, or *default* when unset."""
    return Path(os.getenv(variable, default))


# Location of the admin application and the static UI it serves.
ADMIN_DIR = _env_path("GOTELEGRAM_ADMIN_DIR", "/opt/gotelegram-admin")
STATIC_DIR = _env_path("GOTELEGRAM_ADMIN_STATIC", str(ADMIN_DIR / "static"))

# Files and directories read or written by the admin; each can be overridden
# through the environment variable named in the call.
GOTELEGRAM_CONFIG = _env_path("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json")
TELEMT_CONFIG = _env_path("TELEMT_CONFIG", "/etc/telemt/config.toml")
HISTORY_FILE = _env_path("GOTELEGRAM_STATS_HISTORY", "/opt/gotelegram/stats_history.csv")
CURRENT_STATS = _env_path("GOTELEGRAM_STATS_CURRENT", "/run/gotelegram/stats_current.json")
BACKUP_DIR = _env_path("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups")
INSTALL_DIR = _env_path("GOTELEGRAM_DIR", "/opt/gotelegram")
BOT_DIR = _env_path("GOTELEGRAM_BOT_DIR", "/opt/gotelegram-bot")

# Listening address of the admin HTTP server (loopback by default).
HOST = os.getenv("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.getenv("GOTELEGRAM_ADMIN_PORT", "1984"))
VERSION = "2.5.0"

# Accepted user names (1-48 characters) and supported UI languages.
USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
LANG_RE = re.compile(r"^(en|ru)$")
|
|
|
|
|
|
def utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string (``+00:00`` offset)."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
    """Execute *cmd* (argument list, no shell) and capture its text output.

    Returns ``(exit_code, stdout, stderr)``. This function never raises: any
    failure to run the command (missing binary, timeout, ...) is reported as
    exit code 125 with the exception text in the stderr position.
    """
    try:
        finished = subprocess.run(
            cmd,
            check=False,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception as exc:  # pragma: no cover - system dependent
        return 125, "", str(exc)
    return finished.returncode, finished.stdout, finished.stderr
|
|
|
|
|
|
def load_json(path: Path, fallback: Any = None) -> Any:
    """Parse the JSON document stored at *path*.

    Returns *fallback* when the file cannot be opened, read or decoded;
    no exception is propagated to the caller.
    """
    try:
        document = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return fallback
    return document
|
|
|
|
|
|
def save_json(path: Path, data: Any, mode: int = 0o600) -> None:
    """Atomically write *data* to *path* as indented JSON with permissions *mode*.

    The document is serialised first, written to a sibling ``<name>.tmp``
    file and then renamed over *path*, so readers never observe a partially
    written file. The temporary file is created with *mode* and re-chmod-ed
    before any content is written, so the content is never readable under
    broader default (umask) permissions. If anything fails, the temporary
    file is removed and the exception is re-raised; *path* is left untouched.

    Raises:
        TypeError / ValueError: *data* is not JSON serialisable.
        OSError: the file could not be written or renamed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the filesystem so bad data leaves no temp file.
    payload = json.dumps(data, ensure_ascii=False, indent=4) + "\n"
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            # os.open() honours the umask and keeps the mode of a stale temp
            # file, so enforce the exact mode before writing the content.
            os.chmod(tmp, mode)
            fh.write(payload)
        tmp.replace(path)
    except Exception:
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def read_language(config: dict[str, Any] | None = None) -> str:
    """Resolve the interface language, always returning ``"en"`` or ``"ru"``.

    Lookup order:
      1. the ``language`` (or ``lang``) key of *config*; when *config* is
         empty or omitted, the GoTelegram config file is loaded instead;
      2. the first two characters of ``INSTALL_DIR/.language``;
      3. the default ``"en"``.

    A config that is not a JSON object (for example a list) is treated as
    empty rather than raising, matching what ``write_language`` does.
    """
    config = config or load_json(GOTELEGRAM_CONFIG, {}) or {}
    if not isinstance(config, dict):
        config = {}
    lang = str(config.get("language") or config.get("lang") or "").strip().lower()
    if lang not in {"en", "ru"}:
        try:
            # A missing marker file raises FileNotFoundError, an OSError.
            marker_text = (INSTALL_DIR / ".language").read_text(encoding="utf-8", errors="ignore")
            lang = marker_text.strip().lower()[:2]
        except OSError:
            lang = ""
    return lang if lang in {"en", "ru"} else "en"
|
|
|
|
|
|
def write_language(lang: str) -> dict[str, Any]:
    """Persist the interface language in every place that stores it.

    Updates the ``language`` and ``updated_at`` keys of the GoTelegram config,
    rewrites ``INSTALL_DIR/.language`` and, when the bot's ``.env`` file
    exists, sets its ``BOT_LANG=`` entry (appending one if absent) and
    restricts that file to mode 0600.

    Returns ``{"language": <lang>}``.

    Raises:
        ValueError: *lang* is not one of the supported languages.
    """
    lang = str(lang or "").strip().lower()
    if LANG_RE.match(lang) is None:
        raise ValueError("unsupported language")

    stored = load_json(GOTELEGRAM_CONFIG, {}) or {}
    config = stored if isinstance(stored, dict) else {}
    config["language"] = lang
    config["updated_at"] = utc_now()
    save_json(GOTELEGRAM_CONFIG, config)

    INSTALL_DIR.mkdir(parents=True, exist_ok=True)
    (INSTALL_DIR / ".language").write_text(lang + "\n", encoding="utf-8")

    result = {"language": lang}
    bot_env = BOT_DIR / ".env"
    if not bot_env.exists():
        return result

    setting = f"BOT_LANG={lang}"
    existing = bot_env.read_text(encoding="utf-8", errors="ignore").splitlines()
    had_setting = any(entry.startswith("BOT_LANG=") for entry in existing)
    updated = [setting if entry.startswith("BOT_LANG=") else entry for entry in existing]
    if not had_setting:
        updated.append(setting)
    bot_env.write_text("\n".join(updated).rstrip() + "\n", encoding="utf-8")
    os.chmod(bot_env, 0o600)
    return result
|
|
|
|
|
|
def read_telemt_users() -> dict[str, str]:
    """Read the ``[access.users]`` table of the telemt config.

    Returns a mapping of user name to secret. Blank lines, ``#`` comment
    lines, lines without ``=``, names that do not match ``USER_RE`` and
    empty values are skipped. Parsing stops at the next ``[section]`` header.
    A missing config file yields an empty mapping.
    """
    if not TELEMT_CONFIG.exists():
        return {}

    found: dict[str, str] = {}
    inside = False
    content = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore")
    for entry in (raw.strip() for raw in content.splitlines()):
        if entry == "[access.users]":
            inside = True
        elif not inside:
            continue
        elif entry.startswith("["):
            # Next table header: the users section is over.
            break
        elif entry and not entry.startswith("#") and "=" in entry:
            key, _, rest = entry.partition("=")
            key = key.strip()
            # Drop a trailing comment, then unwrap a quoted value.
            rest = rest.strip().partition("#")[0].strip()
            for quote in ('"', "'"):
                if rest.startswith(quote) and quote in rest[1:]:
                    rest = rest[1:].partition(quote)[0]
                    break
            if rest and USER_RE.match(key):
                found[key] = rest
    return found
|
|
|
|
|
|
def _ordered_user_lines(users: dict[str, str]) -> list[str]:
|
|
names = []
|
|
if "main" in users:
|
|
names.append("main")
|
|
names.extend(sorted(n for n in users if n != "main"))
|
|
return [f'{name} = "{users[name]}"' for name in names]
|
|
|
|
|
|
def write_telemt_users(users: dict[str, str]) -> None:
    """Rewrite the ``[access.users]`` table of the telemt config with *users*.

    Every other line of the file is kept as is. The existing entries of the
    table are replaced by the rendered user lines; when the table does not
    exist it is appended at the end (after a separating blank line if
    needed). The file is written in place and then restricted to mode 0600.
    """
    TELEMT_CONFIG.parent.mkdir(parents=True, exist_ok=True)
    if TELEMT_CONFIG.exists():
        existing = TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines()
    else:
        existing = []
    rendered = _ordered_user_lines(users)

    result: list[str] = []
    section_seen = False
    skipping = False  # True while walking over the old entries of the table
    for raw in existing:
        marker = raw.strip()
        if marker == "[access.users]":
            section_seen = True
            skipping = True
            result.append(raw)
            result.extend(rendered)
        elif skipping and not marker.startswith("["):
            continue
        else:
            # Either outside the table or at the header that ends it.
            skipping = False
            result.append(raw)

    if not section_seen:
        if result and result[-1].strip():
            result.append("")
        result.append("[access.users]")
        result.extend(rendered)

    TELEMT_CONFIG.write_text("\n".join(result).rstrip() + "\n", encoding="utf-8")
    os.chmod(TELEMT_CONFIG, 0o600)
|
|
|
|
|
|
def restart_service(name: str) -> bool:
    """Restart systemd unit *name* and report whether it came back.

    For ``telemt`` success additionally requires its configured port to
    accept TCP connections within 90 seconds; for any other unit a zero exit
    code from ``systemctl restart`` is sufficient.
    """
    exit_code = run(["systemctl", "restart", name], timeout=25)[0]
    if exit_code != 0:
        return False
    if name != "telemt":
        return True
    return wait_tcp_port(read_telemt_port(), timeout=90)
|
|
|
|
|
|
def service_status(name: str) -> str:
    """Summarise the state of systemd unit *name*.

    Returns ``"running"`` when the unit is active, ``"not_installed"`` when
    systemd lists no such unit file, one of ``"failed"``, ``"inactive"``,
    ``"activating"`` or ``"deactivating"`` when ``systemctl is-active``
    reported that state, and ``"stopped"`` otherwise.
    """
    code, stdout, _ = run(["systemctl", "is-active", name], timeout=3)
    state = stdout.strip()
    if code == 0 and state == "active":
        return "running"

    code, listing, _ = run(
        ["systemctl", "list-unit-files", f"{name}.service", "--no-legend"],
        timeout=3,
    )
    installed = code == 0 and bool(listing.strip())
    if not installed:
        return "not_installed"

    passthrough = {"failed", "inactive", "activating", "deactivating"}
    return state if state in passthrough else "stopped"
|
|
|
|
|
|
def read_telemt_port() -> int:
    """Return the ``port`` value of the ``[server]`` table in the telemt config.

    Falls back to 443 when the config file is missing, the table has no
    ``port`` key, or the value is not an integer. Only a key named exactly
    ``port`` is accepted, so unrelated keys that merely begin with "port"
    are not mistaken for the listening port.
    """
    if not TELEMT_CONFIG.exists():
        return 443
    in_server = False
    for raw in TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = raw.strip()
        if line == "[server]":
            in_server = True
            continue
        if not in_server:
            continue
        if line.startswith("["):
            # Next table header: [server] ended without a port entry.
            break
        key, separator, value = line.partition("=")
        if not separator or key.strip() != "port":
            continue
        try:
            # Strip a trailing comment; int() tolerates surrounding spaces.
            return int(value.split("#", 1)[0])
        except ValueError:
            return 443
    return 443
|
|
|
|
|
|
def wait_tcp_port(port: int, timeout: int = 90) -> bool:
    """Wait until ``127.0.0.1:port`` accepts a TCP connection.

    Polls about once per second for at most *timeout* seconds. Gives up
    early, returning ``False``, as soon as the ``telemt`` unit is neither
    running nor activating. Returns ``True`` on the first successful connect.
    """
    give_up_at = time.monotonic() + timeout
    alive_states = {"running", "activating"}
    target = ("127.0.0.1", port)
    while time.monotonic() < give_up_at:
        if service_status("telemt") not in alive_states:
            return False
        try:
            with socket.create_connection(target, timeout=0.6):
                return True
        except OSError:
            # Not listening yet; retry after a short pause.
            time.sleep(1)
    return False
|
|
|
|
|
|
def public_ip() -> str:
    """Best-effort lookup of the server's IPv4 address.

    Asks ``https://api.ipify.org`` through curl first; when that fails or
    does not return a dotted quad, falls back to the first address printed
    by ``hostname -I``. Returns ``"0.0.0.0"`` when both attempts fail.
    """
    code, stdout, _ = run(
        ["curl", "-s", "-4", "--max-time", "3", "https://api.ipify.org"],
        timeout=5,
    )
    candidate = stdout.strip()
    if code == 0 and re.match(r"^\d{1,3}(\.\d{1,3}){3}$", candidate):
        return candidate

    code, stdout, _ = run(["hostname", "-I"], timeout=3)
    addresses = stdout.split()
    if code == 0 and addresses:
        return addresses[0]
    return "0.0.0.0"
|
|
|
|
|
|
def proxy_link(secret: str) -> str:
    """Build the ``tg://proxy`` link for *secret* from the GoTelegram config.

    In ``pro`` mode with a configured domain the domain is both the server
    and the hex-encoded host suffix of the secret. Otherwise the server is
    the detected public IP and the suffix comes from ``mask_host``, if set.
    Whenever a host suffix is present the secret is prefixed with ``ee``;
    without one the bare secret is used.
    """
    config = load_json(GOTELEGRAM_CONFIG, {}) or {}
    mode = str(config.get("mode", "lite"))
    port = int(config.get("port", 443) or 443)
    domain = str(config.get("domain", "") or "")
    mask_host = str(config.get("mask_host", "") or "")

    if mode == "pro" and domain:
        server, suffix_host = domain, domain
    else:
        server, suffix_host = public_ip(), mask_host

    prefix = f"tg://proxy?server={server}&port={port}&secret="
    if suffix_host:
        return f"{prefix}ee{secret}{suffix_host.encode().hex()}"
    return f"{prefix}{secret}"
|
|
|
|
|
|
def telemt_api(path: str) -> Any:
    """GET *path* from the telemt HTTP API on ``127.0.0.1:9091``.

    Reads at most 256 KiB of the response and returns the decoded JSON
    document, or ``None`` when the request or the decoding fails for any
    reason.
    """
    url = f"http://127.0.0.1:9091{path}"
    try:
        with urllib.request.urlopen(url, timeout=1.8) as response:
            raw = response.read(256 * 1024)
        return json.loads(raw.decode("utf-8"))
    except Exception:
        return None
|
|
|
|
|
|
def site_status(config: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
config = config or load_json(GOTELEGRAM_CONFIG, {}) or {}
|
|
host = str(config.get("domain") or "").strip()
|
|
if not host:
|
|
return {"host": "", "url": "", "http_code": 0, "ok": False, "checked": False, "error": "domain_missing"}
|
|
if not re.match(r"^[A-Za-z0-9.-]{1,253}$", host) or ".." in host or host.startswith(".") or host.endswith("."):
|
|
return {"host": host, "url": "", "http_code": 0, "ok": False, "checked": False, "error": "invalid_domain"}
|
|
url = f"https://{host}/"
|
|
code, stdout, stderr = run(["curl", "-k", "-L", "-sS", "-o", "/dev/null", "-w", "%{http_code}", "--max-time", "8", url], timeout=10)
|
|
raw_code = stdout.strip()
|
|
try:
|
|
http_code = int(raw_code)
|
|
except ValueError:
|
|
http_code = 0
|
|
return {
|
|
"host": host,
|
|
"url": url,
|
|
"http_code": http_code,
|
|
"ok": code == 0 and http_code == 200,
|
|
"checked": True,
|
|
"error": "" if code == 0 else (stderr.strip() or f"curl exit {code}"),
|
|
"checked_at": int(time.time()),
|
|
}
|
|
|
|
|
|
def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
    """Load the most recent traffic samples and annotate them with deltas.

    Reads the ``epoch``, ``proxy_bytes`` and ``site_bytes`` columns of the
    history CSV, keeps the last *limit* rows and adds ``proxy_delta`` and
    ``site_delta``: the non-negative growth since the previous kept row
    (0 for the first row, and 0 when a counter went backwards).

    Rows with non-numeric values are skipped. A missing or unreadable file,
    or one the CSV parser rejects, yields an empty list instead of raising,
    so a damaged history file cannot break the callers.
    """
    if not HISTORY_FILE.exists():
        return []

    rows: list[dict[str, int]] = []
    try:
        # errors="replace" keeps undecodable bytes from raising while reading;
        # a numeric field damaged that way then fails int() below, so the row
        # is skipped rather than silently misread.
        with HISTORY_FILE.open("r", encoding="utf-8", errors="replace", newline="") as fh:
            for row in csv.DictReader(fh):
                try:
                    rows.append({
                        "epoch": int(row.get("epoch") or 0),
                        "proxy_bytes": int(row.get("proxy_bytes") or 0),
                        "site_bytes": int(row.get("site_bytes") or 0),
                    })
                except ValueError:
                    continue
    except (OSError, csv.Error):
        return []

    rows = rows[-limit:]
    enriched: list[dict[str, int]] = []
    previous: dict[str, int] | None = None
    for row in rows:
        item = dict(row)
        if previous is None:
            item["proxy_delta"] = 0
            item["site_delta"] = 0
        else:
            # Clamp at zero: the counters may reset.
            item["proxy_delta"] = max(0, row["proxy_bytes"] - previous["proxy_bytes"])
            item["site_delta"] = max(0, row["site_bytes"] - previous["site_bytes"])
        enriched.append(item)
        previous = row
    return enriched
|
|
|
|
|
|
def count_history_rows() -> int:
    """Count the lines of the history CSV that start with a digit.

    Lines starting with anything else (such as the header) are not counted.
    Returns 0 when the file is missing or cannot be read.
    """
    if not HISTORY_FILE.exists():
        return 0
    try:
        with HISTORY_FILE.open("r", encoding="utf-8", errors="ignore") as fh:
            total = 0
            for line in fh:
                # line[:1] is "" for an empty string, and "".isdigit() is False.
                if line[:1].isdigit():
                    total += 1
            return total
    except OSError:
        return 0
|
|
|
|
|
|
def stats_status(current: dict[str, Any] | None = None, history: list[dict[str, int]] | None = None) -> dict[str, Any]:
    """Describe the health of the statistics collector.

    *current* defaults to the current-stats JSON file and *history* to the
    last two history rows. ``health`` is:

    * ``"error"`` when the current snapshot carries an error message;
    * ``"ok"`` when the service runs and the snapshot is at most 180 s old;
    * ``"stale"`` when the service runs but the snapshot is missing or older;
    * ``"not_installed"`` when the unit does not exist;
    * ``"stopped"`` otherwise.
    """
    if current is None:
        current = load_json(CURRENT_STATS, {}) or {}
    if history is None:
        history = load_stats_history(limit=2)

    service = service_status("gotelegram-stats")
    now = int(time.time())
    if isinstance(current, dict):
        ts = int(current.get("ts") or 0)
        error = str(current.get("error") or "")
    else:
        ts, error = 0, ""
    # Without a timestamp the age is unknown rather than zero.
    age = max(0, now - ts) if ts else None
    history_rows = count_history_rows()

    fresh = bool(current) and age is not None and age <= 180
    if error:
        health = "error"
    elif service == "running":
        health = "ok" if fresh else "stale"
    elif service == "not_installed":
        health = "not_installed"
    else:
        health = "stopped"

    return {
        "health": health,
        "service": service,
        "current_exists": CURRENT_STATS.exists(),
        "history_exists": HISTORY_FILE.exists(),
        "history_rows": history_rows,
        "history_points": len(history or []),
        "last_ts": ts,
        "age_seconds": age,
        "error": error,
    }
|
|
|
|
|
|
def run_stats_action(action: str) -> tuple[bool, str, dict[str, Any]]:
    """Run a statistics maintenance action through the GoTelegram shell library.

    ``"repair"`` reinstalls the collector and collects once (180 s timeout);
    any other value performs a plain collection (30 s timeout).

    Returns ``(ok, message, payload)``: *ok* is whether the shell exited with
    0, *message* is the last line of stdout (or of stderr when stdout is
    empty), and *payload* holds the freshly loaded current stats, history and
    status.
    """
    if action == "repair":
        steps = [
            "source /opt/gotelegram/lib/common.sh",
            "source /opt/gotelegram/lib/i18n.sh",
            "source /opt/gotelegram/lib/stats.sh",
            'load_language "$(detect_language 2>/dev/null || echo en)"',
            "install_stats_collector",
            "stats_collect",
        ]
        timeout = 180
    else:
        steps = [
            "source /opt/gotelegram/lib/common.sh",
            "source /opt/gotelegram/lib/stats.sh",
            "stats_init >/dev/null 2>&1 || true",
            "stats_collect",
        ]
        timeout = 30

    code, stdout, stderr = run(["bash", "-lc", "; ".join(steps)], timeout=timeout)

    out_lines = stdout.strip().splitlines()
    err_lines = stderr.strip().splitlines()
    if out_lines:
        message = out_lines[-1]
    elif err_lines:
        message = err_lines[-1]
    else:
        message = ""

    current = load_json(CURRENT_STATS, {}) or {}
    history = load_stats_history()
    payload = {"current": current, "history": history, "status": stats_status(current, history)}
    return code == 0, message, payload
|
|
|
|
|
|
def list_backups() -> list[dict[str, Any]]:
    """List up to 30 backup archives in ``BACKUP_DIR``, newest first.

    Matches ``*.tar.gz*`` and ignores ``.sha256`` companion files. Each item
    has ``name``, ``path``, ``size``, ``mtime`` (Unix seconds) and
    ``encrypted`` (the name ends with ``.enc``). Returns an empty list when
    the directory does not exist.

    Each file is stat-ed exactly once, inside a guard, so an archive that
    disappears while the directory is being listed is skipped instead of
    raising from the sort key.
    """
    if not BACKUP_DIR.exists():
        return []

    found = []
    for path in BACKUP_DIR.glob("*.tar.gz*"):
        if path.name.endswith(".sha256"):
            continue
        try:
            st = path.stat()
        except OSError:
            continue
        found.append((path, st))

    found.sort(key=lambda pair: pair[1].st_mtime, reverse=True)
    return [
        {
            "name": path.name,
            "path": str(path),
            "size": st.st_size,
            "mtime": int(st.st_mtime),
            "encrypted": path.name.endswith(".enc"),
        }
        for path, st in found[:30]
    ]
|
|
|
|
|
|
def create_backup() -> tuple[bool, str]:
    """Create a backup through the GoTelegram shell library.

    Returns ``(ok, text)`` where *ok* is whether the shell exited with 0 and
    *text* is the last line of stdout, or of stderr when stdout is empty.
    """
    steps = [
        "source /opt/gotelegram/lib/common.sh",
        "source /opt/gotelegram/lib/i18n.sh",
        "source /opt/gotelegram/lib/telemt.sh",
        "source /opt/gotelegram/lib/website.sh",
        "source /opt/gotelegram/lib/backup.sh",
        'load_language "$(detect_language 2>/dev/null || echo en)"',
        'create_backup ""',
    ]
    code, stdout, stderr = run(["bash", "-lc", "; ".join(steps)], timeout=180)

    out_lines = stdout.strip().splitlines()
    err_lines = stderr.strip().splitlines()
    if out_lines:
        text = out_lines[-1]
    elif err_lines:
        text = err_lines[-1]
    else:
        text = ""
    return code == 0, text
|
|
|
|
|
|
def read_log_payload(service: str) -> dict[str, Any]:
    """Fetch the last 180 journal lines of an allow-listed systemd unit.

    The result carries the journal text (or journalctl's stderr when it
    failed), the exit code and the line count. An empty but successful
    journal is replaced by a single explanatory line.

    Raises:
        ValueError: *service* is not one of the allowed units.
    """
    if service not in {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats", "gotelegram-admin"}:
        raise ValueError("unsupported service")

    code, stdout, stderr = run(
        ["journalctl", "-u", service, "-n", "180", "--no-pager", "-o", "short-iso"],
        timeout=10,
    )
    succeeded = code == 0
    text = stdout if succeeded else stderr
    lines = text.splitlines()
    if succeeded and not lines:
        text = f"No journal entries for {service}."
        lines = [text]
    return {
        "service": service,
        "ok": succeeded,
        "exit_code": code,
        "line_count": len(lines),
        "text": text,
    }
|
|
|
|
|
|
def user_payload(name: str, secret: str, include_runtime: bool = False) -> dict[str, Any]:
    """Build the API representation of one proxy user.

    Contains the name, the secret, the shareable proxy link and whether this
    is the ``main`` user. With *include_runtime* the telemt API is queried
    for the user and its answer (possibly ``None``) is stored under
    ``runtime``.
    """
    item: dict[str, Any] = dict(
        name=name,
        secret=secret,
        link=proxy_link(secret),
        main=(name == "main"),
    )
    if include_runtime:
        quoted = urllib.parse.quote(name, safe="")
        item["runtime"] = telemt_api(f"/v1/users/{quoted}")
    return item
|
|
|
|
|
|
def overview_payload() -> dict[str, Any]:
    """Collect everything returned by ``/api/overview`` in one document.

    Gathers the config, language, user count, current and historical stats,
    the telemt runtime summary, the state of every managed service, the
    site check result and the backup list.
    """
    config = load_json(GOTELEGRAM_CONFIG, {}) or {}
    language = read_language(config)
    users = read_telemt_users()
    current = load_json(CURRENT_STATS, {}) or {}
    history = load_stats_history()
    summary = telemt_api("/v1/stats/summary")

    # Key reported to the client -> systemd unit that backs it.
    units = {
        "telemt": "telemt",
        "nginx": "nginx",
        "bot": "gotelegram-bot",
        "stats": "gotelegram-stats",
        "admin": "gotelegram-admin",
    }
    services = {label: service_status(unit) for label, unit in units.items()}

    return {
        "version": VERSION,
        "time": utc_now(),
        "language": language,
        "admin_bind": {"host": HOST, "port": PORT},
        "config": config,
        "site_status": site_status(config),
        "users_count": len(users),
        "services": services,
        "stats_current": current,
        "stats_history": history,
        "stats_status": stats_status(current, history),
        "runtime_summary": summary,
        "backups": list_backups(),
    }
|
|
|
|
|
|
class AdminHandler(BaseHTTPRequestHandler):
    """Request handler for the static admin UI and the ``/api/`` JSON endpoints.

    GET requests under ``/api/`` are read-only. POST and DELETE requests
    change state and must carry the ``X-GoTelegram-Admin: 1`` header.
    Every path outside ``/api/`` is served from ``STATIC_DIR``.
    """

    server_version = "GoTelegramAdmin/2.5.0"

    def log_message(self, fmt: str, *args: Any) -> None:
        """Print the access-log line to stdout."""
        print("%s - %s" % (self.address_string(), fmt % args))

    def send_json(self, payload: Any, status: int = 200) -> None:
        """Send *payload* as an uncached UTF-8 JSON response with *status*."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, status: int, message: str) -> None:
        """Send the standard ``{"ok": false, "error": ...}`` error document."""
        self.send_json({"ok": False, "error": message}, status)

    def read_json_body(self) -> Any:
        """Read and decode the JSON request body.

        Returns ``{}`` when there is no body, otherwise the decoded JSON
        object.

        Raises:
            ValueError: the ``Content-Length`` is not a number, the body
                exceeds 1 MiB, is not valid JSON/UTF-8, or is valid JSON
                but not an object. The routes call ``body.get(...)``, so
                anything other than an object is rejected here and becomes
                a 400 response instead of an unhandled ``AttributeError``.
        """
        length = int(self.headers.get("Content-Length", "0") or 0)
        if length > 1024 * 1024:
            raise ValueError("request body too large")
        if length <= 0:
            return {}
        payload = json.loads(self.rfile.read(length).decode("utf-8"))
        if not isinstance(payload, dict):
            raise ValueError("request body must be a JSON object")
        return payload

    def require_write_guard(self) -> bool:
        """Check the custom header required on state-changing requests.

        Sends a 403 response and returns ``False`` when a POST, PUT, PATCH
        or DELETE request lacks ``X-GoTelegram-Admin: 1``; returns ``True``
        otherwise.
        """
        if self.command in {"POST", "PUT", "PATCH", "DELETE"} and self.headers.get("X-GoTelegram-Admin") != "1":
            self.send_error_json(403, "missing write guard")
            return False
        return True

    def route_get_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Dispatch a GET request under ``/api/``; unknown paths get a 404."""
        path = parsed.path
        if path == "/api/overview":
            self.send_json({"ok": True, "data": overview_payload()})
        elif path == "/api/users":
            users = read_telemt_users()
            self.send_json({"ok": True, "data": [user_payload(k, v) for k, v in sorted(users.items())]})
        elif path.startswith("/api/users/"):
            name = urllib.parse.unquote(path[len("/api/users/"):])
            users = read_telemt_users()
            if name not in users:
                self.send_error_json(404, "user not found")
                return
            self.send_json({"ok": True, "data": user_payload(name, users[name], include_runtime=True)})
        elif path == "/api/backups":
            self.send_json({"ok": True, "data": list_backups()})
        elif path == "/api/stats":
            current = load_json(CURRENT_STATS, {}) or {}
            history = load_stats_history()
            self.send_json({"ok": True, "data": {"current": current, "history": history, "status": stats_status(current, history)}})
        elif path == "/api/site/check":
            self.send_json({"ok": True, "data": site_status()})
        elif path == "/api/logs":
            qs = urllib.parse.parse_qs(parsed.query)
            service = qs.get("service", ["telemt"])[0]
            try:
                payload = read_log_payload(service)
            except ValueError:
                self.send_error_json(400, "unsupported service")
                return
            self.send_json({"ok": True, "data": payload})
        else:
            self.send_error_json(404, "not found")

    def route_post_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Dispatch a POST request under ``/api/``.

        Requires the write-guard header and a JSON object body (or no
        body). Handles user creation, backup creation, stats collection
        and repair, the language setting and service restarts.
        """
        if not self.require_write_guard():
            return
        path = parsed.path
        try:
            body = self.read_json_body()
        except Exception as exc:
            self.send_error_json(400, str(exc))
            return

        if path == "/api/users":
            name = str(body.get("name", "")).strip()
            if not USER_RE.match(name):
                self.send_error_json(400, "invalid user name")
                return
            users = read_telemt_users()
            if name in users:
                self.send_error_json(409, "user already exists")
                return
            # 32 hex characters derived from a fresh random token.
            seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode()
            secret = hashlib.sha256(seed).hexdigest()[:32]
            users[name] = secret
            try:
                write_telemt_users(users)
            except Exception as exc:
                self.send_error_json(500, f"failed to save config: {exc}")
                return
            restarted = restart_service("telemt")
            self.send_json({"ok": True, "data": user_payload(name, secret), "restarted": restarted})
        elif path == "/api/backups":
            ok, result = create_backup()
            self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
        elif path == "/api/stats/collect":
            ok, message, payload = run_stats_action("collect")
            payload["message"] = message
            self.send_json({"ok": ok, "data": payload}, 200 if ok else 500)
        elif path == "/api/stats/repair":
            ok, message, payload = run_stats_action("repair")
            payload["message"] = message
            self.send_json({"ok": ok, "data": payload}, 200 if ok else 500)
        elif path == "/api/settings/language":
            try:
                lang_payload = write_language(str(body.get("language", "")))
            except Exception as exc:
                self.send_error_json(400, str(exc))
                return
            self.send_json({"ok": True, "data": lang_payload})
        elif path.startswith("/api/services/") and path.endswith("/restart"):
            service = path[len("/api/services/"):-len("/restart")]
            # The admin service itself is deliberately absent from this list.
            allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats"}
            if service not in allowed:
                self.send_error_json(400, "unsupported service")
                return
            ok = restart_service(service)
            self.send_json({"ok": ok, "status": service_status(service)}, 200 if ok else 500)
        else:
            self.send_error_json(404, "not found")

    def route_delete_api(self, parsed: urllib.parse.ParseResult) -> None:
        """Handle ``DELETE /api/users/<name>``.

        Requires the write-guard header. The ``main`` user cannot be
        deleted. On success the telemt config is rewritten and the service
        restarted; the response reports whether the restart succeeded.
        """
        if not self.require_write_guard():
            return
        path = parsed.path
        if not path.startswith("/api/users/"):
            self.send_error_json(404, "not found")
            return
        name = urllib.parse.unquote(path[len("/api/users/"):])
        if name == "main":
            self.send_error_json(400, "main user cannot be deleted")
            return
        users = read_telemt_users()
        if name not in users:
            self.send_error_json(404, "user not found")
            return
        users.pop(name, None)
        try:
            write_telemt_users(users)
        except Exception as exc:
            self.send_error_json(500, f"failed to save config: {exc}")
            return
        restarted = restart_service("telemt")
        self.send_json({"ok": True, "restarted": restarted})

    def send_static(self, parsed: urllib.parse.ParseResult) -> None:
        """Serve a file from ``STATIC_DIR``.

        Paths containing a ``..`` segment (or starting with ``api/``) get a
        404. Directories resolve to their ``index.html``, and paths that do
        not exist fall back to the top-level ``index.html``.
        """
        rel = parsed.path.lstrip("/") or "index.html"
        if rel.startswith("api/") or ".." in rel.split("/"):
            self.send_error(404)
            return
        path = STATIC_DIR / rel
        if path.is_dir():
            path = path / "index.html"
        if not path.exists():
            path = STATIC_DIR / "index.html"
        try:
            body = path.read_bytes()
        except OSError:
            self.send_error(404)
            return
        mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
        self.send_response(200)
        self.send_header("Content-Type", mime)
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """Route GET requests to the API or to the static file server."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_get_api(parsed)
        else:
            self.send_static(parsed)

    def do_POST(self) -> None:
        """Route POST requests to the API; anything else is a 404."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_post_api(parsed)
        else:
            self.send_error(404)

    def do_DELETE(self) -> None:
        """Route DELETE requests to the API; anything else is a 404."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path.startswith("/api/"):
            self.route_delete_api(parsed)
        else:
            self.send_error(404)
|
|
|
|
|
|
def main() -> None:
    """Start the admin HTTP server and serve requests until the process ends.

    Exits with an error message when the static UI directory is missing.
    """
    if not STATIC_DIR.exists():
        raise SystemExit(f"static dir not found: {STATIC_DIR}")
    address = (HOST, PORT)
    server = ThreadingHTTPServer(address, AdminHandler)
    print(f"GoTelegram admin listening on http://{HOST}:{PORT}")
    server.serve_forever()


# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|