mirror of
https://github.com/anten-ka/gotelegram_pro.git
synced 2026-05-19 14:36:05 +00:00
v2.5.0: harden telemt user reloads
This commit is contained in:
@@ -243,6 +243,49 @@ class AdminFeatureTests(unittest.TestCase):
|
||||
self.assertEqual(result.returncode, 0, result.stderr)
|
||||
self.assertEqual(result.stdout.strip(), "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
||||
|
||||
def test_telemt_users_block_detects_quoted_main_user(self):
|
||||
script = "\n".join([
|
||||
"set -e",
|
||||
f"source {shlex.quote(str(ROOT / 'lib' / 'telemt_config.sh'))}",
|
||||
"block=$'\"main\" = \"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\"\\n\"client\" = \"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\"'",
|
||||
"telemt_users_block_has_main \"$block\"",
|
||||
])
|
||||
|
||||
result = subprocess.run(
|
||||
["bash", "-lc", script],
|
||||
cwd=ROOT,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
self.assertEqual(result.returncode, 0, result.stderr)
|
||||
|
||||
def test_telemt_restart_requests_are_debounced(self):
|
||||
with tempfile.TemporaryDirectory() as raw:
|
||||
server = load_server(Path(raw))
|
||||
calls = []
|
||||
original_run = server.run
|
||||
original_status = server.service_status
|
||||
try:
|
||||
server._LAST_TELEMT_RESTART = 0.0
|
||||
server.TELEMT_RESTART_DEBOUNCE_SECONDS = 30.0
|
||||
server.service_status = lambda name: "running"
|
||||
|
||||
def fake_run(cmd, timeout=8):
|
||||
calls.append(cmd)
|
||||
return 0, "", ""
|
||||
|
||||
server.run = fake_run
|
||||
self.assertTrue(server.request_service_restart("telemt"))
|
||||
self.assertTrue(server.request_service_restart("telemt"))
|
||||
finally:
|
||||
server.run = original_run
|
||||
server.service_status = original_status
|
||||
|
||||
restart_calls = [cmd for cmd in calls if cmd[:3] == ["systemctl", "--no-block", "restart"]]
|
||||
self.assertEqual(len(restart_calls), 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user