mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 12:36:02 +00:00
v2.5.0: use live IP counts in key cards
This commit is contained in:
@@ -851,6 +851,39 @@ def runtime_user_traffic(name: str, enabled: bool = True) -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def current_user_traffic_snapshot(
|
||||
name: str,
|
||||
enabled: bool,
|
||||
history_snapshot: dict[str, Any] | None = None,
|
||||
now: int | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Return live counters for key cards, preserving only total bytes from history.
|
||||
|
||||
History rows are minute snapshots. They are useful for charts, but stale
|
||||
connection/IP values make the keys list look like users are still online.
|
||||
"""
|
||||
history_snapshot = history_snapshot or {}
|
||||
fallback = {
|
||||
"epoch": _int_value(history_snapshot.get("epoch")),
|
||||
"total_octets": _int_value(history_snapshot.get("total_octets")),
|
||||
"current_connections": 0,
|
||||
"active_unique_ips": 0,
|
||||
"recent_unique_ips": 0,
|
||||
}
|
||||
if not enabled:
|
||||
return fallback
|
||||
runtime = runtime_user_traffic(name, enabled)
|
||||
if not runtime.get("ok"):
|
||||
return fallback
|
||||
return {
|
||||
"epoch": _int_value(now if now is not None else time.time()),
|
||||
"total_octets": _int_value(runtime.get("total_octets")),
|
||||
"current_connections": _int_value(runtime.get("current_connections")),
|
||||
"active_unique_ips": _int_value(runtime.get("active_unique_ips")),
|
||||
"recent_unique_ips": _int_value(runtime.get("recent_unique_ips")),
|
||||
}
|
||||
|
||||
|
||||
def history_limit_for_range(range_key: str) -> int:
|
||||
return {
|
||||
"15m": 180,
|
||||
@@ -1286,7 +1319,7 @@ class AdminHandler(BaseHTTPRequestHandler):
|
||||
record["secret"],
|
||||
record["enabled"],
|
||||
record.get("max_unique_ips", 0),
|
||||
traffic_snapshot=latest.get(name),
|
||||
traffic_snapshot=current_user_traffic_snapshot(name, record["enabled"], latest.get(name)),
|
||||
))
|
||||
self.send_json({"ok": True, "data": items})
|
||||
elif path.startswith("/api/users/") and path.endswith("/qr"):
|
||||
@@ -1347,7 +1380,7 @@ class AdminHandler(BaseHTTPRequestHandler):
|
||||
record["enabled"],
|
||||
record.get("max_unique_ips", 0),
|
||||
include_runtime=True,
|
||||
traffic_snapshot=latest_user_stats().get(name),
|
||||
traffic_snapshot=current_user_traffic_snapshot(name, record["enabled"], latest_user_stats().get(name)),
|
||||
)})
|
||||
elif path == "/api/backups":
|
||||
self.send_json({"ok": True, "data": list_backups()})
|
||||
@@ -1443,7 +1476,7 @@ class AdminHandler(BaseHTTPRequestHandler):
|
||||
record["secret"],
|
||||
record["enabled"],
|
||||
record.get("max_unique_ips", 0),
|
||||
traffic_snapshot=latest_user_stats().get(name),
|
||||
traffic_snapshot=current_user_traffic_snapshot(name, record["enabled"], latest_user_stats().get(name)),
|
||||
), "restart": {"mode": "async", "requested": restart_requested}})
|
||||
elif path.startswith("/api/users/") and path.endswith("/enabled"):
|
||||
name = urllib.parse.unquote(path[len("/api/users/"):-len("/enabled")])
|
||||
@@ -1475,7 +1508,13 @@ class AdminHandler(BaseHTTPRequestHandler):
|
||||
self.send_error_json(500, f"failed to save config: {exc}")
|
||||
return
|
||||
restart_requested = request_service_restart("telemt")
|
||||
self.send_json({"ok": True, "data": user_payload(name, secret, enabled, records[name].get("max_unique_ips", 0)), "restart": {"mode": "async", "requested": restart_requested}})
|
||||
self.send_json({"ok": True, "data": user_payload(
|
||||
name,
|
||||
secret,
|
||||
enabled,
|
||||
records[name].get("max_unique_ips", 0),
|
||||
traffic_snapshot=current_user_traffic_snapshot(name, enabled, latest_user_stats().get(name)),
|
||||
), "restart": {"mode": "async", "requested": restart_requested}})
|
||||
elif path == "/api/backups":
|
||||
ok, result = create_backup()
|
||||
self.send_json({"ok": ok, "data": {"path": result, "backups": list_backups()}}, 200 if ok else 500)
|
||||
|
||||
@@ -129,6 +129,59 @@ class AdminFeatureTests(unittest.TestCase):
|
||||
self.assertNotIn("client = 3", text)
|
||||
self.assertNotIn("old = 4", text)
|
||||
|
||||
def test_key_card_traffic_uses_live_ip_counts_not_stale_history(self):
|
||||
with tempfile.TemporaryDirectory() as raw:
|
||||
server = load_server(Path(raw))
|
||||
stale_history = {
|
||||
"epoch": 1000,
|
||||
"total_octets": 100,
|
||||
"current_connections": 16,
|
||||
"active_unique_ips": 8,
|
||||
"recent_unique_ips": 5,
|
||||
}
|
||||
original = server.runtime_user_traffic
|
||||
server.runtime_user_traffic = lambda name, enabled=True: {
|
||||
"ok": True,
|
||||
"enabled": True,
|
||||
"total_octets": 200,
|
||||
"current_connections": 0,
|
||||
"active_unique_ips": 0,
|
||||
"recent_unique_ips": 0,
|
||||
}
|
||||
try:
|
||||
snapshot = server.current_user_traffic_snapshot("client", True, stale_history, now=2000)
|
||||
finally:
|
||||
server.runtime_user_traffic = original
|
||||
|
||||
self.assertEqual(snapshot["epoch"], 2000)
|
||||
self.assertEqual(snapshot["total_octets"], 200)
|
||||
self.assertEqual(snapshot["current_connections"], 0)
|
||||
self.assertEqual(snapshot["active_unique_ips"], 0)
|
||||
self.assertEqual(snapshot["recent_unique_ips"], 0)
|
||||
|
||||
def test_key_card_traffic_fallback_keeps_only_historical_total(self):
|
||||
with tempfile.TemporaryDirectory() as raw:
|
||||
server = load_server(Path(raw))
|
||||
stale_history = {
|
||||
"epoch": 1000,
|
||||
"total_octets": 100,
|
||||
"current_connections": 16,
|
||||
"active_unique_ips": 8,
|
||||
"recent_unique_ips": 5,
|
||||
}
|
||||
original = server.runtime_user_traffic
|
||||
server.runtime_user_traffic = lambda name, enabled=True: {"ok": False}
|
||||
try:
|
||||
snapshot = server.current_user_traffic_snapshot("client", True, stale_history, now=2000)
|
||||
finally:
|
||||
server.runtime_user_traffic = original
|
||||
|
||||
self.assertEqual(snapshot["epoch"], 1000)
|
||||
self.assertEqual(snapshot["total_octets"], 100)
|
||||
self.assertEqual(snapshot["current_connections"], 0)
|
||||
self.assertEqual(snapshot["active_unique_ips"], 0)
|
||||
self.assertEqual(snapshot["recent_unique_ips"], 0)
|
||||
|
||||
def test_keys_view_uses_card_layout_without_horizontal_table(self):
|
||||
app_js = (ROOT / "admin-web" / "static" / "app.js").read_text(encoding="utf-8")
|
||||
styles = (ROOT / "admin-web" / "static" / "styles.css").read_text(encoding="utf-8")
|
||||
|
||||
Reference in New Issue
Block a user