#!/usr/bin/env python3 """Shared helpers for ViewIt plugins. Focus: - Kodi addon settings access (string/bool) - Optional URL notifications - Optional URL logging - Optional HTML response dumps Designed to work both in Kodi and outside Kodi (for linting/tests). """ from __future__ import annotations from datetime import datetime import hashlib import os import re from typing import Optional from urllib.parse import parse_qsl, urlencode try: # pragma: no cover - Kodi runtime import xbmcaddon # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow importing outside Kodi xbmcaddon = None xbmcvfs = None xbmcgui = None def get_setting_string(addon_id: str, setting_id: str, *, default: str = "") -> str: if xbmcaddon is None: return default try: addon = xbmcaddon.Addon(addon_id) getter = getattr(addon, "getSettingString", None) if getter is not None: return str(getter(setting_id) or "").strip() return str(addon.getSetting(setting_id) or "").strip() except Exception: return default def get_setting_bool(addon_id: str, setting_id: str, *, default: bool = False) -> bool: if xbmcaddon is None: return default try: addon = xbmcaddon.Addon(addon_id) getter = getattr(addon, "getSettingBool", None) if getter is not None: return bool(getter(setting_id)) raw = addon.getSetting(setting_id) return str(raw).strip().lower() in {"1", "true", "yes", "on"} except Exception: return default def get_setting_int(addon_id: str, setting_id: str, *, default: int = 0) -> int: if xbmcaddon is None: return default try: addon = xbmcaddon.Addon(addon_id) getter = getattr(addon, "getSettingInt", None) if getter is not None: return int(getter(setting_id)) raw = addon.getSetting(setting_id) return int(str(raw).strip()) except Exception: return default def _is_logging_enabled(addon_id: str, *, global_setting_id: str, plugin_setting_id: Optional[str]) -> bool: if not get_setting_bool(addon_id, global_setting_id, default=False): return False if plugin_setting_id: return get_setting_bool(addon_id, plugin_setting_id, default=False) return True def notify_url( addon_id: str, *, heading: str, url: str, enabled_setting_id: str, plugin_setting_id: Optional[str] = None, ) -> None: if xbmcgui is None: return if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id): return try: xbmcgui.Dialog().notification(heading, url, xbmcgui.NOTIFICATION_INFO, 3000) except Exception: return def show_notification( heading: str, message: str, *, icon: int | None = None, milliseconds: int = 3000, ) -> None: """Zeigt eine kurze Kodi-Notification an (falls `xbmcgui` verfuegbar ist).""" if xbmcgui is None: return try: icon_value = icon if icon is not None else xbmcgui.NOTIFICATION_INFO xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), icon_value, int(milliseconds)) except Exception: return def show_error(heading: str, message: str, *, milliseconds: int = 4000) -> None: """Zeigt eine einheitliche Fehlermeldung im Kodi-UI.""" if xbmcgui is None: return try: xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), xbmcgui.NOTIFICATION_ERROR, int(milliseconds)) except Exception: return def _profile_logs_dir(addon_id: str) -> Optional[str]: if xbmcaddon is None or xbmcvfs is None: return None try: addon = xbmcaddon.Addon(addon_id) profile = xbmcvfs.translatePath(addon.getAddonInfo("profile")) log_dir = os.path.join(profile, "logs") if not xbmcvfs.exists(log_dir): xbmcvfs.mkdirs(log_dir) return log_dir except Exception: return None def _append_text_file(path: str, content: str) -> None: try: with open(path, "a", encoding="utf-8") as handle: handle.write(content) return except Exception: pass if xbmcvfs is None: return try: handle = xbmcvfs.File(path, "a") handle.write(content) handle.close() except Exception: return def _rotate_log_file(path: str, *, max_bytes: int, max_files: int) -> None: if max_bytes <= 0 or max_files <= 0: return try: if not os.path.exists(path) or os.path.getsize(path) <= max_bytes: return except Exception: return try: for index in range(max_files - 1, 0, -1): older = f"{path}.{index}" newer = f"{path}.{index + 1}" if os.path.exists(older): if index + 1 > max_files: os.remove(older) else: os.replace(older, newer) os.replace(path, f"{path}.1") except Exception: return def _prune_dump_files(directory: str, *, prefix: str, max_files: int) -> None: if not directory or max_files <= 0: return try: entries = [ os.path.join(directory, name) for name in os.listdir(directory) if name.startswith(prefix) and name.endswith(".html") ] if len(entries) <= max_files: return entries.sort(key=lambda path: os.path.getmtime(path)) for path in entries[: len(entries) - max_files]: try: os.remove(path) except Exception: pass except Exception: return def log_url( addon_id: str, *, enabled_setting_id: str, log_filename: str, url: str, kind: str = "VISIT", request_id: Optional[str] = None, plugin_setting_id: Optional[str] = None, max_mb_setting_id: str = "log_max_mb", max_files_setting_id: str = "log_max_files", ) -> None: if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id): return timestamp = datetime.utcnow().isoformat(timespec="seconds") + "Z" request_part = f"\t{request_id}" if request_id else "" line = f"{timestamp}\t{kind}{request_part}\t{url}\n" log_dir = _profile_logs_dir(addon_id) path = os.path.join(log_dir, log_filename) if log_dir else os.path.join(os.path.dirname(__file__), log_filename) max_mb = get_setting_int(addon_id, max_mb_setting_id, default=5) max_files = get_setting_int(addon_id, max_files_setting_id, default=3) _rotate_log_file(path, max_bytes=max_mb * 1024 * 1024, max_files=max_files) _append_text_file(path, line) def log_error( addon_id: str, *, enabled_setting_id: str, log_filename: str, message: str, request_id: Optional[str] = None, plugin_setting_id: Optional[str] = None, ) -> None: log_url( addon_id, enabled_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id, log_filename=log_filename, url=message, kind="ERROR", request_id=request_id, ) def dump_response_html( addon_id: str, *, enabled_setting_id: str, url: str, body: str, filename_prefix: str, request_id: Optional[str] = None, plugin_setting_id: Optional[str] = None, max_files_setting_id: str = "dump_max_files", ) -> None: if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id): return timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f") digest = hashlib.md5(url.encode("utf-8")).hexdigest() # nosec - filename only filename = f"{filename_prefix}_{timestamp}_{digest}.html" log_dir = _profile_logs_dir(addon_id) path = os.path.join(log_dir, filename) if log_dir else os.path.join(os.path.dirname(__file__), filename) request_line = f" request_id={request_id}" if request_id else "" content = f"\n{body or ''}" if log_dir: max_files = get_setting_int(addon_id, max_files_setting_id, default=200) _prune_dump_files(log_dir, prefix=filename_prefix, max_files=max_files) _append_text_file(path, content) def resolve_via_resolveurl(link: str, *, fallback_to_link: bool = True) -> Optional[str]: """Versucht einen Hoster-Link mit resolveurl_backend aufzuloesen. Gibt den aufgeloesten Link zurueck, oder – wenn resolveurl nicht verfuegbar ist oder nichts liefert – den Original-Link (wenn fallback_to_link=True) bzw. None (wenn fallback_to_link=False). """ link = (link or "").strip() if not link: return None try: from resolveurl_backend import resolve as _resolve_fn # type: ignore[import-not-found] except Exception: _resolve_fn = None if callable(_resolve_fn): resolved = _resolve_fn(link) if resolved: return resolved return link if fallback_to_link else None def normalize_resolved_stream_url(final_url: str, *, source_url: str = "") -> str: """Normalisiert hoster-spezifische Header im finalen Stream-Link. `final_url` kann ein Kodi-Header-Suffix enthalten: `url|Key=Value&...`. Die Funktion passt nur bekannte Problemfaelle an und laesst sonst alles unveraendert. """ url = (final_url or "").strip() if not url: return "" normalized = _normalize_supervideo_serversicuro(url, source_url=source_url) return normalized def _normalize_supervideo_serversicuro(final_url: str, *, source_url: str = "") -> str: if "serversicuro.cc/hls/" not in final_url.casefold() or "|" not in final_url: return final_url source = (source_url or "").strip() code_match = re.search( r"supervideo\.(?:tv|cc)/(?:e/)?([a-z0-9]+)(?:\\.html)?", source, flags=re.IGNORECASE, ) if not code_match: return final_url code = (code_match.group(1) or "").strip() if not code: return final_url media_url, header_suffix = final_url.split("|", 1) headers = dict(parse_qsl(header_suffix, keep_blank_values=True)) headers["Referer"] = f"https://supervideo.cc/e/{code}" return f"{media_url}|{urlencode(headers)}"