"""Helper utilities for a Kodi add-on: settings, dialogs, list items, parsing.

The Kodi modules (``xbmc``, ``xbmcaddon``, ``xbmcgui``, ``xbmcplugin``) are
optional at import time. When they cannot be imported the names are bound to
``None`` and the helpers below fall back to defaults or do nothing.
"""

from __future__ import annotations

import contextlib
import re
import sys
from contextlib import contextmanager
from typing import Any, Callable, Generator, Optional
from urllib.parse import urlencode

try:
    import xbmc
    import xbmcaddon
    import xbmcgui
    import xbmcplugin
except ImportError:
    xbmc = None
    xbmcaddon = None
    xbmcgui = None
    xbmcplugin = None

# Lazily created add-on object, shared by all settings helpers.
_ADDON_INSTANCE = None


def get_addon():
    """Return the cached ``xbmcaddon.Addon`` instance, creating it on first use.

    Returns ``None`` when ``xbmcaddon`` could not be imported.
    """
    global _ADDON_INSTANCE
    if xbmcaddon is None:
        return None
    if _ADDON_INSTANCE is None:
        _ADDON_INSTANCE = xbmcaddon.Addon()
    return _ADDON_INSTANCE


def get_handle() -> int:
    """Return the handle passed as ``sys.argv[1]``, or -1 if unavailable.

    A missing or non-numeric argument yields -1 instead of raising, so the
    module can also be used when the process was not started with a numeric
    first argument.
    """
    if len(sys.argv) <= 1:
        return -1
    try:
        return int(sys.argv[1])
    except (TypeError, ValueError):
        return -1


def get_setting_string(setting_id: str) -> str:
    """Read a setting as a string; return ``""`` if it cannot be read.

    ``getSettingString`` is tried first, then ``getSetting``. Any exception
    raised by an accessor makes the function fall through to the next one.
    """
    addon = get_addon()
    if addon is None:
        return ""
    for method_name in ("getSettingString", "getSetting"):
        getter = getattr(addon, method_name, None)
        if not callable(getter):
            continue
        try:
            return str(getter(setting_id) or "")
        except Exception:
            continue
    return ""


