#!/usr/bin/env python3 """ViewIt Kodi-Addon Einstiegspunkt. Dieses Modul ist der Router fuer die Kodi-Navigation: es rendert Menues, ruft Plugin-Implementierungen auf und startet die Wiedergabe. """ from __future__ import annotations import asyncio import atexit from contextlib import contextmanager from datetime import datetime import importlib.util import inspect import json import io import os import re import sys import threading import time import xml.etree.ElementTree as ET import zipfile from pathlib import Path from types import ModuleType from urllib.parse import parse_qs, urlencode, urlparse from urllib.error import URLError from urllib.request import Request, urlopen def _ensure_windows_selector_policy() -> None: """Erzwingt unter Windows einen Selector-Loop (thread-kompatibel in Kodi).""" if not sys.platform.startswith("win"): return try: current = asyncio.get_event_loop_policy() if current.__class__.__name__ == "WindowsSelectorEventLoopPolicy": return asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) except Exception: # Fallback: Wenn die Policy nicht verfügbar ist, arbeitet der Code mit Default-Policy weiter. return try: # pragma: no cover - Kodi runtime import xbmc # type: ignore[import-not-found] import xbmcaddon # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] import xbmcplugin # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow importing outside Kodi (e.g. linting) xbmc = None xbmcaddon = None xbmcgui = None xbmcplugin = None xbmcvfs = None class _XbmcStub: LOGDEBUG = 0 LOGINFO = 1 LOGWARNING = 2 @staticmethod def log(message: str, level: int = 1) -> None: print(f"[KodiStub:{level}] {message}") class Player: def play(self, item: str, listitem: object | None = None) -> None: print(f"[KodiStub] play: {item}") class _XbmcGuiStub: INPUT_ALPHANUM = 0 NOTIFICATION_INFO = 0 class Dialog: def input(self, heading: str, type: int = 0) -> str: raise RuntimeError("xbmcgui ist nicht verfuegbar (KodiStub).") def select(self, heading: str, options: list[str]) -> int: raise RuntimeError("xbmcgui ist nicht verfuegbar (KodiStub).") def notification(self, heading: str, message: str, icon: int = 0, time: int = 0) -> None: print(f"[KodiStub] notification: {heading}: {message}") class ListItem: def __init__(self, label: str = "", path: str = "") -> None: self._label = label self._path = path def setInfo(self, type: str, infoLabels: dict[str, str]) -> None: return class _XbmcPluginStub: @staticmethod def addDirectoryItem(*, handle: int, url: str, listitem: object, isFolder: bool) -> None: print(f"[KodiStub] addDirectoryItem: {url}") @staticmethod def endOfDirectory(handle: int) -> None: print(f"[KodiStub] endOfDirectory: {handle}") @staticmethod def setPluginCategory(handle: int, category: str) -> None: print(f"[KodiStub] category: {category}") xbmc = _XbmcStub() xbmcgui = _XbmcGuiStub() xbmcplugin = _XbmcPluginStub() from plugin_interface import BasisPlugin from http_session_pool import close_all_sessions from plugin_helpers import normalize_resolved_stream_url from metadata_utils import ( collect_plugin_metadata as _collect_plugin_metadata, merge_metadata as _merge_metadata, metadata_policy as _metadata_policy_impl, needs_tmdb as _needs_tmdb, ) from tmdb import TmdbCastMember, fetch_tv_episode_credits, lookup_movie, lookup_tv_season, lookup_tv_season_summary, lookup_tv_show PLUGIN_DIR = Path(__file__).with_name("plugins") _PLUGIN_CACHE: dict[str, BasisPlugin] | None = None _TMDB_CACHE: dict[str, tuple[dict[str, str], dict[str, str]]] = {} _TMDB_CAST_CACHE: dict[str, list[TmdbCastMember]] = {} _TMDB_ID_CACHE: dict[str, int] = {} _TMDB_SEASON_CACHE: dict[tuple[int, int, str, str], dict[int, tuple[dict[str, str], dict[str, str]]]] = {} _TMDB_SEASON_SUMMARY_CACHE: dict[tuple[int, int, str, str], tuple[dict[str, str], dict[str, str]]] = {} _TMDB_EPISODE_CAST_CACHE: dict[tuple[int, int, int, str], list[TmdbCastMember]] = {} _TMDB_LOG_PATH: str | None = None _GENRE_TITLES_CACHE: dict[tuple[str, str], list[str]] = {} _ADDON_INSTANCE = None _PLAYSTATE_CACHE: dict[str, dict[str, object]] | None = None _PLAYSTATE_LOCK = threading.RLock() _TMDB_LOCK = threading.RLock() WATCHED_THRESHOLD = 0.9 POPULAR_MENU_LABEL = "Haeufig gesehen" LATEST_MENU_LABEL = "Neuste Titel" LIST_PAGE_SIZE = 20 atexit.register(close_all_sessions) def _tmdb_cache_get(cache: dict, key, default=None): with _TMDB_LOCK: return cache.get(key, default) def _tmdb_cache_set(cache: dict, key, value) -> None: with _TMDB_LOCK: cache[key] = value def _tmdb_prefetch_concurrency() -> int: """Max number of concurrent TMDB lookups when prefetching metadata for lists.""" try: raw = _get_setting_string("tmdb_prefetch_concurrency").strip() value = int(raw) if raw else 6 except Exception: value = 6 return max(1, min(20, value)) def _tmdb_enabled() -> bool: return _get_setting_bool("tmdb_enabled", default=True) def _log(message: str, level: int = xbmc.LOGINFO) -> None: xbmc.log(f"[ViewIt] {message}", level) def _busy_open() -> None: try: # pragma: no cover - Kodi runtime if xbmc is not None and hasattr(xbmc, "executebuiltin"): xbmc.executebuiltin("ActivateWindow(busydialognocancel)") except Exception: pass def _busy_close() -> None: try: # pragma: no cover - Kodi runtime if xbmc is not None and hasattr(xbmc, "executebuiltin"): xbmc.executebuiltin("Dialog.Close(busydialognocancel)") xbmc.executebuiltin("Dialog.Close(busydialog)") except Exception: pass @contextmanager def _busy_dialog(message: str = "Bitte warten...", *, heading: str = "Bitte warten"): """Progress-Dialog statt Spinner, mit kurzem Status-Text.""" with _progress_dialog(heading, message) as progress: progress(10, message) def _update(step_message: str, percent: int | None = None) -> bool: pct = 50 if percent is None else max(5, min(95, int(percent))) return progress(pct, step_message or message) try: yield _update finally: progress(100, "Fertig") @contextmanager def _progress_dialog(heading: str, message: str = ""): """Zeigt einen Fortschrittsdialog in Kodi und liefert eine Update-Funktion.""" dialog = None try: # pragma: no cover - Kodi runtime if xbmcgui is not None and hasattr(xbmcgui, "DialogProgress"): dialog = xbmcgui.DialogProgress() dialog.create(heading, message) except Exception: dialog = None def _update(percent: int, text: str = "") -> bool: if dialog is None: return False percent = max(0, min(100, int(percent))) try: # Kodi Matrix/Nexus dialog.update(percent, text) except TypeError: try: # Kodi Leia fallback dialog.update(percent, text, "", "") except Exception: pass except Exception: pass try: return bool(dialog.iscanceled()) except Exception: return False try: yield _update finally: if dialog is not None: try: dialog.close() except Exception: pass def _run_with_progress(heading: str, message: str, loader): """Fuehrt eine Ladefunktion mit sichtbarem Fortschrittsdialog aus.""" with _progress_dialog(heading, message) as progress: progress(10, message) result = loader() progress(100, "Fertig") return result def _method_accepts_kwarg(method: object, kwarg_name: str) -> bool: if not callable(method): return False try: signature = inspect.signature(method) except Exception: return False for param in signature.parameters.values(): if param.kind == inspect.Parameter.VAR_KEYWORD: return True if param.name == kwarg_name and param.kind in ( inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY, ): return True return False def _call_plugin_search(plugin: BasisPlugin, query: str, *, progress_callback=None): method = getattr(plugin, "search_titles", None) if not callable(method): raise RuntimeError("Plugin hat keine gueltige search_titles Methode.") if progress_callback is not None and _method_accepts_kwarg(method, "progress_callback"): return method(query, progress_callback=progress_callback) return method(query) def _get_handle() -> int: return int(sys.argv[1]) if len(sys.argv) > 1 else -1 def _set_content(handle: int, content: str) -> None: """Hint Kodi about the content type so skins can show watched/resume overlays.""" content = (content or "").strip() if not content: return try: # pragma: no cover - Kodi runtime setter = getattr(xbmcplugin, "setContent", None) if callable(setter): setter(handle, content) except Exception: pass def _get_addon(): global _ADDON_INSTANCE if xbmcaddon is None: return None if _ADDON_INSTANCE is None: _ADDON_INSTANCE = xbmcaddon.Addon() return _ADDON_INSTANCE def _playstate_key(*, plugin_name: str, title: str, season: str, episode: str) -> str: plugin_name = (plugin_name or "").strip() title = (title or "").strip() season = (season or "").strip() episode = (episode or "").strip() return f"{plugin_name}\t{title}\t{season}\t{episode}" def _load_playstate() -> dict[str, dict[str, object]]: return {} def _save_playstate(state: dict[str, dict[str, object]]) -> None: return def _get_playstate(key: str) -> dict[str, object]: return {} def _set_playstate(key: str, value: dict[str, object]) -> None: return def _apply_playstate_to_info(info_labels: dict[str, object], playstate: dict[str, object]) -> dict[str, object]: return dict(info_labels or {}) def _label_with_playstate(label: str, playstate: dict[str, object]) -> str: return label def _title_playstate(plugin_name: str, title: str) -> dict[str, object]: return _get_playstate(_playstate_key(plugin_name=plugin_name, title=title, season="", episode="")) def _season_playstate(plugin_name: str, title: str, season: str) -> dict[str, object]: return _get_playstate(_playstate_key(plugin_name=plugin_name, title=title, season=season, episode="")) def _get_setting_string(setting_id: str) -> str: if xbmcaddon is None: return "" addon = _get_addon() if addon is None: return "" getter = getattr(addon, "getSettingString", None) if callable(getter): try: return str(getter(setting_id) or "") except TypeError: return "" getter = getattr(addon, "getSetting", None) if callable(getter): try: return str(getter(setting_id) or "") except TypeError: return "" return "" def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool: if xbmcaddon is None: return default addon = _get_addon() if addon is None: return default getter = getattr(addon, "getSettingBool", None) if callable(getter): # Kodi kann für unbekannte Settings stillschweigend `False` liefern. # Damit neue Settings mit `default=True` korrekt funktionieren, prüfen wir auf leeren Raw-Value. raw_getter = getattr(addon, "getSetting", None) if callable(raw_getter): try: raw = str(raw_getter(setting_id) or "").strip() except TypeError: raw = "" if raw == "": return default try: return bool(getter(setting_id)) except TypeError: return default getter = getattr(addon, "getSetting", None) if callable(getter): try: raw = str(getter(setting_id) or "").strip().lower() except TypeError: return default if raw in {"true", "1", "yes", "on"}: return True if raw in {"false", "0", "no", "off"}: return False return default def _get_setting_int(setting_id: str, *, default: int = 0) -> int: if xbmcaddon is None: return default addon = _get_addon() if addon is None: return default getter = getattr(addon, "getSettingInt", None) if callable(getter): raw_getter = getattr(addon, "getSetting", None) if callable(raw_getter): try: raw = str(raw_getter(setting_id) or "").strip() except TypeError: raw = "" if raw == "": return default try: return int(getter(setting_id)) except TypeError: return default getter = getattr(addon, "getSetting", None) if callable(getter): try: raw = str(getter(setting_id) or "").strip() except TypeError: return default if raw == "": return default try: return int(raw) except ValueError: return default return default def _metadata_policy( plugin_name: str, plugin: BasisPlugin, *, allow_tmdb: bool, ) -> tuple[bool, bool, bool]: return _metadata_policy_impl( plugin_name, plugin, allow_tmdb=allow_tmdb, get_setting_int=_get_setting_int, ) def _tmdb_list_enabled() -> bool: return _tmdb_enabled() and _get_setting_bool("tmdb_genre_metadata", default=False) def _set_setting_string(setting_id: str, value: str) -> None: if xbmcaddon is None: return addon = _get_addon() if addon is None: return setter = getattr(addon, "setSettingString", None) if callable(setter): try: setter(setting_id, str(value)) return except TypeError: return setter = getattr(addon, "setSetting", None) if callable(setter): try: setter(setting_id, str(value)) except TypeError: return def _apply_video_info(item, info_labels: dict[str, object] | None, cast: list[TmdbCastMember] | None) -> None: """Setzt Metadaten bevorzugt via InfoTagVideo (Kodi v20+), mit Fallback auf deprecated APIs.""" if not info_labels and not cast: return info_labels = dict(info_labels or {}) get_tag = getattr(item, "getVideoInfoTag", None) tag = None if callable(get_tag): try: tag = get_tag() except Exception: tag = None if tag is not None: try: title = info_labels.get("title") or "" plot = info_labels.get("plot") or "" mediatype = info_labels.get("mediatype") or "" tvshowtitle = info_labels.get("tvshowtitle") or "" season = info_labels.get("season") episode = info_labels.get("episode") rating = info_labels.get("rating") votes = info_labels.get("votes") duration = info_labels.get("duration") playcount = info_labels.get("playcount") resume_position = info_labels.get("resume_position") resume_total = info_labels.get("resume_total") setter = getattr(tag, "setTitle", None) if callable(setter) and title: setter(str(title)) setter = getattr(tag, "setPlot", None) if callable(setter) and plot: setter(str(plot)) setter = getattr(tag, "setMediaType", None) if callable(setter) and mediatype: setter(str(mediatype)) setter = getattr(tag, "setTvShowTitle", None) if callable(setter) and tvshowtitle: setter(str(tvshowtitle)) setter = getattr(tag, "setSeason", None) if callable(setter) and season not in (None, "", 0, "0"): setter(int(season)) # type: ignore[arg-type] setter = getattr(tag, "setEpisode", None) if callable(setter) and episode not in (None, "", 0, "0"): setter(int(episode)) # type: ignore[arg-type] if rating not in (None, "", 0, "0"): try: rating_f = float(rating) # type: ignore[arg-type] except Exception: rating_f = 0.0 if rating_f: set_rating = getattr(tag, "setRating", None) if callable(set_rating): try: if votes not in (None, "", 0, "0"): set_rating(rating_f, int(votes), "tmdb") # type: ignore[misc] else: set_rating(rating_f) # type: ignore[misc] except Exception: try: set_rating(rating_f, int(votes or 0), "tmdb", True) # type: ignore[misc] except Exception: pass if duration not in (None, "", 0, "0"): try: duration_i = int(duration) # type: ignore[arg-type] except Exception: duration_i = 0 if duration_i: set_duration = getattr(tag, "setDuration", None) if callable(set_duration): try: set_duration(duration_i) except Exception: pass if playcount not in (None, "", 0, "0"): try: playcount_i = int(playcount) # type: ignore[arg-type] except Exception: playcount_i = 0 if playcount_i: set_playcount = getattr(tag, "setPlaycount", None) if callable(set_playcount): try: set_playcount(playcount_i) except Exception: pass try: pos = int(resume_position) if resume_position is not None else 0 tot = int(resume_total) if resume_total is not None else 0 except Exception: pos, tot = 0, 0 if pos > 0 and tot > 0: set_resume = getattr(tag, "setResumePoint", None) if callable(set_resume): try: set_resume(pos, tot) except Exception: try: set_resume(pos) # type: ignore[misc] except Exception: pass if cast: set_cast = getattr(tag, "setCast", None) actor_cls = getattr(xbmc, "Actor", None) if callable(set_cast) and actor_cls is not None: actors = [] for index, member in enumerate(cast[:30]): try: actors.append(actor_cls(member.name, member.role, index, member.thumb)) except Exception: try: actors.append(actor_cls(member.name, member.role)) except Exception: continue try: set_cast(actors) except Exception: pass elif callable(set_cast): cast_dicts = [ {"name": m.name, "role": m.role, "thumbnail": m.thumb} for m in cast[:30] if m.name ] try: set_cast(cast_dicts) except Exception: pass return except Exception: # Fallback below pass # Deprecated fallback for older Kodi. try: item.setInfo("video", info_labels) # type: ignore[arg-type] except Exception: pass if cast: set_cast = getattr(item, "setCast", None) if callable(set_cast): try: set_cast([m.name for m in cast[:30] if m.name]) except Exception: pass def _get_log_path(filename: str) -> str: if xbmcaddon and xbmcvfs: addon = xbmcaddon.Addon() profile = xbmcvfs.translatePath(addon.getAddonInfo("profile")) log_dir = os.path.join(profile, "logs") if not xbmcvfs.exists(log_dir): xbmcvfs.mkdirs(log_dir) return os.path.join(log_dir, filename) return os.path.join(os.path.dirname(__file__), filename) def _tmdb_file_log(message: str) -> None: global _TMDB_LOG_PATH if _TMDB_LOG_PATH is None: _TMDB_LOG_PATH = _get_log_path("tmdb.log") timestamp = datetime.utcnow().isoformat(timespec="seconds") + "Z" line = f"{timestamp}\t{message}\n" try: with open(_TMDB_LOG_PATH, "a", encoding="utf-8") as handle: handle.write(line) except Exception: if xbmcvfs is None: return try: handle = xbmcvfs.File(_TMDB_LOG_PATH, "a") handle.write(line) handle.close() except Exception: return def _tmdb_labels_and_art(title: str) -> tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]: if not _tmdb_enabled(): return {}, {}, [] title_key = (title or "").strip().casefold() language = _get_setting_string("tmdb_language").strip() or "de-DE" show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) show_fanart = _get_setting_bool("tmdb_show_fanart", default=True) show_rating = _get_setting_bool("tmdb_show_rating", default=True) show_votes = _get_setting_bool("tmdb_show_votes", default=False) show_cast = _get_setting_bool("tmdb_show_cast", default=False) flags = f"p{int(show_plot)}a{int(show_art)}f{int(show_fanart)}r{int(show_rating)}v{int(show_votes)}c{int(show_cast)}" cache_key = f"{language}|{flags}|{title_key}" cached = _tmdb_cache_get(_TMDB_CACHE, cache_key) if cached is not None: info, art = cached # Cast wird nicht in _TMDB_CACHE gehalten (weil es ListItem.setCast betrifft), daher separat cachen: cast_cached = _tmdb_cache_get(_TMDB_CAST_CACHE, cache_key, []) return info, art, list(cast_cached) info_labels: dict[str, str] = {"title": title} art: dict[str, str] = {} cast: list[TmdbCastMember] = [] query = (title or "").strip() api_key = _get_setting_string("tmdb_api_key").strip() log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) if api_key: try: log_fn = _tmdb_file_log if (log_requests or log_responses) else None # Einige Plugins liefern Titel wie "… – Der Film". Für TMDB ist oft der Basistitel besser. candidates: list[str] = [] if query: candidates.append(query) simplified = re.sub(r"\s*[-–]\s*der\s+film\s*$", "", query, flags=re.IGNORECASE).strip() if simplified and simplified not in candidates: candidates.append(simplified) meta = None is_tv = False for candidate in candidates: meta = lookup_tv_show( title=candidate, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, include_cast=show_cast, ) if meta: is_tv = True break if not meta: for candidate in candidates: movie = lookup_movie( title=candidate, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, include_cast=show_cast, ) if movie: meta = movie break except Exception as exc: try: _tmdb_file_log(f"TMDB ERROR lookup_failed title={title!r} error={exc!r}") except Exception: pass _log(f"TMDB Meta fehlgeschlagen: {exc}", xbmc.LOGDEBUG) meta = None if meta: # Nur TV-IDs cachen (für Staffel-/Episoden-Lookups); Movie-IDs würden dort fehlschlagen. if is_tv: _tmdb_cache_set(_TMDB_ID_CACHE, title_key, int(getattr(meta, "tmdb_id", 0) or 0)) info_labels.setdefault("mediatype", "tvshow") else: info_labels.setdefault("mediatype", "movie") if show_plot and getattr(meta, "plot", ""): info_labels["plot"] = getattr(meta, "plot", "") runtime_minutes = int(getattr(meta, "runtime_minutes", 0) or 0) if runtime_minutes > 0 and not is_tv: info_labels["duration"] = str(runtime_minutes * 60) rating = getattr(meta, "rating", 0.0) or 0.0 votes = getattr(meta, "votes", 0) or 0 if show_rating and rating: # Kodi akzeptiert je nach Version float oder string; wir bleiben bei strings wie im restlichen Code. info_labels["rating"] = str(rating) if show_votes and votes: info_labels["votes"] = str(votes) if show_art and getattr(meta, "poster", ""): poster = getattr(meta, "poster", "") art.update({"thumb": poster, "poster": poster, "icon": poster}) if show_fanart and getattr(meta, "fanart", ""): fanart = getattr(meta, "fanart", "") if fanart: art.update({"fanart": fanart, "landscape": fanart}) if show_cast: cast = list(getattr(meta, "cast", []) or []) elif log_requests or log_responses: _tmdb_file_log(f"TMDB MISS title={title!r}") _tmdb_cache_set(_TMDB_CACHE, cache_key, (info_labels, art)) _tmdb_cache_set(_TMDB_CAST_CACHE, cache_key, list(cast)) return info_labels, art, list(cast) async def _tmdb_labels_and_art_bulk_async( titles: list[str], ) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]: titles = [str(t).strip() for t in (titles or []) if t and str(t).strip()] if not titles: return {} unique_titles: list[str] = list(dict.fromkeys(titles)) limit = _tmdb_prefetch_concurrency() semaphore = asyncio.Semaphore(limit) async def fetch_one(title: str): async with semaphore: return title, await asyncio.to_thread(_tmdb_labels_and_art, title) tasks = [fetch_one(title) for title in unique_titles] results = await asyncio.gather(*tasks, return_exceptions=True) mapped: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} for entry in results: if isinstance(entry, Exception): continue try: title, payload = entry except Exception: continue if isinstance(title, str) and isinstance(payload, tuple) and len(payload) == 3: mapped[title] = payload # type: ignore[assignment] return mapped def _tmdb_labels_and_art_bulk( titles: list[str], ) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]: if not _tmdb_enabled(): return {} return _run_async(_tmdb_labels_and_art_bulk_async(titles)) def _tmdb_episode_labels_and_art(*, title: str, season_label: str, episode_label: str) -> tuple[dict[str, str], dict[str, str]]: if not _tmdb_enabled(): return {"title": episode_label}, {} title_key = (title or "").strip().casefold() tmdb_id = _tmdb_cache_get(_TMDB_ID_CACHE, title_key) if not tmdb_id: _tmdb_labels_and_art(title) tmdb_id = _tmdb_cache_get(_TMDB_ID_CACHE, title_key) if not tmdb_id: return {"title": episode_label}, {} season_number = _extract_first_int(season_label) episode_number = _extract_first_int(episode_label) if season_number is None or episode_number is None: return {"title": episode_label}, {} language = _get_setting_string("tmdb_language").strip() or "de-DE" show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) flags = f"p{int(show_plot)}a{int(show_art)}" season_key = (tmdb_id, season_number, language, flags) cached_season = _tmdb_cache_get(_TMDB_SEASON_CACHE, season_key) if cached_season is None: api_key = _get_setting_string("tmdb_api_key").strip() if not api_key: return {"title": episode_label}, {} log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) log_fn = _tmdb_file_log if (log_requests or log_responses) else None try: season_meta = lookup_tv_season( tmdb_id=tmdb_id, season_number=season_number, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, ) except Exception as exc: if log_fn: log_fn(f"TMDB ERROR season_lookup_failed tmdb_id={tmdb_id} season={season_number} error={exc!r}") season_meta = None mapped: dict[int, tuple[dict[str, str], dict[str, str]]] = {} if season_meta: for ep_no, ep in season_meta.items(): info: dict[str, str] = {"title": f"Episode {ep_no}"} if show_plot and ep.plot: info["plot"] = ep.plot if getattr(ep, "runtime_minutes", 0): info["duration"] = str(int(getattr(ep, "runtime_minutes", 0)) * 60) art: dict[str, str] = {} if show_art and ep.thumb: art = {"thumb": ep.thumb} mapped[ep_no] = (info, art) _tmdb_cache_set(_TMDB_SEASON_CACHE, season_key, mapped) cached_season = mapped return cached_season.get(episode_number, ({"title": episode_label}, {})) def _tmdb_episode_cast(*, title: str, season_label: str, episode_label: str) -> list[TmdbCastMember]: if not _tmdb_enabled(): return [] show_episode_cast = _get_setting_bool("tmdb_show_episode_cast", default=False) if not show_episode_cast: return [] title_key = (title or "").strip().casefold() tmdb_id = _tmdb_cache_get(_TMDB_ID_CACHE, title_key) if not tmdb_id: _tmdb_labels_and_art(title) tmdb_id = _tmdb_cache_get(_TMDB_ID_CACHE, title_key) if not tmdb_id: return [] season_number = _extract_first_int(season_label) episode_number = _extract_first_int(episode_label) if season_number is None or episode_number is None: return [] language = _get_setting_string("tmdb_language").strip() or "de-DE" cache_key = (tmdb_id, season_number, episode_number, language) cached = _tmdb_cache_get(_TMDB_EPISODE_CAST_CACHE, cache_key) if cached is not None: return list(cached) api_key = _get_setting_string("tmdb_api_key").strip() if not api_key: _tmdb_cache_set(_TMDB_EPISODE_CAST_CACHE, cache_key, []) return [] log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) log_fn = _tmdb_file_log if (log_requests or log_responses) else None try: cast = fetch_tv_episode_credits( tmdb_id=tmdb_id, season_number=season_number, episode_number=episode_number, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, ) except Exception as exc: if log_fn: log_fn( f"TMDB ERROR episode_credits_failed tmdb_id={tmdb_id} season={season_number} episode={episode_number} error={exc!r}" ) cast = [] _tmdb_cache_set(_TMDB_EPISODE_CAST_CACHE, cache_key, list(cast)) return list(cast) def _add_directory_item( handle: int, label: str, action: str, params: dict[str, str] | None = None, *, is_folder: bool = True, info_labels: dict[str, str] | None = None, art: dict[str, str] | None = None, cast: list[TmdbCastMember] | None = None, ) -> None: """Fuegt einen Eintrag (Folder oder Playable) in die Kodi-Liste ein.""" query: dict[str, str] = {"action": action} if params: query.update(params) url = f"{sys.argv[0]}?{urlencode(query)}" item = xbmcgui.ListItem(label=label) if not is_folder: try: item.setProperty("IsPlayable", "true") except Exception: pass _apply_video_info(item, info_labels, cast) if art: setter = getattr(item, "setArt", None) if callable(setter): try: setter(art) except Exception: pass xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder) def _plugin_version(plugin: BasisPlugin) -> str: raw = getattr(plugin, "version", "0.0.0") text = str(raw or "").strip() return text or "0.0.0" def _normalize_update_info_url(raw: str) -> str: value = str(raw or "").strip() default = "http://127.0.0.1:8080/repo/addons.xml" if not value: return default if value.endswith("/addons.xml"): return value return value.rstrip("/") + "/addons.xml" UPDATE_CHANNEL_MAIN = 0 UPDATE_CHANNEL_NIGHTLY = 1 UPDATE_CHANNEL_CUSTOM = 2 AUTO_UPDATE_INTERVAL_SEC = 6 * 60 * 60 UPDATE_HTTP_TIMEOUT_SEC = 8 UPDATE_ADDON_ID = "plugin.video.viewit" def _selected_update_channel() -> int: channel = _get_setting_int("update_channel", default=UPDATE_CHANNEL_MAIN) if channel not in {UPDATE_CHANNEL_MAIN, UPDATE_CHANNEL_NIGHTLY, UPDATE_CHANNEL_CUSTOM}: return UPDATE_CHANNEL_MAIN return channel def _channel_label(channel: int) -> str: if channel == UPDATE_CHANNEL_NIGHTLY: return "Nightly" if channel == UPDATE_CHANNEL_CUSTOM: return "Custom" return "Main" def _version_sort_key(version: str) -> tuple[int, ...]: base = str(version or "").split("-", 1)[0] parts = [] for chunk in base.split("."): try: parts.append(int(chunk)) except Exception: parts.append(0) while len(parts) < 4: parts.append(0) return tuple(parts[:4]) def _resolve_update_info_url() -> str: channel = _selected_update_channel() if channel == UPDATE_CHANNEL_NIGHTLY: raw = _get_setting_string("update_repo_url_nightly") elif channel == UPDATE_CHANNEL_CUSTOM: raw = _get_setting_string("update_repo_url") else: raw = _get_setting_string("update_repo_url_main") info_url = _normalize_update_info_url(raw) # Legacy-Setting beibehalten, damit bestehende Installationen und alte Builds weiterlaufen. _set_setting_string("update_repo_url", info_url) return info_url def _read_text_url(url: str, *, timeout: int = UPDATE_HTTP_TIMEOUT_SEC) -> str: request = Request(url, headers={"User-Agent": "ViewIT/1.0"}) response = None try: response = urlopen(request, timeout=timeout) data = response.read() finally: if response is not None: try: response.close() except Exception: pass return data.decode("utf-8", errors="replace") def _extract_repo_addon_version(xml_text: str, addon_id: str = UPDATE_ADDON_ID) -> str: try: root = ET.fromstring(xml_text) except Exception: return "-" if root.tag == "addon": return str(root.attrib.get("version") or "-") for node in root.findall("addon"): if str(node.attrib.get("id") or "").strip() == addon_id: version = str(node.attrib.get("version") or "").strip() return version or "-" return "-" def _fetch_repo_addon_version(info_url: str) -> str: url = _normalize_update_info_url(info_url) try: xml_text = _read_text_url(url) except URLError: return "-" except Exception: return "-" return _extract_repo_addon_version(xml_text) def _read_binary_url(url: str, *, timeout: int = UPDATE_HTTP_TIMEOUT_SEC) -> bytes: request = Request(url, headers={"User-Agent": "ViewIT/1.0"}) response = None try: response = urlopen(request, timeout=timeout) return response.read() finally: if response is not None: try: response.close() except Exception: pass def _extract_repo_identity(info_url: str) -> tuple[str, str, str, str] | None: parsed = urlparse(str(info_url or "").strip()) parts = [part for part in parsed.path.split("/") if part] try: raw_idx = parts.index("raw") except ValueError: return None # expected: /{owner}/{repo}/raw/branch/{branch}/addons.xml if raw_idx < 2 or (raw_idx + 2) >= len(parts): return None if parts[raw_idx + 1] != "branch": return None owner = parts[raw_idx - 2] repo = parts[raw_idx - 1] branch = parts[raw_idx + 2] scheme = parsed.scheme or "https" host = parsed.netloc if not owner or not repo or not branch or not host: return None return scheme, host, owner, repo + "|" + branch def _fetch_repo_versions(info_url: str) -> list[str]: identity = _extract_repo_identity(info_url) if identity is None: one = _fetch_repo_addon_version(info_url) return [one] if one != "-" else [] scheme, host, owner, repo_branch = identity repo, branch = repo_branch.split("|", 1) api_url = f"{scheme}://{host}/api/v1/repos/{owner}/{repo}/contents/{UPDATE_ADDON_ID}?ref={branch}" try: payload = _read_text_url(api_url) data = json.loads(payload) except Exception: one = _fetch_repo_addon_version(info_url) return [one] if one != "-" else [] versions: list[str] = [] if isinstance(data, list): for entry in data: if not isinstance(entry, dict): continue name = str(entry.get("name") or "") match = re.match(rf"^{re.escape(UPDATE_ADDON_ID)}-(.+)\.zip$", name) if not match: continue version = match.group(1).strip() if version: versions.append(version) unique = sorted(set(versions), key=_version_sort_key, reverse=True) return unique def _extract_changelog_section(changelog_text: str, version: str) -> str: lines = changelog_text.splitlines() wanted = (version or "").strip() if not wanted: return "\n".join(lines[:120]).strip() start = -1 for idx, line in enumerate(lines): if line.startswith("## ") and wanted in line: start = idx break if start < 0: return f"Kein Changelog-Abschnitt fuer Version {wanted} gefunden." end = len(lines) for idx in range(start + 1, len(lines)): if lines[idx].startswith("## "): end = idx break return "\n".join(lines[start:end]).strip() def _fetch_changelog_for_channel(channel: int, version: str) -> str: if channel == UPDATE_CHANNEL_MAIN: url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/main/CHANGELOG.md" else: url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/nightly/CHANGELOG-NIGHTLY.md" try: text = _read_text_url(url) except Exception: return "Changelog konnte nicht geladen werden." return _extract_changelog_section(text, version) def _install_addon_version(info_url: str, version: str) -> bool: base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/") zip_url = f"{base}/{UPDATE_ADDON_ID}/{UPDATE_ADDON_ID}-{version}.zip" try: zip_bytes = _read_binary_url(zip_url) except Exception as exc: _log(f"Download fehlgeschlagen ({zip_url}): {exc}", xbmc.LOGWARNING) return False if xbmcvfs is None: return False addons_root = xbmcvfs.translatePath("special://home/addons") addons_root_real = os.path.realpath(addons_root) try: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive: for member in archive.infolist(): name = str(member.filename or "") if not name or name.endswith("/"): continue target = os.path.realpath(os.path.join(addons_root, name)) if not target.startswith(addons_root_real + os.sep): continue os.makedirs(os.path.dirname(target), exist_ok=True) with archive.open(member, "r") as src, open(target, "wb") as dst: dst.write(src.read()) except Exception as exc: _log(f"Entpacken fehlgeschlagen: {exc}", xbmc.LOGWARNING) return False builtin = getattr(xbmc, "executebuiltin", None) if callable(builtin): builtin("UpdateLocalAddons") return True def _sync_update_channel_status_settings() -> None: channel = _selected_update_channel() channel_label = _channel_label(channel) selected_info_url = _resolve_update_info_url() main_info_url = _normalize_update_info_url(_get_setting_string("update_repo_url_main")) nightly_info_url = _normalize_update_info_url(_get_setting_string("update_repo_url_nightly")) available_main = _fetch_repo_addon_version(main_info_url) available_nightly = _fetch_repo_addon_version(nightly_info_url) if channel == UPDATE_CHANNEL_MAIN: available_selected = available_main elif channel == UPDATE_CHANNEL_NIGHTLY: available_selected = available_nightly else: available_selected = _fetch_repo_addon_version(selected_info_url) _set_setting_string("update_active_channel", channel_label) _set_setting_string("update_active_repo_url", selected_info_url) _set_setting_string("update_available_main", available_main) _set_setting_string("update_available_nightly", available_nightly) _set_setting_string("update_available_selected", available_selected) def _repo_addon_xml_path() -> str: if xbmcvfs is None: return "" try: return xbmcvfs.translatePath("special://home/addons/repository.viewit/addon.xml") except Exception: return "" def _update_repository_source(info_url: str) -> bool: path = _repo_addon_xml_path() if not path: return False if not os.path.exists(path): return False try: tree = ET.parse(path) root = tree.getroot() dir_node = root.find(".//dir") if dir_node is None: return False info = dir_node.find("info") checksum = dir_node.find("checksum") datadir = dir_node.find("datadir") if info is None or checksum is None or datadir is None: return False base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/") info.text = info_url checksum.text = f"{base}/addons.xml.md5" datadir.text = f"{base}/" tree.write(path, encoding="utf-8", xml_declaration=True) return True except Exception as exc: _log(f"Repository-URL konnte nicht gesetzt werden: {exc}", xbmc.LOGWARNING) return False def _settings_key_for_plugin(name: str) -> str: safe = re.sub(r"[^a-z0-9]+", "_", (name or "").strip().casefold()).strip("_") return f"update_version_{safe}" if safe else "update_version_unknown" def _sync_update_version_settings() -> None: addon = _get_addon() addon_version = "0.0.0" if addon is not None: try: addon_version = str(addon.getAddonInfo("version") or "0.0.0") except Exception: addon_version = "0.0.0" _set_setting_string("update_version_addon", addon_version) _set_setting_string("update_installed_version", addon_version) versions = { "update_version_serienstream": "-", "update_version_aniworld": "-", "update_version_einschalten": "-", "update_version_topstreamfilm": "-", "update_version_filmpalast": "-", "update_version_doku_streams": "-", } for plugin in _discover_plugins().values(): key = _settings_key_for_plugin(str(plugin.name)) if key in versions: versions[key] = _plugin_version(plugin) for key, value in versions.items(): _set_setting_string(key, value) _sync_update_channel_status_settings() def _show_root_menu() -> None: handle = _get_handle() _log("Root-Menue wird angezeigt.") _add_directory_item(handle, "Suche in allen Quellen", "search") plugins = _discover_plugins() for plugin_name in sorted(plugins.keys(), key=lambda value: value.casefold()): _add_directory_item(handle, plugin_name, "plugin_menu", {"plugin": plugin_name}, is_folder=True) _add_directory_item(handle, "Einstellungen", "settings") xbmcplugin.endOfDirectory(handle) def _show_plugin_menu(plugin_name: str) -> None: handle = _get_handle() plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if not plugin: xbmcgui.Dialog().notification("Quelle", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return xbmcplugin.setPluginCategory(handle, plugin_name) _add_directory_item(handle, "Suche", "plugin_search", {"plugin": plugin_name}, is_folder=True) if _plugin_has_capability(plugin, "new_titles") or _plugin_has_capability(plugin, "latest_episodes"): _add_directory_item(handle, LATEST_MENU_LABEL, "latest_titles", {"plugin": plugin_name, "page": "1"}, is_folder=True) if _plugin_has_capability(plugin, "genres"): _add_directory_item(handle, "Genres", "genres", {"plugin": plugin_name}, is_folder=True) if _plugin_has_capability(plugin, "alpha"): _add_directory_item(handle, "A-Z", "alpha_index", {"plugin": plugin_name}, is_folder=True) if _plugin_has_capability(plugin, "series_catalog"): _add_directory_item(handle, "Serien", "series_catalog", {"plugin": plugin_name, "page": "1"}, is_folder=True) if _plugin_has_capability(plugin, "popular_series"): _add_directory_item(handle, POPULAR_MENU_LABEL, "popular", {"plugin": plugin_name, "page": "1"}, is_folder=True) xbmcplugin.endOfDirectory(handle) def _show_plugin_search(plugin_name: str) -> None: plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if not plugin: xbmcgui.Dialog().notification("Suche", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) _show_root_menu() return _log(f"Plugin-Suche gestartet: {plugin_name}") dialog = xbmcgui.Dialog() query = dialog.input(f"{plugin_name}: Titel eingeben", type=xbmcgui.INPUT_ALPHANUM).strip() if not query: _log("Plugin-Suche abgebrochen (leere Eingabe).", xbmc.LOGDEBUG) _show_plugin_menu(plugin_name) return _log(f"Plugin-Suchbegriff ({plugin_name}): {query}", xbmc.LOGDEBUG) _show_plugin_search_results(plugin_name, query) def _show_plugin_search_results(plugin_name: str, query: str) -> None: handle = _get_handle() plugin_name = (plugin_name or "").strip() query = (query or "").strip() plugin = _discover_plugins().get(plugin_name) if not plugin: xbmcgui.Dialog().notification("Suche", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return xbmcplugin.setPluginCategory(handle, f"{plugin_name}: {query}") _set_content(handle, "movies" if plugin_name.casefold() == "einschalten" else "tvshows") _log(f"Suche nach Titeln (Plugin={plugin_name}): {query}") list_items: list[dict[str, object]] = [] canceled = False try: with _progress_dialog("Suche laeuft", f"{plugin_name} (1/1) startet...") as progress: canceled = progress(5, f"{plugin_name} (1/1) Suche...") plugin_progress = lambda msg="", pct=None: progress( # noqa: E731 - kompakte Callback-Bruecke max(5, min(95, int(pct))) if pct is not None else 20, f"{plugin_name} (1/1) {str(msg or 'Suche...').strip()}", ) search_coro = _call_plugin_search(plugin, query, progress_callback=plugin_progress) try: results = _run_async(search_coro) except Exception: if inspect.iscoroutine(search_coro): try: search_coro.close() except Exception: pass raise results = _clean_search_titles([str(t).strip() for t in (results or []) if t and str(t).strip()]) results.sort(key=lambda value: value.casefold()) use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {} tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_titles = list(results) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in results: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles and not canceled: canceled = progress(35, f"{plugin_name} (1/1) Metadaten...") tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles)) total_results = max(1, len(results)) for index, title in enumerate(results, start=1): if canceled: break if index == 1 or index == total_results or (index % 10 == 0): pct = 35 + int((index / float(total_results)) * 60) canceled = progress(pct, f"{plugin_name} (1/1) aufbereiten {index}/{total_results}") tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) merged_info = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)) extra_params = _series_url_params(plugin, title) list_items.append( { "label": display_label, "action": "play_movie" if direct_play else "seasons", "params": {"plugin": plugin_name, "title": title, **extra_params}, "is_folder": (not direct_play), "info_labels": merged_info, "art": art, "cast": cast, } ) except Exception as exc: _log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Suche", "Suche fehlgeschlagen.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return if canceled and not list_items: xbmcgui.Dialog().notification("Suche", "Suche abgebrochen.", xbmcgui.NOTIFICATION_INFO, 2500) xbmcplugin.endOfDirectory(handle) return for item in list_items: _add_directory_item( handle, str(item["label"]), str(item["action"]), dict(item["params"]), is_folder=bool(item["is_folder"]), info_labels=item["info_labels"], art=item["art"], cast=item["cast"], ) xbmcplugin.endOfDirectory(handle) def _import_plugin_module(path: Path) -> ModuleType: spec = importlib.util.spec_from_file_location(path.stem, path) if spec is None or spec.loader is None: raise ImportError(f"Modul-Spezifikation fuer {path.name} fehlt.") module = importlib.util.module_from_spec(spec) sys.modules[spec.name] = module try: spec.loader.exec_module(module) except Exception: sys.modules.pop(spec.name, None) raise return module def _discover_plugins() -> dict[str, BasisPlugin]: """Laedt alle Plugins aus `plugins/*.py` und cached Instanzen im RAM.""" global _PLUGIN_CACHE if _PLUGIN_CACHE is not None: return _PLUGIN_CACHE # Plugins werden dynamisch aus `plugins/*.py` geladen, damit Integrationen getrennt # entwickelt und bei Fehlern isoliert deaktiviert werden koennen. plugins: dict[str, BasisPlugin] = {} if not PLUGIN_DIR.exists(): _PLUGIN_CACHE = plugins return plugins for file_path in sorted(PLUGIN_DIR.glob("*.py")): if file_path.name.startswith("_"): continue try: module = _import_plugin_module(file_path) except Exception as exc: xbmc.log(f"Plugin-Datei {file_path.name} konnte nicht geladen werden: {exc}", xbmc.LOGWARNING) continue preferred = getattr(module, "Plugin", None) if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin: plugin_classes = [preferred] else: plugin_classes = [ obj for obj in module.__dict__.values() if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin ] plugin_classes.sort(key=lambda cls: cls.__name__.casefold()) for cls in plugin_classes: try: instance = cls() except Exception as exc: xbmc.log(f"Plugin {cls.__name__} konnte nicht geladen werden: {exc}", xbmc.LOGWARNING) continue if getattr(instance, "is_available", True) is False: reason = getattr(instance, "unavailable_reason", "Nicht verfuegbar.") xbmc.log(f"Plugin {cls.__name__} deaktiviert: {reason}", xbmc.LOGWARNING) continue plugin_name = str(getattr(instance, "name", "") or "").strip() if not plugin_name: xbmc.log( f"Plugin {cls.__name__} wurde ohne Name registriert und wird uebersprungen.", xbmc.LOGWARNING, ) continue if plugin_name in plugins: xbmc.log( f"Plugin-Name doppelt ({plugin_name}), {cls.__name__} wird uebersprungen.", xbmc.LOGWARNING, ) continue plugins[plugin_name] = instance plugins = dict(sorted(plugins.items(), key=lambda item: item[0].casefold())) _PLUGIN_CACHE = plugins return plugins def _run_async(coro): """Fuehrt eine Coroutine aus, auch wenn Kodi bereits einen Event-Loop hat.""" _ensure_windows_selector_policy() def _run_with_asyncio_run(): return asyncio.run(coro) try: running_loop = asyncio.get_running_loop() except RuntimeError: running_loop = None if running_loop and running_loop.is_running(): result_box: dict[str, object] = {} error_box: dict[str, BaseException] = {} def _worker() -> None: try: result_box["value"] = _run_with_asyncio_run() except BaseException as exc: # pragma: no cover - defensive error_box["error"] = exc worker = threading.Thread(target=_worker, name="viewit-async-runner") worker.start() worker.join() if "error" in error_box: raise error_box["error"] return result_box.get("value") return _run_with_asyncio_run() def _series_url_params(plugin: BasisPlugin, title: str) -> dict[str, str]: getter = getattr(plugin, "series_url_for_title", None) if not callable(getter): return {} try: series_url = str(getter(title) or "").strip() except Exception: return {} return {"series_url": series_url} if series_url else {} def _clean_search_titles(values: list[str]) -> list[str]: """Filtert offensichtliche Platzhalter und dedupliziert Treffer.""" blocked = { "stream", "streams", "film", "movie", "play", "details", "details/play", } cleaned: list[str] = [] seen: set[str] = set() for raw in values: title = (raw or "").strip() if not title: continue key = title.casefold() if key in blocked: continue if key in seen: continue seen.add(key) cleaned.append(title) return cleaned def _show_search() -> None: _log("Suche gestartet.") dialog = xbmcgui.Dialog() query = dialog.input("Titel eingeben", type=xbmcgui.INPUT_ALPHANUM).strip() if not query: _log("Suche abgebrochen (leere Eingabe).", xbmc.LOGDEBUG) _show_root_menu() return _log(f"Suchbegriff: {query}", xbmc.LOGDEBUG) _show_search_results(query) def _show_search_results(query: str) -> None: handle = _get_handle() _log(f"Suche nach Titeln: {query}") _set_content(handle, "tvshows") plugins = _discover_plugins() if not plugins: xbmcgui.Dialog().notification("Suche", "Keine Quellen gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return list_items: list[dict[str, object]] = [] canceled = False plugin_entries = list(plugins.items()) total_plugins = max(1, len(plugin_entries)) with _progress_dialog("Suche laeuft", "Suche startet...") as progress: for plugin_index, (plugin_name, plugin) in enumerate(plugin_entries, start=1): range_start = int(((plugin_index - 1) / float(total_plugins)) * 100) range_end = int((plugin_index / float(total_plugins)) * 100) canceled = progress(range_start, f"{plugin_name} ({plugin_index}/{total_plugins}) Suche...") if canceled: break plugin_progress = lambda msg="", pct=None: progress( # noqa: E731 - kompakte Callback-Bruecke max(range_start, min(range_end, int(pct))) if pct is not None else range_start + 20, f"{plugin_name} ({plugin_index}/{total_plugins}) {str(msg or 'Suche...').strip()}", ) search_coro = _call_plugin_search(plugin, query, progress_callback=plugin_progress) try: results = _run_async(search_coro) except Exception as exc: if inspect.iscoroutine(search_coro): try: search_coro.close() except Exception: pass _log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) continue results = _clean_search_titles([str(t).strip() for t in (results or []) if t and str(t).strip()]) _log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG) use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {} tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_titles = list(results) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in results: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: canceled = progress( range_start + int((range_end - range_start) * 0.35), f"{plugin_name} ({plugin_index}/{total_plugins}) Metadaten...", ) if canceled: break tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles)) total_results = max(1, len(results)) for title_index, title in enumerate(results, start=1): if title_index == 1 or title_index == total_results or (title_index % 10 == 0): canceled = progress( range_start + int((range_end - range_start) * (0.35 + 0.65 * (title_index / float(total_results)))), f"{plugin_name} ({plugin_index}/{total_plugins}) aufbereiten {title_index}/{total_results}", ) if canceled: break tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) merged_info = _apply_playstate_to_info(dict(info_labels), playstate) label = _label_with_duration(title, info_labels) label = _label_with_playstate(label, playstate) label = f"{label} [{plugin_name}]" direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) extra_params = _series_url_params(plugin, title) list_items.append( { "label": label, "action": "play_movie" if direct_play else "seasons", "params": {"plugin": plugin_name, "title": title, **extra_params}, "is_folder": (not direct_play), "info_labels": merged_info, "art": art, "cast": cast, } ) if canceled: break if not canceled: progress(100, "Suche fertig") if canceled and not list_items: xbmcgui.Dialog().notification("Suche", "Suche abgebrochen.", xbmcgui.NOTIFICATION_INFO, 2500) xbmcplugin.endOfDirectory(handle) return for item in list_items: _add_directory_item( handle, str(item["label"]), str(item["action"]), dict(item["params"]), is_folder=bool(item["is_folder"]), info_labels=item["info_labels"], art=item["art"], cast=item["cast"], ) xbmcplugin.endOfDirectory(handle) def _movie_seed_for_title(plugin: BasisPlugin, title: str, seasons: list[str]) -> tuple[str, str] | None: """Ermittelt ein Film-Seed (Season/Episode), um direkt Provider anzeigen zu können.""" if not seasons or len(seasons) != 1: return None season = str(seasons[0] or "").strip() if not season: return None try: episodes = [str(value or "").strip() for value in (plugin.episodes_for(title, season) or [])] except Exception: return None episodes = [value for value in episodes if value] if len(episodes) != 1: return None episode = episodes[0] season_key = season.casefold() episode_key = episode.casefold() title_key = (title or "").strip().casefold() generic_seasons = {"film", "movie", "stream"} generic_episodes = {"stream", "film", "play", title_key} if season_key in generic_seasons and episode_key in generic_episodes: return (season, episode) return None def _show_movie_streams( plugin_name: str, title: str, season: str, episode: str, *, series_url: str = "", ) -> None: handle = _get_handle() plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Streams", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return if series_url: remember_series_url = getattr(plugin, "remember_series_url", None) if callable(remember_series_url): try: remember_series_url(title, series_url) except Exception: pass xbmcplugin.setPluginCategory(handle, f"{title} - Streams") _set_content(handle, "videos") base_params = {"plugin": plugin_name, "title": title, "season": season, "episode": episode} if series_url: base_params["series_url"] = series_url # Hoster bleiben im Auswahldialog der Wiedergabe (wie bisher). _add_directory_item( handle, title, "play_episode", dict(base_params), is_folder=False, info_labels={"title": title, "mediatype": "movie"}, ) xbmcplugin.endOfDirectory(handle) def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None: handle = _get_handle() _log(f"Staffeln laden: {plugin_name} / {title}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Staffeln", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return if series_url: remember_series_url = getattr(plugin, "remember_series_url", None) if callable(remember_series_url): try: remember_series_url(title, series_url) except Exception: pass use_source, show_tmdb, _prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_enabled() ) title_info_labels: dict[str, str] | None = None title_art: dict[str, str] | None = None title_cast: list[TmdbCastMember] | None = None meta_getter = getattr(plugin, "metadata_for", None) if use_source and callable(meta_getter): try: with _busy_dialog("Metadaten werden geladen..."): meta_labels, meta_art, meta_cast = meta_getter(title) if isinstance(meta_labels, dict): title_info_labels = {str(k): str(v) for k, v in meta_labels.items() if v} if isinstance(meta_art, dict): title_art = {str(k): str(v) for k, v in meta_art.items() if v} if isinstance(meta_cast, list): # type: ignore[assignment] - plugins may return cast in their own shape; best-effort only title_cast = meta_cast # noqa: PGH003 except Exception: pass try: seasons = plugin.seasons_for(title) except Exception as exc: _log(f"Staffeln laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Staffeln", "Staffeln konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return movie_seed = _movie_seed_for_title(plugin, title, seasons) if movie_seed is not None: # Dieser Action-Pfad wurde als Verzeichnis aufgerufen. Ohne endOfDirectory() # bleibt Kodi im Busy-Zustand, auch wenn wir direkt in die Wiedergabe springen. try: xbmcplugin.endOfDirectory(handle, succeeded=False) except Exception: try: xbmcplugin.endOfDirectory(handle) except Exception: pass _play_episode( plugin_name, title, movie_seed[0], movie_seed[1], series_url=series_url, ) return count = len(seasons) suffix = "Staffel" if count == 1 else "Staffeln" xbmcplugin.setPluginCategory(handle, f"{title} ({count} {suffix})") _set_content(handle, "seasons") # Staffel-Metadaten (Plot/Poster) optional via TMDB. if show_tmdb: _tmdb_labels_and_art(title) api_key = _get_setting_string("tmdb_api_key").strip() if show_tmdb else "" language = _get_setting_string("tmdb_language").strip() or "de-DE" show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) flags = f"p{int(show_plot)}a{int(show_art)}" log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) log_fn = _tmdb_file_log if (log_requests or log_responses) else None for season in seasons: info_labels: dict[str, str] | None = None art: dict[str, str] | None = None season_number = _extract_first_int(season) if api_key and season_number is not None: cache_key = (_tmdb_cache_get(_TMDB_ID_CACHE, (title or "").strip().casefold(), 0), season_number, language, flags) cached = _tmdb_cache_get(_TMDB_SEASON_SUMMARY_CACHE, cache_key) if cached is None and cache_key[0]: try: meta = lookup_tv_season_summary( tmdb_id=cache_key[0], season_number=season_number, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, ) except Exception as exc: if log_fn: log_fn(f"TMDB ERROR season_summary_failed tmdb_id={cache_key[0]} season={season_number} error={exc!r}") meta = None labels = {"title": season} art_map: dict[str, str] = {} if meta: if show_plot and meta.plot: labels["plot"] = meta.plot if show_art and meta.poster: art_map = {"thumb": meta.poster, "poster": meta.poster} cached = (labels, art_map) _tmdb_cache_set(_TMDB_SEASON_SUMMARY_CACHE, cache_key, cached) if cached is not None: info_labels, art = cached merged_labels = dict(info_labels or {}) if title_info_labels: merged_labels = dict(title_info_labels) merged_labels.update(dict(info_labels or {})) season_state = _season_playstate(plugin_name, title, season) merged_labels = _apply_playstate_to_info(dict(merged_labels), season_state) merged_art: dict[str, str] | None = art if title_art: merged_art = dict(title_art) if isinstance(art, dict): merged_art.update({k: str(v) for k, v in art.items() if v}) _add_directory_item( handle, _label_with_playstate(season, season_state), "episodes", {"plugin": plugin_name, "title": title, "season": season, "series_url": series_url}, is_folder=True, info_labels=merged_labels or None, art=merged_art, cast=title_cast, ) xbmcplugin.endOfDirectory(handle) def _show_episodes(plugin_name: str, title: str, season: str, series_url: str = "") -> None: handle = _get_handle() _log(f"Episoden laden: {plugin_name} / {title} / {season}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Episoden", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return if series_url: remember_series_url = getattr(plugin, "remember_series_url", None) if callable(remember_series_url): try: remember_series_url(title, series_url) except Exception: pass season_number = _extract_first_int(season) if season_number is not None: xbmcplugin.setPluginCategory(handle, f"{title} - Staffel {season_number}") else: xbmcplugin.setPluginCategory(handle, f"{title} - {season}") _set_content(handle, "episodes") episodes = list(plugin.episodes_for(title, season)) if episodes: episode_url_getter = getattr(plugin, "episode_url_for", None) supports_direct_episode_url = callable(getattr(plugin, "stream_link_for_url", None)) use_source, show_tmdb, _prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_enabled() ) show_info: dict[str, str] = {} show_art: dict[str, str] = {} show_cast: list[TmdbCastMember] | None = None if show_tmdb: show_info, show_art, show_cast = _tmdb_labels_and_art(title) elif use_source: meta_getter = getattr(plugin, "metadata_for", None) if callable(meta_getter): try: with _busy_dialog("Episoden-Metadaten werden geladen..."): meta_labels, meta_art, meta_cast = meta_getter(title) if isinstance(meta_labels, dict): show_info = {str(k): str(v) for k, v in meta_labels.items() if v} if isinstance(meta_art, dict): show_art = {str(k): str(v) for k, v in meta_art.items() if v} if isinstance(meta_cast, list): show_cast = meta_cast # noqa: PGH003 except Exception: pass show_fanart = (show_art or {}).get("fanart") if isinstance(show_art, dict) else "" show_poster = (show_art or {}).get("poster") if isinstance(show_art, dict) else "" with _busy_dialog("Episoden werden aufbereitet..."): for episode in episodes: if show_tmdb: info_labels, art = _tmdb_episode_labels_and_art( title=title, season_label=season, episode_label=episode ) episode_cast = _tmdb_episode_cast(title=title, season_label=season, episode_label=episode) else: info_labels, art, episode_cast = {}, {}, [] merged_info = dict(show_info or {}) merged_info.update(dict(info_labels or {})) merged_art: dict[str, str] = {} if isinstance(show_art, dict): merged_art.update({k: str(v) for k, v in show_art.items() if v}) if isinstance(art, dict): merged_art.update({k: str(v) for k, v in art.items() if v}) # Kodi Info-Dialog für Episoden hängt oft an diesen Feldern. season_number = _extract_first_int(season) or 0 episode_number = _extract_first_int(episode) or 0 merged_info.setdefault("mediatype", "episode") merged_info.setdefault("tvshowtitle", title) if season_number: merged_info.setdefault("season", str(season_number)) if episode_number: merged_info.setdefault("episode", str(episode_number)) # Episode-Items ohne eigenes Artwork: Fanart/Poster vom Titel durchreichen. if show_fanart: merged_art.setdefault("fanart", show_fanart) merged_art.setdefault("landscape", show_fanart) if show_poster: merged_art.setdefault("poster", show_poster) key = _playstate_key(plugin_name=plugin_name, title=title, season=season, episode=episode) merged_info = _apply_playstate_to_info(merged_info, _get_playstate(key)) display_label = episode play_params = { "plugin": plugin_name, "title": title, "season": season, "episode": episode, "series_url": series_url, } if supports_direct_episode_url and callable(episode_url_getter): try: episode_url = str(episode_url_getter(title, season, episode) or "").strip() except Exception: episode_url = "" if episode_url: play_params["url"] = episode_url _add_directory_item( handle, display_label, "play_episode", play_params, is_folder=False, info_labels=merged_info, art=merged_art, cast=episode_cast or show_cast, ) xbmcplugin.endOfDirectory(handle) def _show_genre_sources() -> None: handle = _get_handle() _log("Genre-Quellen laden.") plugins = _discover_plugins() sources: list[tuple[str, BasisPlugin]] = [] for plugin_name, plugin in plugins.items(): if plugin.__class__.genres is BasisPlugin.genres: continue if plugin.__class__.titles_for_genre is BasisPlugin.titles_for_genre: continue sources.append((plugin_name, plugin)) if not sources: xbmcgui.Dialog().notification("Genres", "Keine Quellen mit Genres gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for plugin_name, plugin in sources: _add_directory_item( handle, f"Genres [{plugin_name}]", "genres", {"plugin": plugin_name}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_genres(plugin_name: str) -> None: handle = _get_handle() _log(f"Genres laden: {plugin_name}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Genres", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return try: genres = _run_with_progress( "Genres", f"{plugin_name}: Genres werden geladen...", lambda: plugin.genres(), ) except Exception as exc: _log(f"Genres konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Genres konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for genre in genres: # Wenn Plugin Paging unterstützt, direkt paginierte Titelliste öffnen. paging_getter = getattr(plugin, "titles_for_genre_page", None) if callable(paging_getter): _add_directory_item( handle, genre, "genre_titles_page", {"plugin": plugin_name, "genre": genre, "page": "1"}, is_folder=True, ) continue _add_directory_item( handle, genre, "genre_series", {"plugin": plugin_name, "genre": genre}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_categories(plugin_name: str) -> None: handle = _get_handle() _log(f"Kategorien laden: {plugin_name}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Kategorien", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return getter = getattr(plugin, "categories", None) if not callable(getter): xbmcgui.Dialog().notification("Kategorien", "Kategorien nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return try: categories = _run_with_progress( "Kategorien", f"{plugin_name}: Kategorien werden geladen...", lambda: list(getter() or []), ) except Exception as exc: _log(f"Kategorien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Kategorien", "Kategorien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for category in categories: category = str(category).strip() if not category: continue _add_directory_item( handle, category, "category_titles_page", {"plugin": plugin_name, "category": category, "page": "1"}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -> None: handle = _get_handle() plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Kategorien", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return page = max(1, int(page or 1)) paging_getter = getattr(plugin, "titles_for_genre_page", None) if not callable(paging_getter): xbmcgui.Dialog().notification("Kategorien", "Seitenwechsel nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return total_pages = None count_getter = getattr(plugin, "genre_page_count", None) if callable(count_getter): try: total_pages = int(count_getter(category) or 1) except Exception: total_pages = None if total_pages is not None: page = min(page, max(1, total_pages)) xbmcplugin.setPluginCategory(handle, f"{category} ({page}/{total_pages})") else: xbmcplugin.setPluginCategory(handle, f"{category} ({page})") _set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", "category_titles_page", {"plugin": plugin_name, "category": category, "page": str(page - 1)}, is_folder=True, ) try: titles = _run_with_progress( "Kategorien", f"{plugin_name}: {category} Seite {page} wird geladen...", lambda: list(paging_getter(category, page) or []), ) except Exception as exc: _log(f"Kategorie-Seite konnte nicht geladen werden ({plugin_name}/{category} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Kategorien", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) if titles: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(titles) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in titles: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("Genre-Liste wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in titles: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=not direct_play, info_labels=info_labels, art=art, cast=cast, ) show_next = False if total_pages is not None: show_next = page < total_pages else: has_more_getter = getattr(plugin, "genre_has_more", None) if callable(has_more_getter): try: show_next = bool(has_more_getter(category, page)) except Exception: show_next = False if show_next: _add_directory_item( handle, "Naechste Seite", "category_titles_page", {"plugin": plugin_name, "category": category, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None: handle = _get_handle() plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Genres", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return page = max(1, int(page or 1)) paging_getter = getattr(plugin, "titles_for_genre_page", None) if not callable(paging_getter): xbmcgui.Dialog().notification("Genres", "Seitenwechsel nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return total_pages = None count_getter = getattr(plugin, "genre_page_count", None) if callable(count_getter): try: total_pages = int(count_getter(genre) or 1) except Exception: total_pages = None if total_pages is not None: page = min(page, max(1, total_pages)) xbmcplugin.setPluginCategory(handle, f"{genre} ({page}/{total_pages})") else: xbmcplugin.setPluginCategory(handle, f"{genre} ({page})") _set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", "genre_titles_page", {"plugin": plugin_name, "genre": genre, "page": str(page - 1)}, is_folder=True, ) try: titles = _run_with_progress( "Genres", f"{plugin_name}: {genre} Seite {page} wird geladen...", lambda: list(paging_getter(genre, page) or []), ) except Exception as exc: _log(f"Genre-Seite konnte nicht geladen werden ({plugin_name}/{genre} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) if titles: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(titles) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in titles: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("Genre-Seite wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in titles: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=not direct_play, info_labels=info_labels, art=art, cast=cast, ) show_next = False if total_pages is not None: show_next = page < total_pages else: has_more_getter = getattr(plugin, "genre_has_more", None) if callable(has_more_getter): try: show_next = bool(has_more_getter(genre, page)) except Exception: show_next = False if show_next: _add_directory_item( handle, "Naechste Seite", "genre_titles_page", {"plugin": plugin_name, "genre": genre, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_alpha_index(plugin_name: str) -> None: handle = _get_handle() _log(f"A-Z laden: {plugin_name}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("A-Z", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return getter = getattr(plugin, "alpha_index", None) if not callable(getter): xbmcgui.Dialog().notification("A-Z", "A-Z nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return try: letters = _run_with_progress( "A-Z", f"{plugin_name}: Index wird geladen...", lambda: list(getter() or []), ) except Exception as exc: _log(f"A-Z konnte nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("A-Z", "A-Z konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for letter in letters: letter = str(letter).strip() if not letter: continue _add_directory_item( handle, letter, "alpha_titles_page", {"plugin": plugin_name, "letter": letter, "page": "1"}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> None: handle = _get_handle() plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("A-Z", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return page = max(1, int(page or 1)) paging_getter = getattr(plugin, "titles_for_alpha_page", None) if not callable(paging_getter): xbmcgui.Dialog().notification("A-Z", "Seitenwechsel nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return total_pages = None count_getter = getattr(plugin, "alpha_page_count", None) if callable(count_getter): try: total_pages = int(count_getter(letter) or 1) except Exception: total_pages = None if total_pages is not None: page = min(page, max(1, total_pages)) xbmcplugin.setPluginCategory(handle, f"{letter} ({page}/{total_pages})") else: xbmcplugin.setPluginCategory(handle, f"{letter} ({page})") _set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", "alpha_titles_page", {"plugin": plugin_name, "letter": letter, "page": str(page - 1)}, is_folder=True, ) try: titles = _run_with_progress( "A-Z", f"{plugin_name}: {letter} Seite {page} wird geladen...", lambda: list(paging_getter(letter, page) or []), ) except Exception as exc: _log(f"A-Z Seite konnte nicht geladen werden ({plugin_name}/{letter} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("A-Z", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) if titles: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(titles) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in titles: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("A-Z Liste wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in titles: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=not direct_play, info_labels=info_labels, art=art, cast=cast, ) show_next = False if total_pages is not None: show_next = page < total_pages if show_next: _add_directory_item( handle, "Naechste Seite", "alpha_titles_page", {"plugin": plugin_name, "letter": letter, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_series_catalog(plugin_name: str, page: int = 1) -> None: handle = _get_handle() plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Serien", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return page = max(1, int(page or 1)) paging_getter = getattr(plugin, "series_catalog_page", None) if not callable(paging_getter): xbmcgui.Dialog().notification("Serien", "Serienkatalog nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return total_pages = None count_getter = getattr(plugin, "series_catalog_page_count", None) if callable(count_getter): try: total_pages = int(count_getter(page) or 1) except Exception: total_pages = None if total_pages is not None: page = min(page, max(1, total_pages)) xbmcplugin.setPluginCategory(handle, f"Serien ({page}/{total_pages})") else: xbmcplugin.setPluginCategory(handle, f"Serien ({page})") _set_content(handle, "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", "series_catalog", {"plugin": plugin_name, "page": str(page - 1)}, is_folder=True, ) try: titles = _run_with_progress( "Serien", f"{plugin_name}: Seite {page} wird geladen...", lambda: list(paging_getter(page) or []), ) except Exception as exc: _log(f"Serien konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Serien", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) if titles: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(titles) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in titles: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("A-Z Seite wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in titles: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) _add_directory_item( handle, display_label, "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=True, info_labels=info_labels, art=art, cast=cast, ) show_next = False if total_pages is not None: show_next = page < total_pages else: has_more_getter = getattr(plugin, "series_catalog_has_more", None) if callable(has_more_getter): try: show_next = bool(has_more_getter(page)) except Exception: show_next = False if show_next: _add_directory_item( handle, "Naechste Seite", "series_catalog", {"plugin": plugin_name, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _title_group_key(title: str) -> str: raw = (title or "").strip() if not raw: return "#" for char in raw: if char.isdigit(): return "0-9" if char.isalpha(): normalized = char.casefold() if normalized == "ä": normalized = "a" elif normalized == "ö": normalized = "o" elif normalized == "ü": normalized = "u" elif normalized == "ß": normalized = "s" return normalized.upper() return "#" def _genre_title_groups() -> list[tuple[str, str]]: return [ ("A-E", "A-E"), ("F-J", "F-J"), ("K-O", "K-O"), ("P-T", "P-T"), ("U-Z", "U-Z"), ("0-9", "0-9"), ] def _group_matches(group_code: str, title: str) -> bool: key = _title_group_key(title) if group_code == "0-9": return key == "0-9" if key == "0-9" or key == "#": return False if group_code == "A-E": return "A" <= key <= "E" if group_code == "F-J": return "F" <= key <= "J" if group_code == "K-O": return "K" <= key <= "O" if group_code == "P-T": return "P" <= key <= "T" if group_code == "U-Z": return "U" <= key <= "Z" return False def _get_genre_titles(plugin_name: str, genre: str) -> list[str]: cache_key = (plugin_name, genre) cached = _GENRE_TITLES_CACHE.get(cache_key) if cached is not None: return list(cached) plugin = _discover_plugins().get(plugin_name) if plugin is None: return [] titles = plugin.titles_for_genre(genre) titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) _GENRE_TITLES_CACHE[cache_key] = list(titles) return list(titles) def _show_genre_series(plugin_name: str, genre: str) -> None: handle = _get_handle() xbmcplugin.setPluginCategory(handle, genre) for label, group_code in _genre_title_groups(): _add_directory_item( handle, label, "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _parse_positive_int(value: str, *, default: int = 1) -> int: try: parsed = int(str(value or "").strip()) except Exception: return default return parsed if parsed > 0 else default def _popular_genre_label(plugin: BasisPlugin) -> str | None: label = getattr(plugin, "POPULAR_GENRE_LABEL", None) if isinstance(label, str) and label.strip(): return label.strip() return None def _plugin_has_capability(plugin: BasisPlugin, capability: str) -> bool: getter = getattr(plugin, "capabilities", None) if callable(getter): try: capabilities = getter() except Exception: capabilities = set() try: return capability in set(capabilities or []) except Exception: return False # Backwards compatibility: Popular via POPULAR_GENRE_LABEL constant. if capability == "popular_series": return _popular_genre_label(plugin) is not None return False def _plugins_with_popular() -> list[tuple[str, BasisPlugin, str]]: results: list[tuple[str, BasisPlugin, str]] = [] for plugin_name, plugin in _discover_plugins().items(): if not _plugin_has_capability(plugin, "popular_series"): continue label = _popular_genre_label(plugin) or "" results.append((plugin_name, plugin, label)) return results def _show_popular(plugin_name: str | None = None, page: int = 1) -> None: handle = _get_handle() page_size = LIST_PAGE_SIZE page = max(1, int(page or 1)) if plugin_name: plugin = _discover_plugins().get(plugin_name) if plugin is None or not _plugin_has_capability(plugin, "popular_series"): xbmcgui.Dialog().notification(POPULAR_MENU_LABEL, "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return try: popular_getter = getattr(plugin, "popular_series", None) if callable(popular_getter): titles = _run_with_progress( POPULAR_MENU_LABEL, f"{plugin_name}: Liste wird geladen...", lambda: list(popular_getter() or []), ) else: label = _popular_genre_label(plugin) if not label: titles = [] else: titles = _run_with_progress( POPULAR_MENU_LABEL, f"{plugin_name}: Liste wird geladen...", lambda: list(plugin.titles_for_genre(label) or []), ) except Exception as exc: _log(f"{POPULAR_MENU_LABEL} konnte nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification(POPULAR_MENU_LABEL, "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) total = len(titles) total_pages = max(1, (total + page_size - 1) // page_size) page = min(page, total_pages) xbmcplugin.setPluginCategory(handle, f"{POPULAR_MENU_LABEL} [{plugin_name}] ({page}/{total_pages})") _set_content(handle, "tvshows") if total_pages > 1 and page > 1: _add_directory_item( handle, "Vorherige Seite", "popular", {"plugin": plugin_name, "page": str(page - 1)}, is_folder=True, ) start = (page - 1) * page_size end = start + page_size page_items = titles[start:end] if page_items: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(page_items) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in page_items: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog(f"{POPULAR_MENU_LABEL} wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in page_items: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) _add_directory_item( handle, display_label, "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=True, info_labels=info_labels, art=art, cast=cast, ) if total_pages > 1 and page < total_pages: _add_directory_item( handle, "Naechste Seite", "popular", {"plugin": plugin_name, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) return sources = _plugins_with_popular() if not sources: xbmcgui.Dialog().notification(POPULAR_MENU_LABEL, "Keine Quellen gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return xbmcplugin.setPluginCategory(handle, POPULAR_MENU_LABEL) for name, plugin, _label in sources: _add_directory_item( handle, f"{POPULAR_MENU_LABEL} [{plugin.name}]", "popular", {"plugin": name, "page": "1"}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_new_titles(plugin_name: str, page: int = 1, *, action_name: str = "new_titles") -> None: handle = _get_handle() page_size = LIST_PAGE_SIZE page = max(1, int(page or 1)) plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if plugin is None or not _plugin_has_capability(plugin, "new_titles"): xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return getter = getattr(plugin, "new_titles", None) if not callable(getter): xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Diese Liste ist nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return paging_getter = getattr(plugin, "new_titles_page", None) has_more_getter = getattr(plugin, "new_titles_has_more", None) if callable(paging_getter): xbmcplugin.setPluginCategory(handle, f"{LATEST_MENU_LABEL} [{plugin_name}] ({page})") _set_content(handle, "movies" if plugin_name.casefold() == "einschalten" else "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", action_name, {"plugin": plugin_name, "page": str(page - 1)}, is_folder=True, ) try: page_items = _run_with_progress( LATEST_MENU_LABEL, f"{plugin_name}: Seite {page} wird geladen...", lambda: list(paging_getter(page) or []), ) except Exception as exc: _log(f"{LATEST_MENU_LABEL} konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Titel konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return page_items = [str(t).strip() for t in page_items if t and str(t).strip()] page_items.sort(key=lambda value: value.casefold()) else: try: titles = _run_with_progress( LATEST_MENU_LABEL, f"{plugin_name}: Liste wird geladen...", lambda: list(getter() or []), ) except Exception as exc: _log(f"{LATEST_MENU_LABEL} konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Titel konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) total = len(titles) if total == 0: xbmcgui.Dialog().notification( LATEST_MENU_LABEL, "Keine Titel gefunden. Bitte Basis-URL oder Index pruefen.", xbmcgui.NOTIFICATION_INFO, 4000, ) total_pages = max(1, (total + page_size - 1) // page_size) page = min(page, total_pages) xbmcplugin.setPluginCategory(handle, f"{LATEST_MENU_LABEL} [{plugin_name}] ({page}/{total_pages})") _set_content(handle, "movies" if plugin_name.casefold() == "einschalten" else "tvshows") if total_pages > 1 and page > 1: _add_directory_item( handle, "Vorherige Seite", action_name, {"plugin": plugin_name, "page": str(page - 1)}, is_folder=True, ) start = (page - 1) * page_size end = start + page_size page_items = titles[start:end] if page_items: use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(page_items) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in page_items: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog(f"{LATEST_MENU_LABEL} wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in page_items: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "movie") playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=not direct_play, info_labels=info_labels, art=art, cast=cast, ) show_next = False if callable(paging_getter) and callable(has_more_getter): try: show_next = bool(has_more_getter(page)) except Exception: show_next = False elif "total_pages" in locals(): show_next = bool(total_pages > 1 and page < total_pages) # type: ignore[name-defined] if show_next: _add_directory_item( handle, "Naechste Seite", action_name, {"plugin": plugin_name, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _show_latest_episodes(plugin_name: str, page: int = 1) -> None: handle = _get_handle() plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if not plugin: xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return getter = getattr(plugin, "latest_episodes", None) if not callable(getter): xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Diese Quelle bietet das nicht an.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return xbmcplugin.setPluginCategory(handle, f"{plugin_name}: {LATEST_MENU_LABEL}") _set_content(handle, "episodes") try: entries = _run_with_progress( LATEST_MENU_LABEL, f"{plugin_name}: Seite {page} wird geladen...", lambda: list(getter(page) or []), ) except Exception as exc: _log(f"{LATEST_MENU_LABEL} fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Abruf fehlgeschlagen.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for entry in entries: try: title = str(getattr(entry, "series_title", "") or "").strip() season_number = int(getattr(entry, "season", 0) or 0) episode_number = int(getattr(entry, "episode", 0) or 0) url = str(getattr(entry, "url", "") or "").strip() airdate = str(getattr(entry, "airdate", "") or "").strip() except Exception: continue if not title or not url or season_number < 0 or episode_number <= 0: continue season_label = f"Staffel {season_number}" episode_label = f"Episode {episode_number}" key = _playstate_key(plugin_name=plugin_name, title=title, season=season_label, episode=episode_label) playstate = _get_playstate(key) label = f"{title} - S{season_number:02d}E{episode_number:02d}" if airdate: label = f"{label} ({airdate})" label = _label_with_playstate(label, playstate) info_labels: dict[str, object] = { "title": f"{title} - S{season_number:02d}E{episode_number:02d}", "tvshowtitle": title, "season": season_number, "episode": episode_number, "mediatype": "episode", } info_labels = _apply_playstate_to_info(info_labels, playstate) _add_directory_item( handle, label, "play_episode_url", { "plugin": plugin_name, "title": title, "season": str(season_number), "episode": str(episode_number), "url": url, }, is_folder=False, info_labels=info_labels, ) xbmcplugin.endOfDirectory(handle) def _show_latest_titles(plugin_name: str, page: int = 1) -> None: plugin_name = (plugin_name or "").strip() plugin = _discover_plugins().get(plugin_name) if plugin is None: handle = _get_handle() xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return if _plugin_has_capability(plugin, "latest_episodes"): _show_latest_episodes(plugin_name, page) return if _plugin_has_capability(plugin, "new_titles"): _show_new_titles(plugin_name, page, action_name="latest_titles") return handle = _get_handle() xbmcgui.Dialog().notification(LATEST_MENU_LABEL, "Diese Quelle bietet das nicht an.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page: int = 1) -> None: handle = _get_handle() page_size = LIST_PAGE_SIZE page = max(1, int(page or 1)) plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Genres", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return grouped_paging = getattr(plugin, "titles_for_genre_group_page", None) grouped_has_more = getattr(plugin, "genre_group_has_more", None) if callable(grouped_paging): try: raw_items = _run_with_progress( "Genres", f"{plugin_name}: {genre} [{group_code}] Seite {page} wird geladen...", lambda: list(grouped_paging(genre, group_code, page, page_size) or []), ) page_items = [str(t).strip() for t in raw_items if t and str(t).strip()] except Exception as exc: _log(f"Genre-Serien konnten nicht geladen werden ({plugin_name}/{genre}/{group_code} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page})") use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) if page > 1: _add_directory_item( handle, "Vorherige Seite", "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page - 1)}, is_folder=True, ) if page_items: plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(page_items) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in page_items: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("Genre-Gruppe wird geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in page_items: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) _add_directory_item( handle, display_label, "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=True, info_labels=info_labels, art=art, cast=cast, ) show_next = False if callable(grouped_has_more): try: show_next = bool(grouped_has_more(genre, group_code, page, page_size)) except Exception: show_next = False elif len(page_items) >= page_size: show_next = True if show_next: _add_directory_item( handle, "Naechste Seite", "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) return try: titles = _get_genre_titles(plugin_name, genre) except Exception as exc: _log(f"Genre-Serien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return filtered = [title for title in titles if _group_matches(group_code, title)] total = len(filtered) total_pages = max(1, (total + page_size - 1) // page_size) page = min(page, total_pages) xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page}/{total_pages})") if total_pages > 1 and page > 1: _add_directory_item( handle, "Vorherige Seite", "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page - 1)}, is_folder=True, ) start = (page - 1) * page_size end = start + page_size page_items = filtered[start:end] use_source, show_tmdb, prefer_source = _metadata_policy( plugin_name, plugin, allow_tmdb=_tmdb_list_enabled() ) if page_items: plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {} show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} tmdb_titles = list(page_items) if show_tmdb else [] if show_tmdb and prefer_source and use_source: tmdb_titles = [] for title in page_items: meta = plugin_meta.get(title) meta_labels = meta[0] if meta else {} meta_art = meta[1] if meta else {} if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art): tmdb_titles.append(title) if show_tmdb and tmdb_titles: with _busy_dialog("Genre-Serien werden geladen..."): tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles) for title in page_items: tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, []) meta = plugin_meta.get(title) info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) _add_directory_item( handle, display_label, "seasons", {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)}, is_folder=True, info_labels=info_labels, art=art, cast=cast, ) if total_pages > 1 and page < total_pages: _add_directory_item( handle, "Naechste Seite", "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page + 1)}, is_folder=True, ) xbmcplugin.endOfDirectory(handle) def _open_settings() -> None: """Oeffnet das Kodi-Addon-Settings-Dialog.""" if xbmcaddon is None: # pragma: no cover - outside Kodi raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).") _sync_update_version_settings() addon = xbmcaddon.Addon() addon.openSettings() def _apply_update_channel(*, silent: bool = False) -> bool: if xbmc is None: # pragma: no cover - outside Kodi return False info_url = _resolve_update_info_url() _sync_update_version_settings() applied = _update_repository_source(info_url) installed_version = _get_setting_string("update_installed_version").strip() or "0.0.0" versions = _fetch_repo_versions(info_url) target_version = versions[0] if versions else "-" install_result = False if target_version != "-" and target_version != installed_version: install_result = _install_addon_version(info_url, target_version) elif target_version == installed_version: install_result = True builtin = getattr(xbmc, "executebuiltin", None) if callable(builtin): builtin("UpdateAddonRepos") builtin("UpdateLocalAddons") if not silent: if not applied: warning_icon = getattr(xbmcgui, "NOTIFICATION_WARNING", xbmcgui.NOTIFICATION_INFO) xbmcgui.Dialog().notification( "Updates", "Kanal gespeichert, aber repository.viewit nicht gefunden.", warning_icon, 5000, ) elif target_version == "-": xbmcgui.Dialog().notification( "Updates", "Kanal angewendet, aber keine Version im Kanal gefunden.", xbmcgui.NOTIFICATION_ERROR, 5000, ) elif not install_result: xbmcgui.Dialog().notification( "Updates", f"Kanal angewendet, Installation von {target_version} fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 5000, ) elif target_version == installed_version: xbmcgui.Dialog().notification( "Updates", f"Kanal angewendet: {_channel_label(_selected_update_channel())} ({target_version} bereits installiert)", xbmcgui.NOTIFICATION_INFO, 4500, ) else: xbmcgui.Dialog().notification( "Updates", f"Kanal angewendet: {_channel_label(_selected_update_channel())} -> {target_version} installiert", xbmcgui.NOTIFICATION_INFO, 5000, ) _sync_update_version_settings() return applied and install_result def _run_update_check(*, silent: bool = False) -> None: """Stoesst Kodi-Repo- und Addon-Updates an.""" if xbmc is None: # pragma: no cover - outside Kodi return try: _apply_update_channel(silent=True) if not silent: builtin = getattr(xbmc, "executebuiltin", None) if callable(builtin): builtin("ActivateWindow(addonbrowser,addons://updates/)") if not silent: xbmcgui.Dialog().notification("Updates", "Update-Check gestartet.", xbmcgui.NOTIFICATION_INFO, 4000) except Exception as exc: _log(f"Update-Pruefung fehlgeschlagen: {exc}", xbmc.LOGWARNING) if not silent: try: xbmcgui.Dialog().notification("Updates", "Update-Check fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 4000) except Exception: pass def _show_version_selector() -> None: if xbmc is None: # pragma: no cover - outside Kodi return info_url = _resolve_update_info_url() channel = _selected_update_channel() _sync_update_version_settings() versions = _fetch_repo_versions(info_url) if not versions: xbmcgui.Dialog().notification("Updates", "Keine Versionen im Repo gefunden.", xbmcgui.NOTIFICATION_ERROR, 4000) return installed = _get_setting_string("update_installed_version").strip() or "-" options = [] for version in versions: label = version if version == installed: label = f"{version} (installiert)" options.append(label) selected = xbmcgui.Dialog().select("Version waehlen", options) if selected < 0 or selected >= len(versions): return version = versions[selected] changelog = _fetch_changelog_for_channel(channel, version) viewer = getattr(xbmcgui.Dialog(), "textviewer", None) if callable(viewer): try: viewer(f"Changelog {version}", changelog) except Exception: pass dialog = xbmcgui.Dialog() try: confirm = dialog.yesno( "Version installieren", f"Version: {version}", "Installation jetzt starten?", yeslabel="Installieren", nolabel="Abbrechen", ) except TypeError: confirm = dialog.yesno("Version installieren", f"Version: {version}", "Installation jetzt starten?") if not confirm: return ok = _install_addon_version(info_url, version) if ok: _sync_update_version_settings() xbmcgui.Dialog().notification("Updates", f"Version {version} installiert.", xbmcgui.NOTIFICATION_INFO, 4000) else: xbmcgui.Dialog().notification("Updates", f"Installation von {version} fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 4500) def _maybe_run_auto_update_check(action: str | None) -> None: action = (action or "").strip() # Auto-Check nur beim Root-Menue, nicht in jedem Untermenue. if action: return if not _get_setting_bool("auto_update_enabled", default=False): return now = int(time.time()) last = _get_setting_int("auto_update_last_ts", default=0) if last > 0 and (now - last) < AUTO_UPDATE_INTERVAL_SEC: return _set_setting_string("auto_update_last_ts", str(now)) _run_update_check(silent=True) def _extract_first_int(value: str) -> int | None: match = re.search(r"(\d+)", value or "") if not match: return None try: return int(match.group(1)) except Exception: return None def _label_with_duration(label: str, info_labels: dict[str, str] | None) -> str: return label def _resolveurl_last_error() -> str: try: from resolveurl_backend import get_last_error # type: ignore except Exception: return "" try: return str(get_last_error() or "") except Exception: return "" def _is_cloudflare_challenge_error(message: str) -> bool: text = str(message or "").casefold() if not text: return False return "cloudflare" in text or "challenge" in text or "attention required" in text def _play_final_link( link: str, *, display_title: str | None = None, info_labels: dict[str, str] | None = None, art: dict[str, str] | None = None, cast: list[TmdbCastMember] | None = None, resolve_handle: int | None = None, ) -> None: list_item = xbmcgui.ListItem(label=display_title or "", path=link) try: list_item.setProperty("IsPlayable", "true") except Exception: pass merged_info: dict[str, object] = dict(info_labels or {}) if display_title: merged_info["title"] = display_title _apply_video_info(list_item, merged_info, cast) if art: setter = getattr(list_item, "setArt", None) if callable(setter): try: setter(art) except Exception: pass # Bei Plugin-Play-Items sollte Kodi via setResolvedUrl() die Wiedergabe starten. # player.play() kann dazu führen, dass Kodi den Item-Callback nochmal triggert (Hoster-Auswahl doppelt). resolved = False if resolve_handle is not None: resolver = getattr(xbmcplugin, "setResolvedUrl", None) if callable(resolver): try: resolver(resolve_handle, True, list_item) resolved = True except Exception: pass if not resolved: player = xbmc.Player() player.play(item=link, listitem=list_item) def _track_playback_and_update_state_async(key: str) -> None: # Eigenes Resume/Watched ist deaktiviert; Kodi verwaltet das selbst. return def _play_episode( plugin_name: str, title: str, season: str, episode: str, *, forced_hoster: str = "", episode_url: str = "", series_url: str = "", resolve_handle: int | None = None, ) -> None: episode_url = (episode_url or "").strip() if episode_url: _play_episode_url( plugin_name, title=title, season_number=_extract_first_int(season) or 0, episode_number=_extract_first_int(episode) or 0, episode_url=episode_url, season_label_override=season, episode_label_override=episode, resolve_handle=resolve_handle, ) return series_url = (series_url or "").strip() if series_url: plugin_for_url = _discover_plugins().get(plugin_name) remember_series_url = getattr(plugin_for_url, "remember_series_url", None) if plugin_for_url is not None else None if callable(remember_series_url): try: remember_series_url(title, series_url) except Exception: pass _log(f"Play anfordern: {plugin_name} / {title} / {season} / {episode}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Wiedergabe", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return available_hosters: list[str] = [] hoster_getter = getattr(plugin, "available_hosters_for", None) if callable(hoster_getter): try: with _busy_dialog("Hoster werden geladen..."): available_hosters = list(hoster_getter(title, season, episode) or []) except Exception as exc: _log(f"Hoster laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) selected_hoster: str | None = None forced_hoster = (forced_hoster or "").strip() if available_hosters: if forced_hoster: for hoster in available_hosters: if hoster.casefold() == forced_hoster.casefold(): selected_hoster = hoster break if selected_hoster is None and len(available_hosters) == 1: selected_hoster = available_hosters[0] elif selected_hoster is None: selected_index = xbmcgui.Dialog().select("Hoster waehlen", available_hosters) if selected_index is None or selected_index < 0: _log("Play abgebrochen (kein Hoster gewählt).", xbmc.LOGDEBUG) return selected_hoster = available_hosters[selected_index] # Manche Plugins erlauben (optional) eine temporaere Einschränkung auf einen Hoster. preferred_setter = getattr(plugin, "set_preferred_hosters", None) restore_hosters: list[str] | None = None if selected_hoster and callable(preferred_setter): current = getattr(plugin, "_preferred_hosters", None) if isinstance(current, list): restore_hosters = list(current) preferred_setter([selected_hoster]) try: link = plugin.stream_link_for(title, season, episode) if not link: _log("Kein Stream gefunden.", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Wiedergabe", "Kein Stream gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return _log(f"Stream-Link: {link}", xbmc.LOGDEBUG) resolved_link = plugin.resolve_stream_link(link) if not resolved_link: err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err): _log(f"ResolveURL Cloudflare-Challenge: {err}", xbmc.LOGWARNING) xbmcgui.Dialog().notification( "Wiedergabe", "Hoster durch Cloudflare geschuetzt. Bitte spaeter erneut probieren.", xbmcgui.NOTIFICATION_INFO, 4500, ) return final_link = resolved_link or link final_link = normalize_resolved_stream_url(final_link, source_url=link) err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err) and final_link.strip() == link.strip(): _log(f"ResolveURL Cloudflare-Challenge (unresolved): {err}", xbmc.LOGWARNING) xbmcgui.Dialog().notification( "Wiedergabe", "Hoster durch Cloudflare geschuetzt. Bitte spaeter erneut probieren.", xbmcgui.NOTIFICATION_INFO, 4500, ) return finally: if restore_hosters is not None and callable(preferred_setter): preferred_setter(restore_hosters) _log(f"Finaler Link: {final_link}", xbmc.LOGDEBUG) season_number = _extract_first_int(season) episode_number = _extract_first_int(episode) if season_number is not None and episode_number is not None: display_title = f"{title} - S{season_number:02d}E{episode_number:02d}" else: display_title = title info_labels, art, cast = _tmdb_labels_and_art(title) display_title = _label_with_duration(display_title, info_labels) _play_final_link( final_link, display_title=display_title, info_labels=info_labels, art=art, cast=cast, resolve_handle=resolve_handle, ) _track_playback_and_update_state_async( _playstate_key(plugin_name=plugin_name, title=title, season=season, episode=episode) ) def _play_episode_url( plugin_name: str, *, title: str, season_number: int, episode_number: int, episode_url: str, season_label_override: str = "", episode_label_override: str = "", resolve_handle: int | None = None, ) -> None: season_label = (season_label_override or "").strip() or (f"Staffel {season_number}" if season_number > 0 else "") episode_label = (episode_label_override or "").strip() or ( f"Episode {episode_number}" if episode_number > 0 else "" ) _log(f"Play (URL) anfordern: {plugin_name} / {title} / {season_label} / {episode_label} / {episode_url}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Wiedergabe", "Quelle nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return available_hosters: list[str] = [] hoster_getter = getattr(plugin, "available_hosters_for_url", None) if callable(hoster_getter): try: with _busy_dialog("Hoster werden geladen..."): available_hosters = list(hoster_getter(episode_url) or []) except Exception as exc: _log(f"Hoster laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) selected_hoster: str | None = None if available_hosters: if len(available_hosters) == 1: selected_hoster = available_hosters[0] else: selected_index = xbmcgui.Dialog().select("Hoster waehlen", available_hosters) if selected_index is None or selected_index < 0: _log("Play abgebrochen (kein Hoster gewählt).", xbmc.LOGDEBUG) return selected_hoster = available_hosters[selected_index] preferred_setter = getattr(plugin, "set_preferred_hosters", None) restore_hosters: list[str] | None = None if selected_hoster and callable(preferred_setter): current = getattr(plugin, "_preferred_hosters", None) if isinstance(current, list): restore_hosters = list(current) preferred_setter([selected_hoster]) try: link_getter = getattr(plugin, "stream_link_for_url", None) if not callable(link_getter): xbmcgui.Dialog().notification("Wiedergabe", "Diese Funktion wird von der Quelle nicht unterstuetzt.", xbmcgui.NOTIFICATION_INFO, 3000) return link = link_getter(episode_url) if not link: _log("Kein Stream gefunden.", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Wiedergabe", "Kein Stream gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return _log(f"Stream-Link: {link}", xbmc.LOGDEBUG) resolved_link = plugin.resolve_stream_link(link) if not resolved_link: err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err): _log(f"ResolveURL Cloudflare-Challenge: {err}", xbmc.LOGWARNING) xbmcgui.Dialog().notification( "Wiedergabe", "Hoster durch Cloudflare geschuetzt. Bitte spaeter erneut probieren.", xbmcgui.NOTIFICATION_INFO, 4500, ) return final_link = resolved_link or link final_link = normalize_resolved_stream_url(final_link, source_url=link) err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err) and final_link.strip() == link.strip(): _log(f"ResolveURL Cloudflare-Challenge (unresolved): {err}", xbmc.LOGWARNING) xbmcgui.Dialog().notification( "Wiedergabe", "Hoster durch Cloudflare geschuetzt. Bitte spaeter erneut probieren.", xbmcgui.NOTIFICATION_INFO, 4500, ) return finally: if restore_hosters is not None and callable(preferred_setter): preferred_setter(restore_hosters) display_title = f"{title} - S{season_number:02d}E{episode_number:02d}" if season_number and episode_number else title info_labels, art, cast = _tmdb_labels_and_art(title) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "episode") info_labels.setdefault("tvshowtitle", title) if season_number > 0: info_labels["season"] = str(season_number) if episode_number > 0: info_labels["episode"] = str(episode_number) display_title = _label_with_duration(display_title, info_labels) _play_final_link( final_link, display_title=display_title, info_labels=info_labels, art=art, cast=cast, resolve_handle=resolve_handle, ) _track_playback_and_update_state_async( _playstate_key(plugin_name=plugin_name, title=title, season=season_label, episode=episode_label) ) def _parse_params() -> dict[str, str]: """Parst Kodi-Plugin-Parameter aus `sys.argv[2]`.""" if len(sys.argv) <= 2 or not sys.argv[2]: return {} raw_params = parse_qs(sys.argv[2].lstrip("?"), keep_blank_values=True) return {key: values[0] for key, values in raw_params.items()} def run() -> None: params = _parse_params() action = params.get("action") _log(f"Action: {action}", xbmc.LOGDEBUG) _maybe_run_auto_update_check(action) if action == "search": _show_search() elif action == "plugin_menu": _show_plugin_menu(params.get("plugin", "")) elif action == "plugin_search": _show_plugin_search(params.get("plugin", "")) elif action == "genre_sources": _show_genre_sources() elif action == "genres": _show_genres(params.get("plugin", "")) elif action == "categories": _show_categories(params.get("plugin", "")) elif action == "latest_titles": _show_latest_titles( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "new_titles": _show_new_titles( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "latest_episodes": _show_latest_episodes( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "genre_series": _show_genre_series( params.get("plugin", ""), params.get("genre", ""), ) elif action == "genre_titles_page": _show_genre_titles_page( params.get("plugin", ""), params.get("genre", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "category_titles_page": _show_category_titles_page( params.get("plugin", ""), params.get("category", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "alpha_index": _show_alpha_index(params.get("plugin", "")) elif action == "alpha_titles_page": _show_alpha_titles_page( params.get("plugin", ""), params.get("letter", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "series_catalog": _show_series_catalog( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "genre_series_group": _show_genre_series_group( params.get("plugin", ""), params.get("genre", ""), params.get("group", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "popular": _show_popular( params.get("plugin") or None, _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "settings": _open_settings() elif action == "check_updates": _run_update_check() elif action == "apply_update_channel": _apply_update_channel() elif action == "select_update_version": _show_version_selector() elif action == "seasons": _show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", "")) elif action == "episodes": _show_episodes( params.get("plugin", ""), params.get("title", ""), params.get("season", ""), params.get("series_url", ""), ) elif action == "play_episode": _play_episode( params.get("plugin", ""), params.get("title", ""), params.get("season", ""), params.get("episode", ""), forced_hoster=params.get("hoster", ""), episode_url=params.get("url", ""), series_url=params.get("series_url", ""), resolve_handle=_get_handle(), ) elif action == "play_movie": plugin_name = params.get("plugin", "") title = params.get("title", "") series_url = params.get("series_url", "") if series_url: plugin = _discover_plugins().get(plugin_name) remember_series_url = getattr(plugin, "remember_series_url", None) if plugin is not None else None if callable(remember_series_url): try: remember_series_url(title, series_url) except Exception: pass # Einschalten liefert Filme (keine Staffeln/Episoden). Für Playback nutzen wir: # -> Stream -> . if (plugin_name or "").casefold() == "einschalten": _play_episode( plugin_name, title, "Stream", title, resolve_handle=_get_handle(), ) else: _play_episode( plugin_name, title, "Film", "Stream", resolve_handle=_get_handle(), ) elif action == "play_episode_url": _play_episode_url( params.get("plugin", ""), title=params.get("title", ""), season_number=_parse_positive_int(params.get("season", "0"), default=0), episode_number=_parse_positive_int(params.get("episode", "0"), default=0), episode_url=params.get("url", ""), resolve_handle=_get_handle(), ) elif action == "play": link = params.get("url", "") if link: _play_final_link(link, resolve_handle=_get_handle()) else: _show_root_menu() if __name__ == "__main__": run()