v2.5.0: add QR import and backup scheduling

This commit is contained in:
Виталий Литвинов
2026-04-25 14:39:56 +03:00
parent c7540a97f7
commit b2ab0dca57
10 changed files with 854 additions and 97 deletions

View File

@@ -16,6 +16,7 @@ import mimetypes
import os
import re
import secrets
import shlex
import socket
import subprocess
import time
@@ -42,6 +43,8 @@ BOT_DIR = Path(os.getenv("GOTELEGRAM_BOT_DIR", "/opt/gotelegram-bot"))
DISABLED_USERS_FILE = Path(os.getenv("GOTELEGRAM_DISABLED_USERS", "/opt/gotelegram/disabled_users.json"))
USER_LOCK_FILE = Path(os.getenv("GOTELEGRAM_USER_LOCK", "/run/gotelegram/admin-users.lock"))
SHARED_443_CONFIG = Path(os.getenv("GOTELEGRAM_SHARED_443", "/opt/gotelegram/shared-443.json"))
BACKUP_SCHEDULE_FILE = Path(os.getenv("GOTELEGRAM_BACKUP_SCHEDULE", "/opt/gotelegram/backup_schedule.json"))
BACKUP_RESTORE_LOG = Path(os.getenv("GOTELEGRAM_BACKUP_RESTORE_LOG", "/var/log/gotelegram-restore.log"))
HOST = os.getenv("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.getenv("GOTELEGRAM_ADMIN_PORT", "1984"))
@@ -49,6 +52,7 @@ VERSION = "2.5.0"
USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
LANG_RE = re.compile(r"^(en|ru)$")
SENSITIVE_CONFIG_KEYS = {"secret"}
BACKUP_NAME_RE = re.compile(r"^[A-Za-z0-9_.-]+\.tar\.gz(\.enc)?$")
TRAFFIC_WINDOWS = {
"15m": 15 * 60,
"1h": 60 * 60,
@@ -75,6 +79,19 @@ def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
return 125, "", str(exc)
def run_bytes(cmd: list[str], timeout: int = 8) -> tuple[int, bytes, str]:
try:
proc = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
check=False,
)
return proc.returncode, proc.stdout, proc.stderr.decode("utf-8", errors="replace")
except Exception as exc: # pragma: no cover - system dependent
return 125, b"", str(exc)
class FileLock:
def __init__(self, path: Path):
self.path = path
@@ -909,6 +926,55 @@ def list_backups() -> list[dict[str, Any]]:
return items[:30]
def backup_schedule_calendar(frequency: str) -> str | None:
calendars = {
"off": None,
"daily": "*-*-* 03:20:00",
"weekly": "Sun 03:20:00",
"monthly": "*-*-01 03:20:00",
}
if frequency not in calendars:
raise ValueError("unsupported backup schedule")
return calendars[frequency]
def backup_schedule_status() -> dict[str, Any]:
raw = load_json(BACKUP_SCHEDULE_FILE, {}) or {}
if not isinstance(raw, dict):
raw = {}
frequency = str(raw.get("frequency") or "off")
try:
calendar = backup_schedule_calendar(frequency)
except ValueError:
frequency = "off"
calendar = None
active_code, active, _ = run(["systemctl", "is-active", "gotelegram-backup.timer"], timeout=5)
enabled_code, enabled, _ = run(["systemctl", "is-enabled", "gotelegram-backup.timer"], timeout=5)
_, next_run, _ = run(["systemctl", "show", "gotelegram-backup.timer", "--property=NextElapseUSecRealtime", "--value"], timeout=5)
return {
"frequency": frequency,
"calendar": calendar,
"enabled": enabled_code == 0 and enabled.strip() == "enabled",
"active": active_code == 0 and active.strip() == "active",
"next": next_run.strip(),
"updated_at": raw.get("updated_at") or "",
}
def set_backup_schedule(frequency: str) -> tuple[bool, str, dict[str, Any]]:
backup_schedule_calendar(frequency)
script = (
"source /opt/gotelegram/lib/common.sh; "
"source /opt/gotelegram/lib/i18n.sh; "
"source /opt/gotelegram/lib/backup.sh; "
"load_language \"$(detect_language 2>/dev/null || echo en)\"; "
f"set_backup_schedule {shlex.quote(frequency)}"
)
code, stdout, stderr = run(["bash", "-lc", script], timeout=120)
message = (stdout.strip().splitlines()[-1:] or stderr.strip().splitlines()[-1:] or [""])[0]
return code == 0, message, backup_schedule_status()
def create_backup() -> tuple[bool, str]:
script = (
"source /opt/gotelegram/lib/common.sh; "
@@ -917,13 +983,71 @@ def create_backup() -> tuple[bool, str]:
"source /opt/gotelegram/lib/website.sh; "
"source /opt/gotelegram/lib/backup.sh; "
"load_language \"$(detect_language 2>/dev/null || echo en)\"; "
"create_backup \"\""
"create_backup \"\"; "
"cleanup_old_backups 30"
)
code, stdout, stderr = run(["bash", "-lc", script], timeout=180)
text = (stdout.strip().splitlines()[-1:] or stderr.strip().splitlines()[-1:] or [""])[0]
return code == 0, text
def safe_backup_path(name: str) -> Path:
raw = str(name or "").strip()
if not raw or raw != os.path.basename(raw) or not BACKUP_NAME_RE.match(raw) or raw.endswith(".sha256"):
raise ValueError("invalid backup name")
candidate = (BACKUP_DIR / raw).resolve()
base = BACKUP_DIR.resolve()
if base != candidate.parent:
raise ValueError("invalid backup path")
if not candidate.exists():
raise FileNotFoundError("backup not found")
return candidate
def launch_restore_backup(name: str, password: str = "") -> dict[str, Any]:
backup_path = safe_backup_path(name)
if backup_path.name.endswith(".enc") and not password:
raise ValueError("password required for encrypted backup")
BACKUP_RESTORE_LOG.parent.mkdir(parents=True, exist_ok=True)
quoted_path = shlex.quote(str(backup_path))
quoted_password = shlex.quote(password)
quoted_log = shlex.quote(str(BACKUP_RESTORE_LOG))
script = (
"sleep 1; "
"source /opt/gotelegram/lib/common.sh; "
"source /opt/gotelegram/lib/i18n.sh; "
"source /opt/gotelegram/lib/telemt.sh; "
"source /opt/gotelegram/lib/website.sh; "
"source /opt/gotelegram/lib/backup.sh; "
"load_language \"$(detect_language 2>/dev/null || echo en)\"; "
"create_backup \"\" >/dev/null 2>&1 || true; "
f"restore_backup {quoted_path} {quoted_password} yes; "
"cleanup_old_backups 30"
)
with BACKUP_RESTORE_LOG.open("ab") as log:
log.write(f"\n[{utc_now()}] restore requested for {backup_path.name}\n".encode("utf-8"))
subprocess.Popen(
["bash", "-lc", f"{script} >> {quoted_log} 2>&1"],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
return {"name": backup_path.name, "started": True, "log": str(BACKUP_RESTORE_LOG)}
def user_qr_png(name: str) -> tuple[bytes, str]:
users = read_user_records()
record = users.get(name)
if not record:
raise FileNotFoundError("user not found")
link = proxy_link(str(record.get("secret", "")))
code, image, error = run_bytes(["qrencode", "-t", "PNG", "-s", "8", "-m", "2", "-o", "-", link], timeout=8)
if code != 0 or not image:
raise RuntimeError(error.strip() or "qrencode is not installed")
return image, link
def read_log_payload(service: str) -> dict[str, Any]:
allowed = {"telemt", "nginx", "gotelegram-bot", "gotelegram-stats", "gotelegram-admin"}
if service not in allowed:
@@ -999,6 +1123,7 @@ def overview_payload() -> dict[str, Any]:
"stats_status": stats_status(current, history),
"runtime_summary": summary,
"backups": list_backups(),
"backup_schedule": backup_schedule_status(),
}
@@ -1017,6 +1142,14 @@ class AdminHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(body)
def send_bytes(self, body: bytes, content_type: str, status: int = 200) -> None:
self.send_response(status)
self.send_header("Content-Type", content_type)
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def send_error_json(self, status: int, message: str) -> None:
self.send_json({"ok": False, "error": message}, status)
@@ -1046,6 +1179,23 @@ class AdminHandler(BaseHTTPRequestHandler):
record = users[name]
items.append(user_payload(name, record["secret"], record["enabled"], traffic_snapshot=latest.get(name)))
self.send_json({"ok": True, "data": items})
elif path.startswith("/api/users/") and path.endswith("/qr"):
name = urllib.parse.unquote(path[len("/api/users/"):-len("/qr")])
try:
png, link = user_qr_png(name)
except FileNotFoundError:
self.send_error_json(404, "user not found")
return
except Exception as exc:
self.send_error_json(503, str(exc))
return
self.send_response(200)
self.send_header("Content-Type", "image/png")
self.send_header("Cache-Control", "no-store")
self.send_header("X-Proxy-Link", urllib.parse.quote(link, safe=""))
self.send_header("Content-Length", str(len(png)))
self.end_headers()
self.wfile.write(png)
elif path.startswith("/api/users/") and path.endswith("/traffic"):
name = urllib.parse.unquote(path[len("/api/users/"):-len("/traffic")])
users = read_user_records()
@@ -1084,6 +1234,8 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_json({"ok": True, "data": user_payload(name, record["secret"], record["enabled"], include_runtime=True, traffic_snapshot=latest_user_stats().get(name))})
elif path == "/api/backups":
self.send_json({"ok": True, "data": list_backups()})
elif path == "/api/backups/schedule":
self.send_json({"ok": True, "data": backup_schedule_status()})
elif path == "/api/stats":
qs = urllib.parse.parse_qs(parsed.query)
range_key = normalize_range(qs.get("range", ["1h"])[0])
@@ -1179,6 +1331,27 @@ class AdminHandler(BaseHTTPRequestHandler):
elif path == "/api/backups":
ok, result = create_backup()
self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
elif path == "/api/backups/schedule":
try:
frequency = str(body.get("frequency") or "off").strip().lower()
ok, message, status = set_backup_schedule(frequency)
except ValueError as exc:
self.send_error_json(400, str(exc))
return
self.send_json({"ok": ok, "data": {"message": message, "schedule": status}}, 200 if ok else 500)
elif path == "/api/backups/restore":
try:
payload = launch_restore_backup(str(body.get("name") or ""), str(body.get("password") or ""))
except FileNotFoundError:
self.send_error_json(404, "backup not found")
return
except ValueError as exc:
self.send_error_json(400, str(exc))
return
except Exception as exc:
self.send_error_json(500, str(exc))
return
self.send_json({"ok": True, "data": payload}, 202)
elif path == "/api/stats/collect":
ok, message, payload = run_stats_action("collect")
payload["message"] = message