v2.5.0: refine admin traffic and port status

This commit is contained in:
Виталий Литвинов
2026-04-25 12:01:31 +03:00
parent d8ec62eb07
commit d74b05ccf8
7 changed files with 758 additions and 136 deletions

View File

@@ -9,6 +9,7 @@ through an SSH tunnel; it must never be exposed directly on the public network.
from __future__ import annotations
import csv
import fcntl
import hashlib
import json
import mimetypes
@@ -38,6 +39,7 @@ BACKUP_DIR = Path(os.getenv("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups"))
INSTALL_DIR = Path(os.getenv("GOTELEGRAM_DIR", "/opt/gotelegram"))
BOT_DIR = Path(os.getenv("GOTELEGRAM_BOT_DIR", "/opt/gotelegram-bot"))
DISABLED_USERS_FILE = Path(os.getenv("GOTELEGRAM_DISABLED_USERS", "/opt/gotelegram/disabled_users.json"))
USER_LOCK_FILE = Path(os.getenv("GOTELEGRAM_USER_LOCK", "/run/gotelegram/admin-users.lock"))
HOST = os.getenv("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.getenv("GOTELEGRAM_ADMIN_PORT", "1984"))
@@ -45,6 +47,12 @@ VERSION = "2.5.0"
USER_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
LANG_RE = re.compile(r"^(en|ru)$")
SENSITIVE_CONFIG_KEYS = {"secret"}
TRAFFIC_WINDOWS = {
"15m": 15 * 60,
"1h": 60 * 60,
"24h": 24 * 60 * 60,
"month": 30 * 24 * 60 * 60,
}
def utc_now() -> str:
@@ -65,6 +73,23 @@ def run(cmd: list[str], timeout: int = 8) -> tuple[int, str, str]:
return 125, "", str(exc)
class FileLock:
def __init__(self, path: Path):
self.path = path
self.handle: Any = None
def __enter__(self) -> "FileLock":
self.path.parent.mkdir(parents=True, exist_ok=True)
self.handle = self.path.open("w", encoding="utf-8")
fcntl.flock(self.handle.fileno(), fcntl.LOCK_EX)
return self
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
if self.handle:
fcntl.flock(self.handle.fileno(), fcntl.LOCK_UN)
self.handle.close()
def load_json(path: Path, fallback: Any = None) -> Any:
try:
with path.open("r", encoding="utf-8") as fh:
@@ -226,8 +251,10 @@ def write_telemt_users(users: dict[str, str]) -> None:
out.append("[access.users]")
out.extend(rendered)
TELEMT_CONFIG.write_text("\n".join(out).rstrip() + "\n", encoding="utf-8")
os.chmod(TELEMT_CONFIG, 0o600)
tmp = TELEMT_CONFIG.with_name(TELEMT_CONFIG.name + ".tmp")
tmp.write_text("\n".join(out).rstrip() + "\n", encoding="utf-8")
os.chmod(tmp, 0o600)
tmp.replace(TELEMT_CONFIG)
def restart_service(name: str) -> bool:
@@ -239,6 +266,19 @@ def restart_service(name: str) -> bool:
return True
def request_service_restart(name: str) -> bool:
try:
subprocess.Popen(
["systemctl", "restart", name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
return True
except Exception:
return False
def service_status(name: str) -> str:
code, stdout, _ = run(["systemctl", "is-active", name], timeout=3)
value = stdout.strip()
@@ -271,6 +311,84 @@ def read_telemt_port() -> int:
return 443
def _is_port_addr(value: str, port: int) -> bool:
token = value.strip()
if token.startswith("[") and "]:" in token:
return token.rsplit(":", 1)[-1] == str(port)
return token.rsplit(":", 1)[-1] == str(port) if ":" in token else False
def _process_role(process: str) -> str:
lowered = process.lower()
if "telemt" in lowered or "mtproto" in lowered:
return "mtproxy"
if "nginx" in lowered or "apache" in lowered or "caddy" in lowered:
return "site"
if "xray" in lowered or "x-ui" in lowered or "3x-ui" in lowered or "xui" in lowered:
return "xray"
if "amnezia" in lowered or "awg" in lowered or "wireguard" in lowered or re.search(r"\bwg\b", lowered):
return "amneziawg"
return "other"
def parse_ss_listeners(output: str, proto: str, port: int = 443) -> list[dict[str, Any]]:
listeners: list[dict[str, Any]] = []
seen: set[tuple[str, str, str]] = set()
for line in output.splitlines():
parts = line.split()
address = next((part for part in parts if _is_port_addr(part, port)), "")
if not address:
continue
matches = re.findall(r'\("([^"]+)",pid=(\d+)', line)
if matches:
process_names = []
pids = []
for proc, pid in matches:
if proc not in process_names:
process_names.append(proc)
if pid not in pids:
pids.append(pid)
process = ", ".join(process_names)
pid_text = ", ".join(pids)
else:
process = "unknown"
pid_text = ""
key = (proto, address, process)
if key in seen:
continue
seen.add(key)
listeners.append({
"proto": proto.upper(),
"address": address,
"process": process,
"pid": pid_text,
"role": _process_role(process),
})
return listeners
def port_443_status() -> dict[str, Any]:
listeners: list[dict[str, Any]] = []
errors: list[str] = []
for proto, args in {
"tcp": ["ss", "-H", "-ltnp"],
"udp": ["ss", "-H", "-lunp"],
}.items():
code, stdout, stderr = run(args, timeout=2)
if code == 0:
listeners.extend(parse_ss_listeners(stdout, proto, 443))
elif stderr.strip():
errors.append(stderr.strip())
listeners.sort(key=lambda item: (item["proto"], item["address"], item["process"]))
return {
"checked_at": int(time.time()),
"configured_port": read_telemt_port(),
"listeners": listeners,
"ok": not errors,
"error": "; ".join(errors[:2]),
}
def wait_tcp_port(port: int, timeout: int = 90) -> bool:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
@@ -345,7 +463,7 @@ def site_status(config: dict[str, Any] | None = None) -> dict[str, Any]:
}
def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
def load_stats_history(limit: int | None = 240) -> list[dict[str, int]]:
if not HISTORY_FILE.exists():
return []
rows: list[dict[str, int]] = []
@@ -362,7 +480,8 @@ def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
continue
except OSError:
return []
rows = rows[-limit:]
if limit:
rows = rows[-limit:]
previous = None
enriched: list[dict[str, int]] = []
for row in rows:
@@ -378,6 +497,56 @@ def load_stats_history(limit: int = 240) -> list[dict[str, int]]:
return enriched
def history_limit_for_range(range_key: str) -> int:
return {
"15m": 180,
"1h": 240,
"24h": 1800,
"month": 50000,
}.get(range_key, 240)
def normalize_range(range_key: str) -> str:
return range_key if range_key in TRAFFIC_WINDOWS else "1h"
def filter_history_by_range(rows: list[dict[str, int]], range_key: str) -> list[dict[str, int]]:
if not rows:
return []
seconds = TRAFFIC_WINDOWS[normalize_range(range_key)]
latest = max(row.get("epoch", 0) for row in rows)
cutoff = latest - seconds
return [row for row in rows if row.get("epoch", 0) >= cutoff]
def traffic_interval_summaries(rows: list[dict[str, int]]) -> list[dict[str, Any]]:
if not rows:
return [
{"range": key, "points": 0, "from": 0, "to": 0, "proxy_delta": 0, "site_delta": 0, "proxy_total": 0, "site_total": 0}
for key in TRAFFIC_WINDOWS
]
latest = max(row.get("epoch", 0) for row in rows)
summaries = []
for key, seconds in TRAFFIC_WINDOWS.items():
window = [row for row in rows if row.get("epoch", 0) >= latest - seconds]
if not window:
summaries.append({"range": key, "points": 0, "from": 0, "to": latest, "proxy_delta": 0, "site_delta": 0, "proxy_total": 0, "site_total": 0})
continue
first = window[0]
last = window[-1]
summaries.append({
"range": key,
"points": len(window),
"from": first.get("epoch", 0),
"to": last.get("epoch", 0),
"proxy_delta": max(0, int(last.get("proxy_bytes", 0)) - int(first.get("proxy_bytes", 0))),
"site_delta": max(0, int(last.get("site_bytes", 0)) - int(first.get("site_bytes", 0))),
"proxy_total": int(last.get("proxy_bytes", 0)),
"site_total": int(last.get("site_bytes", 0)),
})
return summaries
def count_history_rows() -> int:
if not HISTORY_FILE.exists():
return 0
@@ -537,6 +706,7 @@ def overview_payload() -> dict[str, Any]:
"site_status": site_status(config),
"users_count": len(users),
"services": services,
"port_443": port_443_status(),
"stats_current": current,
"stats_history": history,
"stats_status": stats_status(current, history),
@@ -599,9 +769,21 @@ class AdminHandler(BaseHTTPRequestHandler):
elif path == "/api/backups":
self.send_json({"ok": True, "data": list_backups()})
elif path == "/api/stats":
qs = urllib.parse.parse_qs(parsed.query)
range_key = normalize_range(qs.get("range", ["1h"])[0])
current = load_json(CURRENT_STATS, {}) or {}
history = load_stats_history()
self.send_json({"ok": True, "data": {"current": current, "history": history, "status": stats_status(current, history)}})
all_history = load_stats_history(limit=history_limit_for_range("month"))
history = filter_history_by_range(all_history[-history_limit_for_range(range_key):], range_key)
self.send_json({
"ok": True,
"data": {
"range": range_key,
"current": current,
"history": history,
"summary_rows": traffic_interval_summaries(all_history),
"status": stats_status(current, history),
},
})
elif path == "/api/site/check":
self.send_json({"ok": True, "data": site_status()})
elif path == "/api/logs":
@@ -631,51 +813,53 @@ class AdminHandler(BaseHTTPRequestHandler):
if not USER_RE.match(name):
self.send_error_json(400, "invalid user name")
return
records = read_user_records()
if name in records:
self.send_error_json(409, "user already exists")
return
users = read_telemt_users()
seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode()
secret = hashlib.sha256(seed).hexdigest()[:32]
users[name] = secret
try:
write_telemt_users(users)
with FileLock(USER_LOCK_FILE):
records = read_user_records()
if name in records:
self.send_error_json(409, "user already exists")
return
users = read_telemt_users()
seed = f"{name}:{time.time()}:{secrets.token_hex(32)}".encode()
secret = hashlib.sha256(seed).hexdigest()[:32]
users[name] = secret
write_telemt_users(users)
except Exception as exc:
self.send_error_json(500, f"failed to save config: {exc}")
return
restarted = restart_service("telemt")
self.send_json({"ok": True, "data": user_payload(name, secret, True), "restarted": restarted})
restart_requested = request_service_restart("telemt")
self.send_json({"ok": True, "data": user_payload(name, secret, True), "restart": {"mode": "async", "requested": restart_requested}})
elif path.startswith("/api/users/") and path.endswith("/enabled"):
name = urllib.parse.unquote(path[len("/api/users/"):-len("/enabled")])
if name == "main":
self.send_error_json(400, "main user cannot be disabled")
return
enabled = bool(body.get("enabled"))
active = read_telemt_users()
disabled = read_disabled_users()
records = read_user_records()
if name not in records:
self.send_error_json(404, "user not found")
return
if enabled:
secret = disabled.pop(name, records[name]["secret"])
active[name] = secret
else:
secret = active.pop(name, records[name]["secret"])
disabled[name] = secret
try:
if enabled:
write_telemt_users(active)
write_disabled_users(disabled)
else:
write_disabled_users(disabled)
write_telemt_users(active)
with FileLock(USER_LOCK_FILE):
active = read_telemt_users()
disabled = read_disabled_users()
records = read_user_records()
if name not in records:
self.send_error_json(404, "user not found")
return
if enabled:
secret = disabled.pop(name, records[name]["secret"])
active[name] = secret
else:
secret = active.pop(name, records[name]["secret"])
disabled[name] = secret
if enabled:
write_telemt_users(active)
write_disabled_users(disabled)
else:
write_disabled_users(disabled)
write_telemt_users(active)
except Exception as exc:
self.send_error_json(500, f"failed to save config: {exc}")
return
restarted = restart_service("telemt")
self.send_json({"ok": True, "data": user_payload(name, secret, enabled), "restarted": restarted})
restart_requested = request_service_restart("telemt")
self.send_json({"ok": True, "data": user_payload(name, secret, enabled), "restart": {"mode": "async", "requested": restart_requested}})
elif path == "/api/backups":
ok, result = create_backup()
self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
@@ -716,22 +900,23 @@ class AdminHandler(BaseHTTPRequestHandler):
if name == "main":
self.send_error_json(400, "main user cannot be deleted")
return
active = read_telemt_users()
disabled = read_disabled_users()
records = read_user_records()
if name not in records:
self.send_error_json(404, "user not found")
return
active.pop(name, None)
disabled.pop(name, None)
try:
write_telemt_users(active)
write_disabled_users(disabled)
with FileLock(USER_LOCK_FILE):
active = read_telemt_users()
disabled = read_disabled_users()
records = read_user_records()
if name not in records:
self.send_error_json(404, "user not found")
return
active.pop(name, None)
disabled.pop(name, None)
write_telemt_users(active)
write_disabled_users(disabled)
except Exception as exc:
self.send_error_json(500, f"failed to save config: {exc}")
return
restarted = restart_service("telemt")
self.send_json({"ok": True, "restarted": restarted})
restart_requested = request_service_restart("telemt")
self.send_json({"ok": True, "restart": {"mode": "async", "requested": restart_requested}})
def send_static(self, parsed: urllib.parse.ParseResult) -> None:
rel = parsed.path.lstrip("/") or "index.html"