v2.5.0: align key controls and add IP limits

This commit is contained in:
Виталий Литвинов
2026-04-25 15:19:28 +03:00
parent bd3fc1af18
commit 507a2979e5
8 changed files with 583 additions and 33 deletions

View File

@@ -278,6 +278,7 @@ _DOMAIN_RE = re.compile(
r"(?!-)[A-Za-z0-9-]{2,63}(?<!-)$"
)
_USER_NAME_RE = re.compile(r"^[A-Za-z0-9_.-]{1,48}$")
MAX_UNIQUE_IP_LIMIT = 1000000
class FileLock:
@@ -1447,6 +1448,25 @@ def ordered_user_lines(users: Dict[str, str]) -> List[str]:
return [f'{quote_toml_key(name)} = "{users[name]}"' for name in names]
def ordered_user_int_lines(values: Dict[str, int]) -> List[str]:
positive: Dict[str, int] = {}
for name, value in values.items():
name_s = str(name)
if not _USER_NAME_RE.match(name_s):
continue
try:
number = int(value)
except (TypeError, ValueError):
continue
if number > 0:
positive[name_s] = number
names: List[str] = []
if "main" in positive:
names.append("main")
names.extend(sorted(name for name in positive if name != "main"))
return [f'{quote_toml_key(name)} = {positive[name]}' for name in names]
def load_telemt_users() -> Dict[str, str]:
"""Return users from [access.users] in telemt config."""
telemt_cfg = load_toml(TELEMT_CONFIG) or {}
@@ -1460,6 +1480,23 @@ def load_telemt_users() -> Dict[str, str]:
}
def load_user_max_unique_ips() -> Dict[str, int]:
telemt_cfg = load_toml(TELEMT_CONFIG) or {}
limits = telemt_cfg.get("access", {}).get("user_max_unique_ips", {})
if not isinstance(limits, dict):
return {}
clean: Dict[str, int] = {}
for name, value in limits.items():
name_s = str(name)
if not _USER_NAME_RE.match(name_s):
continue
try:
clean[name_s] = max(0, int(value))
except (TypeError, ValueError):
continue
return clean
def load_disabled_users() -> Dict[str, str]:
raw = load_json(DISABLED_USERS_FILE) or {}
if not isinstance(raw, dict):
@@ -1495,13 +1532,70 @@ def save_disabled_users(users: Dict[str, str]) -> bool:
def load_user_records() -> Dict[str, Dict[str, Any]]:
records: Dict[str, Dict[str, Any]] = {}
limits = load_user_max_unique_ips()
for name, secret in load_disabled_users().items():
records[name] = {"secret": secret, "enabled": False}
records[name] = {"secret": secret, "enabled": False, "max_unique_ips": limits.get(name, 0)}
for name, secret in load_telemt_users().items():
records[name] = {"secret": secret, "enabled": True}
records[name] = {"secret": secret, "enabled": True, "max_unique_ips": limits.get(name, 0)}
return records
def save_toml_int_table(table: str, values: Dict[str, int]) -> bool:
try:
os.makedirs(os.path.dirname(TELEMT_CONFIG), exist_ok=True)
if os.path.exists(TELEMT_CONFIG):
with open(TELEMT_CONFIG, "r", encoding="utf-8", errors="ignore") as f:
lines = f.read().splitlines()
else:
lines = []
rendered = ordered_user_int_lines(values)
header = f"[{table}]"
out: List[str] = []
in_table = False
found = False
for raw in lines:
if raw.strip() == header:
found = True
in_table = True
if rendered:
out.append(raw)
out.extend(rendered)
continue
if in_table and raw.strip().startswith("["):
in_table = False
if in_table:
continue
out.append(raw)
if not found and rendered:
if out and out[-1].strip():
out.append("")
out.append(header)
out.extend(rendered)
tmp = f"{TELEMT_CONFIG}.tmp"
with open(tmp, "w", encoding="utf-8") as f:
f.write("\n".join(out).rstrip() + "\n")
os.chmod(tmp, 0o600)
os.replace(tmp, TELEMT_CONFIG)
return True
except Exception as e:
logger.error(f"Failed to save telemt int table {table}: {e}")
return False
def save_user_max_unique_ips(values: Dict[str, int]) -> bool:
return save_toml_int_table("access.user_max_unique_ips", values)
def normalize_max_unique_ips(value: Any) -> int:
try:
number = int(value)
except (TypeError, ValueError):
raise ValueError("Лимит должен быть целым числом")
if number < 0 or number > MAX_UNIQUE_IP_LIMIT:
raise ValueError(f"Лимит должен быть от 0 до {MAX_UNIQUE_IP_LIMIT}")
return number
def save_telemt_users(users: Dict[str, str]) -> bool:
"""Persist [access.users] while keeping the rest of the TOML structure."""
try:
@@ -1791,7 +1885,7 @@ async def cb_menu_users(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
await safe_edit_message(query, text, reply_markup=_users_keyboard(users, user_id), parse_mode="HTML")
async def _user_detail_text(name: str, secret: str, enabled: bool = True) -> str:
async def _user_detail_text(name: str, secret: str, enabled: bool = True, max_unique_ips: int = 0) -> str:
link = await get_proxy_link_for_secret(secret)
api = await telemt_api_get(f"/v1/users/{quote(name, safe='')}") if enabled else None
details = ""
@@ -1820,9 +1914,11 @@ async def _user_detail_text(name: str, secret: str, enabled: bool = True) -> str
link_line = html.escape(link) if link else "link unavailable"
status_line = "🟢 enabled" if enabled else "⏸ disabled"
limit_line = "0 (безлимит)" if not max_unique_ips else str(max_unique_ips)
return (
f"<b>👤 {html.escape(name)}</b>\n\n"
f"Status: <b>{status_line}</b>\n"
f"Лимит IP: <code>{html.escape(limit_line)}</code>\n"
f"Secret: <code>{html.escape(secret)}</code>\n\n"
f"<b>Ссылка:</b>\n<code>{link_line}</code>\n"
f"{details}"
@@ -1845,9 +1941,11 @@ async def cb_user_view(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
return
enabled = bool(record.get("enabled"))
secret = str(record.get("secret", ""))
max_unique_ips = int(record.get("max_unique_ips") or 0)
buttons = [
[InlineKeyboardButton("⏸ Отключить" if enabled else "▶️ Включить", callback_data=f"user_toggle_{name}")],
[InlineKeyboardButton("🌐 Лимит IP", callback_data=f"user_ip_limit_{name}")],
[InlineKeyboardButton("📷 QR", callback_data=f"user_qr_{name}")],
[InlineKeyboardButton("🗑 Удалить", callback_data=f"user_del_{name}")],
[InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_users")],
@@ -1855,12 +1953,13 @@ async def cb_user_view(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
if name == "main":
buttons = [
[InlineKeyboardButton("🔒 Main key", callback_data=f"user_view_{name}")],
[InlineKeyboardButton("🌐 Лимит IP", callback_data=f"user_ip_limit_{name}")],
[InlineKeyboardButton("📷 QR", callback_data=f"user_qr_{name}")],
[InlineKeyboardButton(_t(user_id, "btn_back"), callback_data="menu_users")],
]
await safe_edit_message(
query,
await _user_detail_text(name, secret, enabled),
await _user_detail_text(name, secret, enabled, max_unique_ips),
reply_markup=InlineKeyboardMarkup(buttons),
parse_mode="HTML",
disable_web_page_preview=True,
@@ -1909,6 +2008,61 @@ async def cb_user_qr(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None
)
async def cb_user_ip_limit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer()
user_id = _uid(update)
name = query.data.removeprefix("user_ip_limit_")
users = load_user_records()
record = users.get(name)
if not record:
await query.answer("Ключ не найден", show_alert=True)
return
current = int(record.get("max_unique_ips") or 0)
context.user_data["awaiting_user_ip_limit"] = name
text = (
f"<b>🌐 Лимит IP: {html.escape(name)}</b>\n\n"
f"Текущее значение: <code>{current}</code>\n"
"Отправьте число: <code>0</code> — безлимит, <code>1</code> — только один активный IP, "
"<code>2</code> — два активных IP и так далее."
)
await safe_edit_message(
query,
text,
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(_t(user_id, "btn_cancel"), callback_data=f"user_view_{name}")]]),
parse_mode="HTML",
)
async def set_user_ip_limit_from_text(update: Update, context: ContextTypes.DEFAULT_TYPE, raw_value: str, name: str) -> None:
user_id = update.effective_user.id
try:
limit = normalize_max_unique_ips(raw_value.strip())
except ValueError as exc:
await update.message.reply_text(f"{html.escape(str(exc))}", parse_mode="HTML")
return
with FileLock(USER_LOCK_FILE):
records = load_user_records()
if name not in records:
await update.message.reply_text("❌ Ключ не найден.")
return
limits = load_user_max_unique_ips()
if limit > 0:
limits[name] = limit
else:
limits.pop(name, None)
saved = save_user_max_unique_ips(limits)
if not saved:
await update.message.reply_text("Не удалось сохранить /etc/telemt/config.toml")
return
await refresh_telemt_after_user_change()
await update.message.reply_text(
f"✅ Лимит IP сохранён для <code>{html.escape(name)}</code>: <code>{limit}</code>",
reply_markup=_users_keyboard(load_user_records(), user_id),
parse_mode="HTML",
)
async def cb_user_add(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer()
@@ -1997,7 +2151,9 @@ async def cb_user_delete_confirm(update: Update, context: ContextTypes.DEFAULT_T
return
active.pop(name, None)
disabled.pop(name, None)
saved = save_telemt_users(active) and save_disabled_users(disabled)
limits = load_user_max_unique_ips()
limits.pop(name, None)
saved = save_telemt_users(active) and save_disabled_users(disabled) and save_user_max_unique_ips(limits)
if not saved:
await safe_edit_message(query, "Не удалось сохранить config.toml")
return
@@ -3022,6 +3178,8 @@ async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
await cb_user_view(update, context)
elif data.startswith("user_qr_"):
await cb_user_qr(update, context)
elif data.startswith("user_ip_limit_"):
await cb_user_ip_limit(update, context)
elif data.startswith("user_toggle_"):
await cb_user_toggle(update, context)
elif data.startswith("user_del_yes_"):
@@ -3054,6 +3212,10 @@ async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE
if not is_user_allowed(update.effective_user.id):
return
user_id = update.effective_user.id
ip_limit_user = context.user_data.pop("awaiting_user_ip_limit", None)
if ip_limit_user:
await set_user_ip_limit_from_text(update, context, update.message.text.strip(), str(ip_limit_user))
return
if context.user_data.pop("awaiting_user_name", False):
await create_user_from_text(update, context, update.message.text.strip())
return