def get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Read a setting as a bool, returning *default* when it is unset.

    An empty raw value (or a failing raw read) counts as "not set". Otherwise
    ``getSettingBool`` is preferred, with manual parsing of the raw value
    (``"true"``, case-insensitive) as the fallback.
    """
    addon = get_addon()
    if addon is None:
        return default

    # Step 1: an empty raw value means the setting was never set.
    raw_getter = getattr(addon, "getSetting", None)
    if callable(raw_getter):
        try:
            if not str(raw_getter(setting_id) or "").strip():
                return default
        except Exception:
            return default

    # Step 2: prefer the typed accessor for the conversion.
    typed_getter = getattr(addon, "getSettingBool", None)
    if callable(typed_getter):
        try:
            return bool(typed_getter(setting_id))
        except Exception:
            pass

    # Step 3: fall back to parsing the raw value by hand.
    if callable(raw_getter):
        try:
            return str(raw_getter(setting_id) or "").strip().lower() == "true"
        except Exception:
            pass
    return default


def get_setting_int(setting_id: str, *, default: int = 0) -> int:
    """Read a setting as an int, returning *default* when unset or unreadable.

    ``getSettingInt`` is used when available (after checking that the raw
    value is not empty); otherwise the raw ``getSetting`` value is parsed.
    """
    addon = get_addon()
    if addon is None:
        return default

    typed_getter = getattr(addon, "getSettingInt", None)
    if callable(typed_getter):
        try:
            raw_getter = getattr(addon, "getSetting", None)
            if callable(raw_getter):
                # Empty raw value: treat as "not set" rather than as 0.
                if not str(raw_getter(setting_id) or "").strip():
                    return default
            return int(typed_getter(setting_id))
        except Exception:
            pass

    raw_getter = getattr(addon, "getSetting", None)
    if callable(raw_getter):
        try:
            raw = str(raw_getter(setting_id) or "").strip()
            return int(raw) if raw else default
        except Exception:
            pass
    return default


def set_setting_string(setting_id: str, value: str) -> None:
    """Store *value* (converted with ``str``) under *setting_id*, best effort.

    ``setSettingString`` is tried first; ``setSetting`` is used when the
    former is missing or raises. Failures are ignored.
    """
    addon = get_addon()
    if addon is None:
        return
    for method_name in ("setSettingString", "setSetting"):
        setter = getattr(addon, method_name, None)
        if not callable(setter):
            continue
        try:
            setter(setting_id, str(value))
            return
        except Exception:
            continue


@contextmanager
def progress_dialog(
    heading: str, message: str = ""
) -> Generator[Callable[..., bool], None, None]:
    """Show a Kodi progress dialog and yield an update function.

    The yielded ``update(percent, msg="")`` returns ``True`` when the dialog
    reports cancellation and ``False`` otherwise (including when no dialog
    could be created). The dialog is closed when the context exits.
    """
    dialog = None
    try:
        if xbmcgui is not None and hasattr(xbmcgui, "DialogProgress"):
            dialog = xbmcgui.DialogProgress()
            dialog.create(heading, message)
    except Exception:
        dialog = None

    def _update_fn(percent: int, msg: str = "") -> bool:
        if dialog is not None:
            try:
                dialog.update(percent, msg or message)
                return dialog.iscanceled()
            except Exception:
                pass
        return False

    try:
        yield _update_fn
    finally:
        if dialog is not None:
            try:
                dialog.close()
            except Exception:
                pass


@contextmanager
def busy_dialog(
    message: str = "Bitte warten...", *, heading: str = "Bitte warten"
) -> Generator[Callable[..., bool], None, None]:
    """Progress dialog used in place of a spinner, with a short status text.

    Yields ``update(step_message, percent=None)``. ``percent`` is clamped to
    the range 5..95 and defaults to 50; the return value is that of the
    underlying progress update (``True`` when cancelled).
    """
    with progress_dialog(heading, message) as progress:
        progress(10, message)

        def _update(step_message: str, percent: int | None = None) -> bool:
            pct = 50 if percent is None else max(5, min(95, int(percent)))
            return progress(pct, step_message or message)

        try:
            yield _update
        finally:
            progress(100, "Fertig")


def run_with_progress(heading: str, message: str, loader: Callable[[], Any]) -> Any:
    """Run *loader* while a progress dialog is visible and return its result."""
    with progress_dialog(heading, message) as progress:
        progress(10, message)
        result = loader()
        progress(100, "Fertig")
        return result


def set_content(handle: int, content: str) -> None:
    """Hint Kodi about the content type so skins can show watched/resume overlays.

    Blank *content* is ignored; errors from the Kodi call are swallowed.
    """
    content = (content or "").strip()
    if not content:
        return
    try:
        setter = getattr(xbmcplugin, "setContent", None)
        if callable(setter):
            setter(handle, content)
    except Exception:
        pass


def add_directory_item(
    handle: int,
    label: str,
    action: str,
    params: dict[str, str] | None = None,
    *,
    is_folder: bool = True,
    info_labels: dict[str, Any] | None = None,
    art: dict[str, str] | None = None,
    cast: Any = None,
    base_url: str = "",
) -> None:
    """Add one entry to the Kodi directory listing.

    The item URL is ``base_url`` plus a query string built from ``action``
    and *params*. Non-folder items are marked ``IsPlayable``. Does nothing
    when the Kodi GUI/plugin modules are unavailable, in line with the other
    helpers in this module.
    """
    if xbmcgui is None or xbmcplugin is None:
        return

    query: dict[str, str] = {"action": action}
    if params:
        query.update(params)
    url = f"{base_url}?{urlencode(query)}"

    item = xbmcgui.ListItem(label=label)
    if not is_folder:
        try:
            item.setProperty("IsPlayable", "true")
        except Exception:
            pass

    apply_video_info(item, info_labels, cast)

    if art:
        setter = getattr(item, "setArt", None)
        if callable(setter):
            try:
                setter(art)
            except Exception:
                pass

    xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)


def apply_video_info(item, info_labels: dict[str, Any] | None, cast: Any = None) -> None:
    """Set video metadata on *item* via its video info tag, with a fallback.

    When ``item.getVideoInfoTag()`` yields a tag, metadata and cast are
    written through the tag setters. Otherwise ``item.setInfo("video", ...)``
    and ``item.setCast(...)`` are used. Metadata and cast are applied
    independently, so a failure in one does not discard the other.
    """
    if not info_labels and not cast:
        return
    info_labels = dict(info_labels or {})

    tag = None
    get_tag = getattr(item, "getVideoInfoTag", None)
    if callable(get_tag):
        try:
            tag = get_tag()
        except Exception:
            tag = None

    if tag is not None:
        try:
            _apply_tag_info(tag, info_labels)
        except Exception:
            pass
        if cast:
            try:
                _apply_tag_cast(tag, cast)
            except Exception:
                pass
        return

    # Fallback for items that do not provide a video info tag.
    setter = getattr(item, "setInfo", None)
    if callable(setter):
        try:
            setter("video", info_labels)
        except Exception:
            pass
    if cast:
        setter = getattr(item, "setCast", None)
        if callable(setter):
            try:
                setter(cast)
            except Exception:
                pass


def _set_tag_value(tag, method: str, convert: Callable[[Any], Any], value: Any) -> None:
    """Call ``tag.<method>(convert(value))`` if it exists, ignoring failures."""
    setter = getattr(tag, method, None)
    if not callable(setter):
        return
    try:
        setter(convert(value))
    except Exception:
        # Best effort: one bad value must not block the remaining fields.
        pass


def _apply_tag_info(tag, info: dict[str, Any]) -> None:
    """Copy the supported entries of *info* onto the video info *tag*.

    Text fields are skipped when falsy; season, episode and rating are
    skipped when they are ``None``, empty or zero. Each field is set
    independently of the others.
    """
    text_fields = (
        ("title", "setTitle"),
        ("plot", "setPlot"),
        ("mediatype", "setMediaType"),
        ("tvshowtitle", "setTvShowTitle"),
    )
    for key, method in text_fields:
        val = info.get(key)
        if val:
            _set_tag_value(tag, method, str, val)

    for key, method in (("season", "setSeason"), ("episode", "setEpisode")):
        val = info.get(key)
        if val not in (None, "", 0, "0"):
            _set_tag_value(tag, method, int, val)

    rating = info.get("rating")
    if rating not in (None, "", 0, "0"):
        _set_tag_value(tag, "setRating", float, rating)


def _cast_field(member: Any, key: str) -> str:
    """Return *key* from a cast member given as an object or a mapping.

    The attribute is tried first; if it is missing or empty and the member
    has a ``get`` method, ``member.get(key)`` is used. Missing values give
    ``""``.
    """
    value = getattr(member, key, "")
    if not value and hasattr(member, "get"):
        value = member.get(key, "")
    return str(value) if value else ""


def _apply_tag_cast(tag, cast) -> None:
    """Convert *cast* to ``xbmcgui.Actor`` objects and pass them to ``tag.setCast``.

    Members may be objects with ``name``/``role``/``thumbnail`` attributes or
    mappings with those keys; members without a name are skipped. Any error
    (including a missing ``xbmcgui.Actor``) is swallowed.
    """
    setter = getattr(tag, "setCast", None)
    if not callable(setter):
        return
    try:
        actors = []
        for member in cast:
            name = _cast_field(member, "name")
            if not name:
                continue
            actors.append(
                xbmcgui.Actor(
                    name=name,
                    role=_cast_field(member, "role"),
                    thumbnail=_cast_field(member, "thumbnail"),
                )
            )
        if actors:
            setter(actors)
    except Exception:
        pass


def label_with_duration(label: str, info_labels: dict[str, Any]) -> str:
    """Append the runtime in whole minutes to *label* when it is known.

    ``info_labels["duration"]`` is divided by 60; the suffix is added only if
    the result is positive. Missing, zero or non-numeric durations leave
    *label* unchanged.
    """
    duration = (info_labels or {}).get("duration")
    if not duration:
        return label
    try:
        minutes = int(duration) // 60
    except Exception:
        return label
    if minutes > 0:
        return f"{label} ({minutes} Min.)"
    return label


def extract_first_int(value: str | int | None) -> Optional[int]:
    """Return the first run of digits in *value* as an int, or ``None``.

    ``int`` inputs are returned unchanged; ``None`` yields ``None``.
    """
    if value is None:
        return None
    if isinstance(value, int):
        return value
    match = re.search(r"\d+", str(value))
    return int(match.group()) if match else None


def looks_like_unresolved_hoster_link(url: str) -> bool:
    """Return True if *url* contains "hoster", "link" or "resolve" (case-insensitive)."""
    folded = (url or "").strip().casefold()
    return any(marker in folded for marker in ("hoster", "link", "resolve"))


def is_resolveurl_missing_error(err: str | None) -> bool:
    """Return True if *err* mentions "resolveurl" together with "missing" or "not found"."""
    text = str(err or "").strip().lower()
    return "resolveurl" in text and ("missing" in text or "not found" in text)


def is_cloudflare_challenge_error(err: str | None) -> bool:
    """Return True if *err* mentions "cloudflare" or "challenge" (case-insensitive)."""
    text = str(err or "").strip().lower()
    return "cloudflare" in text or "challenge" in text


def resolveurl_last_error() -> str:
    """Return ``resolveurl_backend.get_last_error()`` as a string.

    Returns ``""`` when the backend cannot be imported or the call fails.
    """
    try:
        from resolveurl_backend import get_last_error  # type: ignore
    except Exception:
        return ""
    try:
        return str(get_last_error() or "")
    except Exception:
        return ""