v2.5.0: remove local web admin token gate

This commit is contained in:
Виталий Литвинов
2026-04-24 22:00:08 +03:00
parent 8804319e19
commit 008143a617
10 changed files with 15 additions and 136 deletions

View File

@@ -10,7 +10,6 @@ from __future__ import annotations
import csv
import hashlib
import http.cookies
import json
import mimetypes
import os
@@ -30,7 +29,6 @@ from typing import Any
ADMIN_DIR = Path(os.getenv("GOTELEGRAM_ADMIN_DIR", "/opt/gotelegram-admin"))
STATIC_DIR = Path(os.getenv("GOTELEGRAM_ADMIN_STATIC", str(ADMIN_DIR / "static")))
TOKEN_FILE = Path(os.getenv("GOTELEGRAM_ADMIN_TOKEN_FILE", str(ADMIN_DIR / "token")))
GOTELEGRAM_CONFIG = Path(os.getenv("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json"))
TELEMT_CONFIG = Path(os.getenv("TELEMT_CONFIG", "/etc/telemt/config.toml"))
@@ -63,21 +61,6 @@ def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
return 125, "", str(exc)
def ensure_token() -> str:
ADMIN_DIR.mkdir(parents=True, exist_ok=True)
if not TOKEN_FILE.exists() or TOKEN_FILE.stat().st_size < 24:
TOKEN_FILE.write_text(secrets.token_urlsafe(36) + "\n", encoding="utf-8")
os.chmod(TOKEN_FILE, 0o600)
return TOKEN_FILE.read_text(encoding="utf-8").strip()
def current_token() -> str:
try:
return ensure_token()
except OSError:
return ""
def load_json(path: Path, fallback: Any = None) -> Any:
try:
with path.open("r", encoding="utf-8") as fh:
@@ -353,27 +336,6 @@ class AdminHandler(BaseHTTPRequestHandler):
def log_message(self, fmt: str, *args: Any) -> None:
print("%s - %s" % (self.address_string(), fmt % args))
def token_from_request(self) -> str:
parsed = urllib.parse.urlparse(self.path)
qs = urllib.parse.parse_qs(parsed.query)
if qs.get("token", [""])[0]:
return qs["token"][0]
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[len("Bearer "):].strip()
cookie_header = self.headers.get("Cookie", "")
if cookie_header:
cookie = http.cookies.SimpleCookie()
cookie.load(cookie_header)
if "gtauth" in cookie:
return cookie["gtauth"].value
return ""
def is_authorized(self) -> bool:
token = current_token()
candidate = self.token_from_request()
return bool(token and candidate and secrets.compare_digest(token, candidate))
def send_json(self, payload: Any, status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
@@ -394,12 +356,6 @@ class AdminHandler(BaseHTTPRequestHandler):
return {}
return json.loads(self.rfile.read(length).decode("utf-8"))
def require_auth(self) -> bool:
if self.is_authorized():
return True
self.send_error_json(401, "unauthorized")
return False
def require_write_guard(self) -> bool:
if self.command in {"POST", "PUT", "PATCH", "DELETE"} and self.headers.get("X-GoTelegram-Admin") != "1":
self.send_error_json(403, "missing write guard")
@@ -407,8 +363,6 @@ class AdminHandler(BaseHTTPRequestHandler):
return True
def route_get_api(self, parsed: urllib.parse.ParseResult) -> None:
if not self.require_auth():
return
path = parsed.path
if path == "/api/overview":
self.send_json({"ok": True, "data": overview_payload()})
@@ -437,7 +391,7 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_error_json(404, "not found")
def route_post_api(self, parsed: urllib.parse.ParseResult) -> None:
if not self.require_auth() or not self.require_write_guard():
if not self.require_write_guard():
return
path = parsed.path
try:
@@ -480,7 +434,7 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_error_json(404, "not found")
def route_delete_api(self, parsed: urllib.parse.ParseResult) -> None:
if not self.require_auth() or not self.require_write_guard():
if not self.require_write_guard():
return
path = parsed.path
if not path.startswith("/api/users/"):
@@ -504,15 +458,6 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_json({"ok": True, "restarted": restarted})
def send_static(self, parsed: urllib.parse.ParseResult) -> None:
qs = urllib.parse.parse_qs(parsed.query)
if qs.get("token", [""])[0] and self.is_authorized():
self.send_response(302)
self.send_header("Location", "/")
self.send_header("Set-Cookie", "gtauth=%s; Path=/; HttpOnly; SameSite=Strict" % qs["token"][0])
self.send_header("Cache-Control", "no-store")
self.end_headers()
return
rel = parsed.path.lstrip("/") or "index.html"
if rel.startswith("api/") or ".." in rel.split("/"):
self.send_error(404)
@@ -558,7 +503,6 @@ class AdminHandler(BaseHTTPRequestHandler):
def main() -> None:
ensure_token()
if not STATIC_DIR.exists():
raise SystemExit(f"static dir not found: {STATIC_DIR}")
httpd = ThreadingHTTPServer((HOST, PORT), AdminHandler)