v2.5.0: compact traffic history retention

This commit is contained in:
Виталий Литвинов
2026-04-25 14:15:28 +03:00
parent 63b564f70f
commit c7540a97f7
4 changed files with 111 additions and 16 deletions

View File

@@ -691,9 +691,26 @@ def load_user_stats_history(name: str | None = None, limit: int | None = 240) ->
def latest_user_stats() -> dict[str, dict[str, Any]]:
latest: dict[str, dict[str, Any]] = {}
for row in load_user_stats_history(limit=None):
if row["epoch"] >= latest.get(row["user"], {}).get("epoch", 0):
latest[row["user"]] = row
if not USER_HISTORY_FILE.exists():
return latest
try:
with USER_HISTORY_FILE.open("r", encoding="utf-8", newline="") as fh:
for row in csv.DictReader(fh):
user = str(row.get("user") or "").strip()
if not USER_RE.match(user):
continue
item = {
"epoch": _int_value(row.get("epoch")),
"user": user,
"total_octets": _int_value(row.get("total_octets")),
"current_connections": _int_value(row.get("current_connections")),
"active_unique_ips": _int_value(row.get("active_unique_ips")),
"recent_unique_ips": _int_value(row.get("recent_unique_ips")),
}
if item["epoch"] >= latest.get(user, {}).get("epoch", 0):
latest[user] = item
except OSError:
return {}
return latest