v2.5.0: add shared 443 and per-user traffic

This commit is contained in:
Виталий Литвинов
2026-04-25 14:07:47 +03:00
parent c1b5ffc5a7
commit 63b564f70f
12 changed files with 990 additions and 34 deletions

View File

@@ -34,12 +34,14 @@ STATIC_DIR = Path(os.getenv("GOTELEGRAM_ADMIN_STATIC", str(ADMIN_DIR / "static")
GOTELEGRAM_CONFIG = Path(os.getenv("GOTELEGRAM_CONFIG", "/opt/gotelegram/config.json"))
TELEMT_CONFIG = Path(os.getenv("TELEMT_CONFIG", "/etc/telemt/config.toml"))
HISTORY_FILE = Path(os.getenv("GOTELEGRAM_STATS_HISTORY", "/opt/gotelegram/stats_history.csv"))
USER_HISTORY_FILE = Path(os.getenv("GOTELEGRAM_USER_STATS_HISTORY", "/opt/gotelegram/user_stats_history.csv"))
CURRENT_STATS = Path(os.getenv("GOTELEGRAM_STATS_CURRENT", "/run/gotelegram/stats_current.json"))
BACKUP_DIR = Path(os.getenv("GOTELEGRAM_BACKUP_DIR", "/opt/gotelegram/backups"))
INSTALL_DIR = Path(os.getenv("GOTELEGRAM_DIR", "/opt/gotelegram"))
BOT_DIR = Path(os.getenv("GOTELEGRAM_BOT_DIR", "/opt/gotelegram-bot"))
DISABLED_USERS_FILE = Path(os.getenv("GOTELEGRAM_DISABLED_USERS", "/opt/gotelegram/disabled_users.json"))
USER_LOCK_FILE = Path(os.getenv("GOTELEGRAM_USER_LOCK", "/run/gotelegram/admin-users.lock"))
SHARED_443_CONFIG = Path(os.getenv("GOTELEGRAM_SHARED_443", "/opt/gotelegram/shared-443.json"))
HOST = os.getenv("GOTELEGRAM_ADMIN_HOST", "127.0.0.1")
PORT = int(os.getenv("GOTELEGRAM_ADMIN_PORT", "1984"))
@@ -421,14 +423,81 @@ def read_telemt_edge_settings() -> dict[str, Any]:
return settings
def load_shared443_config() -> dict[str, Any]:
raw = load_json(SHARED_443_CONFIG, {}) or {}
if not isinstance(raw, dict):
return {}
routes = raw.get("xray_routes") if isinstance(raw.get("xray_routes"), list) else []
clean_routes = []
for item in routes:
if not isinstance(item, dict):
continue
public = str(item.get("public") or item.get("domain") or "").strip()
target = str(item.get("target") or "").strip()
if public and target:
clean_routes.append({"public": public, "target": target})
return {
"enabled": bool(raw.get("enabled")),
"dispatcher": str(raw.get("dispatcher") or "nginx-stream"),
"public_port": _int_value(raw.get("public_port") or 443) or 443,
"telemt_target": str(raw.get("telemt_target") or "127.0.0.1:7443"),
"site_target": str(raw.get("site_target") or ""),
"xray_routes": clean_routes,
"updated_at": str(raw.get("updated_at") or ""),
}
def listener_for_target(target: str) -> dict[str, Any] | None:
try:
port = int(target.rsplit(":", 1)[-1])
except ValueError:
return None
listeners, _ = collect_port_listeners(port)
return listeners[0] if listeners else None
def routed_behind_443() -> list[dict[str, Any]]:
config = load_json(GOTELEGRAM_CONFIG, {}) or {}
mode = str(config.get("mode") or "")
domain = str(config.get("domain") or "")
settings = read_telemt_edge_settings()
shared = load_shared443_config()
mask_port = int(settings.get("mask_port") or 0)
tls_domain = str(settings.get("tls_domain") or domain)
routes: list[dict[str, Any]] = []
if shared.get("enabled"):
telemt_target = str(shared.get("telemt_target") or "127.0.0.1:7443")
telemt_listener = listener_for_target(telemt_target)
routes.append({
"role": "mtproxy",
"proto": "MTProxy",
"public": f"{domain or tls_domain or 'default'}:443",
"target": telemt_target,
"process": (telemt_listener or {}).get("process") or "telemt",
"pid": (telemt_listener or {}).get("pid") or "",
"status": service_status("telemt"),
"via": "nginx stream ssl_preread",
"tls_domain": tls_domain,
"details": ["default -> telemt"] if not shared.get("xray_routes") else [],
})
for item in shared.get("xray_routes", []):
target = item.get("target", "")
listener = listener_for_target(target)
public = item.get("public", "")
if public and ":" not in public:
public = f"{public}:443"
routes.append({
"role": "xray",
"proto": "VLESS",
"public": public or "xray:443",
"target": target,
"process": (listener or {}).get("process") or "xray",
"pid": (listener or {}).get("pid") or "",
"status": "running" if listener else "not_installed",
"via": "nginx stream ssl_preread",
"tls_domain": public.split(":", 1)[0] if public else "",
"details": [],
})
if mode == "pro" and domain and mask_port and mask_port != 443:
internal, _ = collect_port_listeners(mask_port)
site_listener = next((item for item in internal if item.get("role") == "site"), None)
@@ -449,11 +518,18 @@ def routed_behind_443() -> list[dict[str, Any]]:
def port_443_status() -> dict[str, Any]:
listeners, errors = collect_port_listeners(443)
shared = load_shared443_config()
if shared.get("enabled"):
for item in listeners:
if item.get("role") == "site" and "nginx" in str(item.get("process", "")).lower():
item["role"] = "edge"
item["details"] = "nginx stream ssl_preread"
return {
"checked_at": int(time.time()),
"configured_port": read_telemt_port(),
"listeners": listeners,
"routes": routed_behind_443(),
"shared_443": shared,
"ok": not errors,
"error": "; ".join(errors[:2]),
}
@@ -567,6 +643,78 @@ def load_stats_history(limit: int | None = 240) -> list[dict[str, int]]:
return enriched
def _int_value(value: Any) -> int:
try:
return int(value or 0)
except (TypeError, ValueError):
return 0
def load_user_stats_history(name: str | None = None, limit: int | None = 240) -> list[dict[str, Any]]:
if not USER_HISTORY_FILE.exists():
return []
rows: list[dict[str, Any]] = []
try:
with USER_HISTORY_FILE.open("r", encoding="utf-8", newline="") as fh:
for row in csv.DictReader(fh):
user = str(row.get("user") or "").strip()
if name is not None and user != name:
continue
if not USER_RE.match(user):
continue
rows.append({
"epoch": _int_value(row.get("epoch")),
"user": user,
"total_octets": _int_value(row.get("total_octets")),
"current_connections": _int_value(row.get("current_connections")),
"active_unique_ips": _int_value(row.get("active_unique_ips")),
"recent_unique_ips": _int_value(row.get("recent_unique_ips")),
})
except OSError:
return []
rows.sort(key=lambda item: (item["user"], item["epoch"]))
if limit and name is not None:
rows = rows[-limit:]
previous_by_user: dict[str, dict[str, Any]] = {}
enriched: list[dict[str, Any]] = []
for row in rows:
item = dict(row)
previous = previous_by_user.get(row["user"])
item["total_delta"] = max(0, row["total_octets"] - previous["total_octets"]) if previous else 0
enriched.append(item)
previous_by_user[row["user"]] = row
if limit and name is None:
enriched = enriched[-limit:]
return enriched
def latest_user_stats() -> dict[str, dict[str, Any]]:
latest: dict[str, dict[str, Any]] = {}
for row in load_user_stats_history(limit=None):
if row["epoch"] >= latest.get(row["user"], {}).get("epoch", 0):
latest[row["user"]] = row
return latest
def runtime_user_traffic(name: str, enabled: bool = True) -> dict[str, Any]:
if not enabled:
return {"ok": False, "enabled": False, "total_octets": 0, "current_connections": 0, "active_unique_ips": 0, "recent_unique_ips": 0}
payload = telemt_api(f"/v1/users/{urllib.parse.quote(name, safe='')}")
data = payload.get("data", payload) if isinstance(payload, dict) else {}
if not isinstance(data, dict):
data = {}
return {
"ok": bool(payload),
"enabled": True,
"total_octets": _int_value(data.get("total_octets")),
"current_connections": _int_value(data.get("current_connections")),
"active_unique_ips": _int_value(data.get("active_unique_ips")),
"recent_unique_ips": _int_value(data.get("recent_unique_ips")),
"in_runtime": bool(data.get("in_runtime")) if data else False,
}
def history_limit_for_range(range_key: str) -> int:
return {
"15m": 180,
@@ -617,6 +765,32 @@ def traffic_interval_summaries(rows: list[dict[str, int]]) -> list[dict[str, Any
return summaries
def user_traffic_interval_summaries(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
if not rows:
return [
{"range": key, "points": 0, "from": 0, "to": 0, "total_delta": 0, "total_octets": 0}
for key in TRAFFIC_WINDOWS
]
latest = max(row.get("epoch", 0) for row in rows)
summaries = []
for key, seconds in TRAFFIC_WINDOWS.items():
window = [row for row in rows if row.get("epoch", 0) >= latest - seconds]
if not window:
summaries.append({"range": key, "points": 0, "from": 0, "to": latest, "total_delta": 0, "total_octets": 0})
continue
first = window[0]
last = window[-1]
summaries.append({
"range": key,
"points": len(window),
"from": first.get("epoch", 0),
"to": last.get("epoch", 0),
"total_delta": sum(max(0, int(item.get("total_delta", 0))) for item in window),
"total_octets": int(last.get("total_octets", 0)),
})
return summaries
def count_history_rows() -> int:
if not HISTORY_FILE.exists():
return 0
@@ -627,6 +801,18 @@ def count_history_rows() -> int:
return 0
def count_user_history_rows(name: str | None = None) -> int:
if not USER_HISTORY_FILE.exists():
return 0
try:
with USER_HISTORY_FILE.open("r", encoding="utf-8", errors="ignore") as fh:
if name is None:
return sum(1 for line in fh if line and line[0].isdigit())
return sum(1 for line in fh if line.startswith(tuple(str(d) for d in range(10))) and f",{name}," in line)
except OSError:
return 0
def stats_status(current: dict[str, Any] | None = None, history: list[dict[str, int]] | None = None) -> dict[str, Any]:
current = current if current is not None else (load_json(CURRENT_STATS, {}) or {})
history = history if history is not None else load_stats_history(limit=2)
@@ -740,7 +926,13 @@ def read_log_payload(service: str) -> dict[str, Any]:
}
def user_payload(name: str, secret: str, enabled: bool = True, include_runtime: bool = False) -> dict[str, Any]:
def user_payload(
name: str,
secret: str,
enabled: bool = True,
include_runtime: bool = False,
traffic_snapshot: dict[str, Any] | None = None,
) -> dict[str, Any]:
item: dict[str, Any] = {
"name": name,
"secret": secret,
@@ -748,6 +940,14 @@ def user_payload(name: str, secret: str, enabled: bool = True, include_runtime:
"main": name == "main",
"enabled": bool(enabled),
}
if traffic_snapshot:
item["traffic"] = {
"epoch": traffic_snapshot.get("epoch", 0),
"total_octets": traffic_snapshot.get("total_octets", 0),
"current_connections": traffic_snapshot.get("current_connections", 0),
"active_unique_ips": traffic_snapshot.get("active_unique_ips", 0),
"recent_unique_ips": traffic_snapshot.get("recent_unique_ips", 0),
}
if include_runtime and enabled:
item["runtime"] = telemt_api(f"/v1/users/{urllib.parse.quote(name, safe='')}")
return item
@@ -823,11 +1023,40 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_json({"ok": True, "data": overview_payload()})
elif path == "/api/users":
users = read_user_records()
latest = latest_user_stats()
items = []
for name in sorted(users, key=lambda item: (item != "main", item)):
record = users[name]
items.append(user_payload(name, record["secret"], record["enabled"]))
items.append(user_payload(name, record["secret"], record["enabled"], traffic_snapshot=latest.get(name)))
self.send_json({"ok": True, "data": items})
elif path.startswith("/api/users/") and path.endswith("/traffic"):
name = urllib.parse.unquote(path[len("/api/users/"):-len("/traffic")])
users = read_user_records()
if name not in users:
self.send_error_json(404, "user not found")
return
qs = urllib.parse.parse_qs(parsed.query)
range_key = normalize_range(qs.get("range", ["1h"])[0])
all_history = load_user_stats_history(name, limit=history_limit_for_range("month"))
history = filter_history_by_range(all_history[-history_limit_for_range(range_key):], range_key)
current = runtime_user_traffic(name, bool(users[name].get("enabled")))
self.send_json({
"ok": True,
"data": {
"name": name,
"range": range_key,
"current": current,
"history": history,
"summary_rows": user_traffic_interval_summaries(all_history),
"status": {
"history_exists": USER_HISTORY_FILE.exists(),
"history_rows": count_user_history_rows(name),
"history_points": len(history),
"last_ts": history[-1]["epoch"] if history else 0,
"runtime_ok": current.get("ok", False),
},
},
})
elif path.startswith("/api/users/"):
name = urllib.parse.unquote(path[len("/api/users/"):])
users = read_user_records()
@@ -835,7 +1064,7 @@ class AdminHandler(BaseHTTPRequestHandler):
self.send_error_json(404, "user not found")
return
record = users[name]
self.send_json({"ok": True, "data": user_payload(name, record["secret"], record["enabled"], include_runtime=True)})
self.send_json({"ok": True, "data": user_payload(name, record["secret"], record["enabled"], include_runtime=True, traffic_snapshot=latest_user_stats().get(name))})
elif path == "/api/backups":
self.send_json({"ok": True, "data": list_backups()})
elif path == "/api/stats":