Files
ViewIT/addon/plugin_helpers.py

279 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""Shared helpers for ViewIt plugins.
Focus:
- Kodi addon settings access (string/bool/int)
- Optional URL notifications
- Optional URL logging
- Optional HTML response dumps
Designed to work both in Kodi and outside Kodi (for linting/tests).
"""
from __future__ import annotations
from datetime import datetime, timezone
import hashlib
import os
import re
from typing import Optional
from urllib.parse import parse_qsl, urlencode
try: # pragma: no cover - Kodi runtime
import xbmcaddon # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow importing outside Kodi
xbmcaddon = None
xbmcvfs = None
xbmcgui = None
def get_setting_string(addon_id: str, setting_id: str, *, default: str = "") -> str:
    """Read an addon setting as a stripped string.

    Uses the addon's ``getSettingString`` when it exists and falls back to
    ``getSetting`` otherwise. A falsy value is returned as ``""`` (not as
    ``default``).

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Returned when ``xbmcaddon`` is unavailable or the lookup raises.

    Returns:
        The setting text with surrounding whitespace removed, or ``default``.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        read = getattr(addon, "getSettingString", None)
        if read is None:
            # Addon object without the typed accessor: use the generic one.
            read = addon.getSetting
        value = read(setting_id)
        return str(value or "").strip()
    except Exception:
        return default
def get_setting_bool(addon_id: str, setting_id: str, *, default: bool = False) -> bool:
    """Read an addon setting as a boolean.

    Uses the addon's ``getSettingBool`` when it exists. Otherwise the raw
    ``getSetting`` value is treated as true when, after stripping and
    lower-casing, it is one of ``1``, ``true``, ``yes`` or ``on``.

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Returned when ``xbmcaddon`` is unavailable or the lookup raises.

    Returns:
        The boolean value of the setting, or ``default``.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_getter = getattr(addon, "getSettingBool", None)
        if typed_getter is None:
            text = str(addon.getSetting(setting_id)).strip().lower()
            return text in {"1", "true", "yes", "on"}
        return bool(typed_getter(setting_id))
    except Exception:
        return default
def get_setting_int(addon_id: str, setting_id: str, *, default: int = 0) -> int:
    """Read an addon setting as an integer.

    Uses the addon's ``getSettingInt`` when it exists. Otherwise the raw
    ``getSetting`` value is stripped and parsed with ``int``.

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Returned when ``xbmcaddon`` is unavailable, the lookup raises,
            or the value cannot be converted to ``int``.

    Returns:
        The integer value of the setting, or ``default``.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_getter = getattr(addon, "getSettingInt", None)
        if typed_getter is None:
            text = str(addon.getSetting(setting_id)).strip()
            return int(text)
        return int(typed_getter(setting_id))
    except Exception:
        return default
def _is_logging_enabled(addon_id: str, *, global_setting_id: str, plugin_setting_id: Optional[str]) -> bool:
    """Tell whether a feature is switched on globally and, if given, per plugin.

    The global switch must be on. When ``plugin_setting_id`` is truthy, that
    setting must be on as well; otherwise the global switch alone decides.
    """
    global_on = get_setting_bool(addon_id, global_setting_id, default=False)
    if global_on and plugin_setting_id:
        # The plugin-specific switch is only consulted once the global one is on.
        return get_setting_bool(addon_id, plugin_setting_id, default=False)
    return global_on
