v2.5.0: show routed services behind port 443

This commit is contained in:
Виталий Литвинов
2026-04-25 13:25:32 +03:00
parent 5225811b3c
commit 7eaeef8b49
3 changed files with 106 additions and 13 deletions

View File

@@ -376,7 +376,7 @@ def parse_ss_listeners(output: str, proto: str, port: int = 443) -> list[dict[st
return listeners
def port_443_status() -> dict[str, Any]:
def collect_port_listeners(port: int) -> tuple[list[dict[str, Any]], list[str]]:
listeners: list[dict[str, Any]] = []
errors: list[str] = []
for proto, args in {
@@ -385,14 +385,75 @@ def port_443_status() -> dict[str, Any]:
}.items():
code, stdout, stderr = run(args, timeout=2)
if code == 0:
listeners.extend(parse_ss_listeners(stdout, proto, 443))
listeners.extend(parse_ss_listeners(stdout, proto, port))
elif stderr.strip():
errors.append(stderr.strip())
listeners.sort(key=lambda item: (item["proto"], item["address"], item["process"]))
return listeners, errors
def read_telemt_edge_settings() -> dict[str, Any]:
settings: dict[str, Any] = {"tls_domain": "", "mask_port": 0, "dns_overrides": []}
if not TELEMT_CONFIG.exists():
return settings
section = ""
for raw in TELEMT_CONFIG.read_text(encoding="utf-8", errors="ignore").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and line.endswith("]"):
section = line.strip("[]")
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().split("#", 1)[0].strip()
if section == "censorship" and key == "tls_domain":
settings["tls_domain"] = value.strip('"').strip("'")
elif section == "censorship" and key == "mask_port":
try:
settings["mask_port"] = int(value)
except ValueError:
settings["mask_port"] = 0
elif section == "network" and key == "dns_overrides":
settings["dns_overrides"] = re.findall(r'"([^"]+)"', value)
return settings
def routed_behind_443() -> list[dict[str, Any]]:
config = load_json(GOTELEGRAM_CONFIG, {}) or {}
mode = str(config.get("mode") or "")
domain = str(config.get("domain") or "")
settings = read_telemt_edge_settings()
mask_port = int(settings.get("mask_port") or 0)
tls_domain = str(settings.get("tls_domain") or domain)
routes: list[dict[str, Any]] = []
if mode == "pro" and domain and mask_port and mask_port != 443:
internal, _ = collect_port_listeners(mask_port)
site_listener = next((item for item in internal if item.get("role") == "site"), None)
routes.append({
"role": "site",
"proto": "HTTPS",
"public": f"{domain}:443",
"target": f"127.0.0.1:{mask_port}",
"process": (site_listener or {}).get("process") or "nginx",
"pid": (site_listener or {}).get("pid") or "",
"status": service_status("nginx"),
"via": "telemt dns_overrides",
"tls_domain": tls_domain,
"details": settings.get("dns_overrides") or [],
})
return routes
def port_443_status() -> dict[str, Any]:
listeners, errors = collect_port_listeners(443)
return {
"checked_at": int(time.time()),
"configured_port": read_telemt_port(),
"listeners": listeners,
"routes": routed_behind_443(),
"ok": not errors,
"error": "; ".join(errors[:2]),
}