import importlib.util import json import os import re import sys import tempfile import unittest from pathlib import Path ROOT = Path(__file__).resolve().parents[1] SERVER_PATH = ROOT / "admin-web" / "server.py" def load_server(tmpdir: Path): os.environ["GOTELEGRAM_BACKUP_DIR"] = str(tmpdir / "backups") os.environ["GOTELEGRAM_DIR"] = str(tmpdir / "gotelegram") os.environ["TELEMT_CONFIG"] = str(tmpdir / "etc" / "telemt" / "config.toml") os.environ["GOTELEGRAM_DISABLED_USERS"] = str(tmpdir / "gotelegram" / "disabled_users.json") module_name = "gotelegram_admin_server_test" sys.modules.pop(module_name, None) spec = importlib.util.spec_from_file_location(module_name, SERVER_PATH) module = importlib.util.module_from_spec(spec) assert spec.loader is not None spec.loader.exec_module(module) return module class AdminFeatureTests(unittest.TestCase): def test_backup_path_accepts_only_local_archives(self): with tempfile.TemporaryDirectory() as raw: tmpdir = Path(raw) server = load_server(tmpdir) server.BACKUP_DIR.mkdir(parents=True) good = server.BACKUP_DIR / "gotelegram_backup_20260425_120000.tar.gz" good.write_text("backup", encoding="utf-8") encrypted = server.BACKUP_DIR / "gotelegram_backup_20260425_120001.tar.gz.enc" encrypted.write_text("backup", encoding="utf-8") legacy = server.BACKUP_DIR / "backup_20260425_120002.tar.gz" legacy.write_text("backup", encoding="utf-8") self.assertEqual(server.safe_backup_path(good.name), good.resolve()) self.assertEqual(server.safe_backup_path(encrypted.name), encrypted.resolve()) self.assertEqual(server.safe_backup_path(legacy.name), legacy.resolve()) with self.assertRaises(ValueError): server.safe_backup_path("../outside.tar.gz") with self.assertRaises(ValueError): server.safe_backup_path("gotelegram_backup_20260425_120000.tar.gz.sha256") with self.assertRaises(FileNotFoundError): server.safe_backup_path("missing.tar.gz") def test_backup_schedule_calendar_rejects_unknown_values(self): with tempfile.TemporaryDirectory() as raw: server = load_server(Path(raw)) self.assertEqual(server.backup_schedule_calendar("daily"), "*-*-* 03:20:00") self.assertEqual(server.backup_schedule_calendar("weekly"), "Sun 03:20:00") self.assertEqual(server.backup_schedule_calendar("monthly"), "*-*-01 03:20:00") self.assertIsNone(server.backup_schedule_calendar("off")) with self.assertRaises(ValueError): server.backup_schedule_calendar("hourly") def test_user_records_include_active_disabled_and_ip_limits(self): with tempfile.TemporaryDirectory() as raw: tmpdir = Path(raw) server = load_server(tmpdir) server.TELEMT_CONFIG.parent.mkdir(parents=True) server.TELEMT_CONFIG.write_text( "\n".join([ "[access.users]", 'main = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', 'client = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"', "", "[access.user_max_unique_ips]", "main = 0", "client = 2", "disabled = 1", "", ]), encoding="utf-8", ) server.DISABLED_USERS_FILE.parent.mkdir(parents=True) server.DISABLED_USERS_FILE.write_text( json.dumps({"users": {"disabled": "cccccccccccccccccccccccccccccccc"}}), encoding="utf-8", ) records = server.read_user_records() self.assertTrue(records["client"]["enabled"]) self.assertEqual(records["client"]["max_unique_ips"], 2) self.assertFalse(records["disabled"]["enabled"]) self.assertEqual(records["disabled"]["max_unique_ips"], 1) self.assertEqual(records["main"]["max_unique_ips"], 0) def test_write_user_max_unique_ips_preserves_other_toml_sections(self): with tempfile.TemporaryDirectory() as raw: server = load_server(Path(raw)) server.TELEMT_CONFIG.parent.mkdir(parents=True) server.TELEMT_CONFIG.write_text( "\n".join([ "[server]", "port = 443", "", "[access.users]", 'main = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"', 'client = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"', "", "[access.user_max_unique_ips]", "client = 3", "old = 4", "", "[network]", 'dns_overrides = ["example.com:8443:127.0.0.1"]', "", ]), encoding="utf-8", ) server.write_user_max_unique_ips({"main": 1, "client": 0, "new": 5}) text = server.TELEMT_CONFIG.read_text(encoding="utf-8") self.assertIn("[server]\nport = 443", text) self.assertIn("[network]\ndns_overrides", text) self.assertIn("[access.user_max_unique_ips]", text) self.assertIn('"main" = 1', text) self.assertIn('"new" = 5', text) self.assertNotIn("client = 3", text) self.assertNotIn("old = 4", text) def test_keys_table_keeps_actions_inside_table_cell_wrapper(self): app_js = (ROOT / "admin-web" / "static" / "app.js").read_text(encoding="utf-8") styles = (ROOT / "admin-web" / "static" / "styles.css").read_text(encoding="utf-8") index = (ROOT / "admin-web" / "static" / "index.html").read_text(encoding="utf-8") self.assertNotIn('class="actions"', app_js) self.assertIn('class="action-buttons"', app_js) self.assertIn('class="key-name-button"', app_js) self.assertNotIn("td.actions", styles) self.assertRegex(index, r'[\s\S]*?') if __name__ == "__main__": unittest.main()