def notify_url(
    addon_id: str,
    *,
    heading: str,
    url: str,
    enabled_setting_id: str,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Show ``url`` in a Kodi info notification when the feature is enabled.

    Does nothing when ``xbmcgui`` is unavailable or when the global/plugin
    switches (see ``_is_logging_enabled``) are off. Errors raised while
    showing the notification are ignored.

    Args:
        addon_id: Addon whose settings are consulted.
        heading: Notification heading.
        url: Text shown as the notification message.
        enabled_setting_id: Id of the global on/off setting.
        plugin_setting_id: Optional id of an additional per-plugin setting.
    """
    if xbmcgui is None:
        return
    enabled = _is_logging_enabled(
        addon_id,
        global_setting_id=enabled_setting_id,
        plugin_setting_id=plugin_setting_id,
    )
    if enabled:
        try:
            dialog = xbmcgui.Dialog()
            dialog.notification(heading, url, xbmcgui.NOTIFICATION_INFO, 3000)
        except Exception:
            # Best effort only: a failed notification must not disturb the caller.
            pass
def _profile_logs_dir(addon_id: str) -> Optional[str]:
    """Return the ``logs`` directory inside the addon profile, creating it if needed.

    Returns:
        The directory path, or ``None`` when the Kodi modules are unavailable
        or any step of the lookup/creation raises.
    """
    kodi_available = xbmcaddon is not None and xbmcvfs is not None
    if not kodi_available:
        return None
    try:
        profile_uri = xbmcaddon.Addon(addon_id).getAddonInfo("profile")
        logs_path = os.path.join(xbmcvfs.translatePath(profile_uri), "logs")
        if not xbmcvfs.exists(logs_path):
            xbmcvfs.mkdirs(logs_path)
    except Exception:
        return None
    return logs_path
def _append_text_file(path: str, content: str) -> None:
    """Append ``content`` to the file at ``path``, never raising.

    The regular ``open`` call (UTF-8, append mode) is tried first. If that
    fails and ``xbmcvfs`` is available, the write is retried through
    ``xbmcvfs.File``. Every failure is swallowed: this is a best-effort
    diagnostics helper.

    Args:
        path: Target file path.
        content: Text to append.
    """
    try:
        with open(path, "a", encoding="utf-8") as handle:
            handle.write(content)
        return
    except Exception:
        # Fall through to the Kodi VFS attempt below.
        pass
    if xbmcvfs is None:
        return
    try:
        # NOTE(review): confirm that xbmcvfs.File honours mode "a"; Kodi's
        # documentation only describes "w" for writing.
        handle = xbmcvfs.File(path, "a")
        try:
            handle.write(content)
        finally:
            # Always release the handle, even when the write itself fails.
            handle.close()
    except Exception:
        return
def _rotate_log_file(path: str, *, max_bytes: int, max_files: int) -> None:
    """Rotate ``path`` once it has grown beyond ``max_bytes``.

    Existing backups are shifted up by one (``path.1`` -> ``path.2`` ...),
    then the live file becomes ``path.1``. Backups are numbered
    ``1..max_files``; the oldest one is overwritten by the shift. Nothing
    happens when either limit is not positive, when the file is missing, or
    when it is still within the size limit. File-system errors are swallowed.

    Args:
        path: Path of the live log file.
        max_bytes: Size threshold; rotation happens only above it.
        max_files: Highest backup index to keep.
    """
    if max_bytes <= 0 or max_files <= 0:
        return
    try:
        # getsize raises for a missing file, which also means "nothing to rotate".
        if os.path.getsize(path) <= max_bytes:
            return
    except (OSError, ValueError):
        return
    try:
        # Walk from the highest index down so no backup is clobbered before
        # it has been moved; os.replace overwrites the oldest backup.
        for index in range(max_files - 1, 0, -1):
            older = f"{path}.{index}"
            if os.path.exists(older):
                os.replace(older, f"{path}.{index + 1}")
        os.replace(path, f"{path}.1")
    except (OSError, ValueError):
        return
def _prune_dump_files(directory: str, *, prefix: str, max_files: int) -> None:
    """Delete the oldest HTML dumps so that at most ``max_files`` remain.

    Only files in ``directory`` whose name starts with ``prefix`` and ends
    with ``.html`` are considered; age is taken from the modification time.
    Nothing happens for an empty ``directory`` or a non-positive
    ``max_files``. File-system errors are swallowed per file, so one file
    that vanished or cannot be removed does not stop the others from being
    pruned.

    Args:
        directory: Directory holding the dump files.
        prefix: File-name prefix selecting the dumps to manage.
        max_files: Number of newest dumps to keep.
    """
    if not directory or max_files <= 0:
        return
    try:
        names = os.listdir(directory)
    except (OSError, ValueError):
        return
    candidates = [
        os.path.join(directory, name)
        for name in names
        if name.startswith(prefix) and name.endswith(".html")
    ]
    if len(candidates) <= max_files:
        return
    dated = []
    for path in candidates:
        try:
            dated.append((os.path.getmtime(path), path))
        except OSError:
            # The file disappeared since listdir; it no longer counts.
            continue
    excess = len(dated) - max_files
    if excess <= 0:
        return
    # Sort on the timestamp only so that ties keep their listing order.
    dated.sort(key=lambda item: item[0])
    for _, path in dated[:excess]:
        try:
            os.remove(path)
        except OSError:
            pass
def log_url(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    url: str,
    kind: str = "VISIT",
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_mb_setting_id: str = "log_max_mb",
    max_files_setting_id: str = "log_max_files",
) -> None:
    """Append one tab-separated entry for ``url`` to a log file.

    The entry has the form ``<UTC timestamp>\\t<kind>[\\t<request_id>]\\t<url>``
    followed by a newline. Nothing is written unless the global switch (and
    the plugin switch, when given) is on. The file lives in the addon
    profile's ``logs`` directory, or next to this module when that directory
    is unavailable. Before writing, the file is rotated according to the size
    and count settings.

    Args:
        addon_id: Addon whose settings and profile are used.
        enabled_setting_id: Id of the global on/off setting.
        log_filename: Name of the log file.
        url: Text logged in the last column.
        kind: Entry type written in the second column.
        request_id: Optional correlation id, written as an extra column.
        plugin_setting_id: Optional id of an additional per-plugin setting.
        max_mb_setting_id: Id of the setting holding the size limit in MiB
            (defaults to 5 when unreadable).
        max_files_setting_id: Id of the setting holding the backup count
            (defaults to 3 when unreadable).
    """
    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop the tzinfo so isoformat() yields the same text as before
    # (no "+00:00" suffix) ahead of the explicit "Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    timestamp = now_utc.isoformat(timespec="seconds") + "Z"
    request_part = f"\t{request_id}" if request_id else ""
    line = f"{timestamp}\t{kind}{request_part}\t{url}\n"
    log_dir = _profile_logs_dir(addon_id)
    path = os.path.join(log_dir, log_filename) if log_dir else os.path.join(os.path.dirname(__file__), log_filename)
    max_mb = get_setting_int(addon_id, max_mb_setting_id, default=5)
    max_files = get_setting_int(addon_id, max_files_setting_id, default=3)
    _rotate_log_file(path, max_bytes=max_mb * 1024 * 1024, max_files=max_files)
    _append_text_file(path, line)
def log_error(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    message: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Log ``message`` as an ``ERROR`` entry via ``log_url``.

    Args:
        addon_id: Addon whose settings and profile are used.
        enabled_setting_id: Id of the global on/off setting.
        log_filename: Name of the log file.
        message: Text stored where ``log_url`` stores the URL.
        request_id: Optional correlation id.
        plugin_setting_id: Optional id of an additional per-plugin setting.
    """
    log_url(
        addon_id,
        kind="ERROR",
        url=message,
        request_id=request_id,
        log_filename=log_filename,
        enabled_setting_id=enabled_setting_id,
        plugin_setting_id=plugin_setting_id,
    )
def dump_response_html(
    addon_id: str,
    *,
    enabled_setting_id: str,
    url: str,
    body: str,
    filename_prefix: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_files_setting_id: str = "dump_max_files",
) -> None:
    """Write a response body to its own HTML dump file.

    The file is named ``<filename_prefix>_<UTC timestamp>_<md5 of url>.html``
    and starts with an HTML comment holding the URL (and request id, when
    given). Nothing is written unless the global switch (and the plugin
    switch, when given) is on. Dumps go to the addon profile's ``logs``
    directory, or next to this module when that directory is unavailable.
    In the profile directory, old dumps with the same prefix are pruned down
    to the configured maximum.

    Args:
        addon_id: Addon whose settings and profile are used.
        enabled_setting_id: Id of the global on/off setting.
        url: URL the body was fetched from.
        body: Response text; a falsy value is written as empty.
        filename_prefix: Prefix of the dump file name, also used for pruning.
        request_id: Optional correlation id added to the header comment.
        plugin_setting_id: Optional id of an additional per-plugin setting.
        max_files_setting_id: Id of the setting holding the number of dumps
            to keep (defaults to 200 when unreadable).
    """
    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return
    # datetime.utcnow() is deprecated since Python 3.12; strftime output of
    # the aware equivalent is identical for this format.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()  # nosec - filename only
    filename = f"{filename_prefix}_{timestamp}_{digest}.html"
    log_dir = _profile_logs_dir(addon_id)
    path = os.path.join(log_dir or os.path.dirname(__file__), filename)
    request_line = f" request_id={request_id}" if request_id else ""
    content = f"<!-- {url}{request_line} -->\n{body or ''}"
    _append_text_file(path, content)
    if log_dir:
        # Prune after writing so the new dump counts towards the limit;
        # pruning first left max_files + 1 dumps on disk.
        max_files = get_setting_int(addon_id, max_files_setting_id, default=200)
        _prune_dump_files(log_dir, prefix=filename_prefix, max_files=max_files)
def normalize_resolved_stream_url(final_url: str, *, source_url: str = "") -> str:
    """Normalize hoster-specific headers in the final stream link.

    `final_url` may carry a Kodi header suffix: `url|Key=Value&...`.
    Only known problem cases are adjusted; everything else is returned
    unchanged (apart from stripping surrounding whitespace).

    Args:
        final_url: Resolved stream URL, optionally with a header suffix.
        source_url: URL of the hoster page the stream was resolved from.

    Returns:
        The normalized URL, or ``""`` when `final_url` is empty or blank.
    """
    stripped = (final_url or "").strip()
    if stripped:
        return _normalize_supervideo_serversicuro(stripped, source_url=source_url)
    return ""
def _normalize_supervideo_serversicuro(final_url: str, *, source_url: str = "") -> str:
    """Set the Referer of a serversicuro HLS link to its SuperVideo embed page.

    Applies only when `final_url` contains ``serversicuro.cc/hls/`` (compared
    case-insensitively) and has a ``|`` header suffix, and when a video code
    can be extracted from a ``supervideo.tv`` / ``supervideo.cc`` `source_url`.
    In that case the header suffix is re-encoded with
    ``Referer=https://supervideo.cc/e/<code>``; an existing Referer header is
    replaced regardless of how it is capitalized. In every other case
    `final_url` is returned unchanged.

    Args:
        final_url: Stream URL, possibly followed by ``|Key=Value&...``.
        source_url: Hoster page URL the video code is taken from.

    Returns:
        The adjusted URL, or `final_url` itself when the rule does not apply.
    """
    if "serversicuro.cc/hls/" not in final_url.casefold() or "|" not in final_url:
        return final_url
    code_match = re.search(
        # Optional "e/" embed prefix, then the alphanumeric video code; a
        # trailing ".html" is tolerated but not captured.
        r"supervideo\.(?:tv|cc)/(?:e/)?([a-z0-9]+)(?:\.html)?",
        (source_url or "").strip(),
        flags=re.IGNORECASE,
    )
    if not code_match:
        return final_url
    code = code_match.group(1)
    media_url, header_suffix = final_url.split("|", 1)
    # Drop any existing Referer (header names are case-insensitive) so the
    # suffix never carries two of them.
    headers = {
        name: value
        for name, value in parse_qsl(header_suffix, keep_blank_values=True)
        if name.casefold() != "referer"
    }
    headers["Referer"] = f"https://supervideo.cc/e/{code}"
    return f"{media_url}|{urlencode(headers)}"