#!/usr/bin/env python3 """ViewIt Kodi-Addon Einstiegspunkt. Dieses Modul ist der Router fuer die Kodi-Navigation: es rendert Menues, ruft Plugin-Implementierungen auf und startet die Wiedergabe. """ from __future__ import annotations import asyncio from contextlib import contextmanager from datetime import datetime import importlib.util import inspect import os import re import sys from pathlib import Path from types import ModuleType from urllib.parse import parse_qs, urlencode try: # pragma: no cover - Kodi runtime import xbmc # type: ignore[import-not-found] import xbmcaddon # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] import xbmcplugin # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow importing outside Kodi (e.g. linting) xbmc = None xbmcaddon = None xbmcgui = None xbmcplugin = None xbmcvfs = None class _XbmcStub: LOGDEBUG = 0 LOGINFO = 1 LOGWARNING = 2 @staticmethod def log(message: str, level: int = 1) -> None: print(f"[KodiStub:{level}] {message}") class Player: def play(self, item: str, listitem: object | None = None) -> None: print(f"[KodiStub] play: {item}") class _XbmcGuiStub: INPUT_ALPHANUM = 0 NOTIFICATION_INFO = 0 class Dialog: def input(self, heading: str, type: int = 0) -> str: raise RuntimeError("xbmcgui ist nicht verfuegbar (KodiStub).") def select(self, heading: str, options: list[str]) -> int: raise RuntimeError("xbmcgui ist nicht verfuegbar (KodiStub).") def notification(self, heading: str, message: str, icon: int = 0, time: int = 0) -> None: print(f"[KodiStub] notification: {heading}: {message}") class ListItem: def __init__(self, label: str = "", path: str = "") -> None: self._label = label self._path = path def setInfo(self, type: str, infoLabels: dict[str, str]) -> None: return class _XbmcPluginStub: @staticmethod def addDirectoryItem(*, handle: int, url: str, listitem: object, isFolder: bool) -> None: print(f"[KodiStub] 
addDirectoryItem: {url}") @staticmethod def endOfDirectory(handle: int) -> None: print(f"[KodiStub] endOfDirectory: {handle}") @staticmethod def setPluginCategory(handle: int, category: str) -> None: print(f"[KodiStub] category: {category}") xbmc = _XbmcStub() xbmcgui = _XbmcGuiStub() xbmcplugin = _XbmcPluginStub() from plugin_interface import BasisPlugin from tmdb import TmdbCastMember, fetch_tv_episode_credits, lookup_movie, lookup_tv_season, lookup_tv_season_summary, lookup_tv_show PLUGIN_DIR = Path(__file__).with_name("plugins") _PLUGIN_CACHE: dict[str, BasisPlugin] | None = None _TMDB_CACHE: dict[str, tuple[dict[str, str], dict[str, str]]] = {} _TMDB_CAST_CACHE: dict[str, list[TmdbCastMember]] = {} _TMDB_ID_CACHE: dict[str, int] = {} _TMDB_SEASON_CACHE: dict[tuple[int, int, str, str], dict[int, tuple[dict[str, str], dict[str, str]]]] = {} _TMDB_SEASON_SUMMARY_CACHE: dict[tuple[int, int, str, str], tuple[dict[str, str], dict[str, str]]] = {} _TMDB_EPISODE_CAST_CACHE: dict[tuple[int, int, int, str], list[TmdbCastMember]] = {} _TMDB_LOG_PATH: str | None = None _GENRE_TITLES_CACHE: dict[tuple[str, str], list[str]] = {} _ADDON_INSTANCE = None _PLAYSTATE_CACHE: dict[str, dict[str, object]] | None = None WATCHED_THRESHOLD = 0.9 def _tmdb_prefetch_concurrency() -> int: """Max number of concurrent TMDB lookups when prefetching metadata for lists.""" try: raw = _get_setting_string("tmdb_prefetch_concurrency").strip() value = int(raw) if raw else 6 except Exception: value = 6 return max(1, min(20, value)) def _log(message: str, level: int = xbmc.LOGINFO) -> None: xbmc.log(f"[ViewIt] {message}", level) def _busy_open() -> None: try: # pragma: no cover - Kodi runtime if xbmc is not None and hasattr(xbmc, "executebuiltin"): xbmc.executebuiltin("ActivateWindow(busydialognocancel)") except Exception: pass def _busy_close() -> None: try: # pragma: no cover - Kodi runtime if xbmc is not None and hasattr(xbmc, "executebuiltin"): 
xbmc.executebuiltin("Dialog.Close(busydialognocancel)") xbmc.executebuiltin("Dialog.Close(busydialog)") except Exception: pass @contextmanager def _busy_dialog(): _busy_open() try: yield finally: _busy_close() def _get_handle() -> int: return int(sys.argv[1]) if len(sys.argv) > 1 else -1 def _set_content(handle: int, content: str) -> None: """Hint Kodi about the content type so skins can show watched/resume overlays.""" content = (content or "").strip() if not content: return try: # pragma: no cover - Kodi runtime setter = getattr(xbmcplugin, "setContent", None) if callable(setter): setter(handle, content) except Exception: pass def _get_addon(): global _ADDON_INSTANCE if xbmcaddon is None: return None if _ADDON_INSTANCE is None: _ADDON_INSTANCE = xbmcaddon.Addon() return _ADDON_INSTANCE def _playstate_key(*, plugin_name: str, title: str, season: str, episode: str) -> str: plugin_name = (plugin_name or "").strip() title = (title or "").strip() season = (season or "").strip() episode = (episode or "").strip() return f"{plugin_name}\t{title}\t{season}\t{episode}" def _playstate_path() -> str: return _get_log_path("playstate.json") def _load_playstate() -> dict[str, dict[str, object]]: global _PLAYSTATE_CACHE if _PLAYSTATE_CACHE is not None: return _PLAYSTATE_CACHE path = _playstate_path() try: if xbmcvfs and xbmcvfs.exists(path): handle = xbmcvfs.File(path) raw = handle.read() handle.close() else: with open(path, "r", encoding="utf-8") as handle: raw = handle.read() data = json.loads(raw or "{}") if isinstance(data, dict): normalized: dict[str, dict[str, object]] = {} for key, value in data.items(): if isinstance(key, str) and isinstance(value, dict): normalized[key] = dict(value) _PLAYSTATE_CACHE = normalized return normalized except Exception: pass _PLAYSTATE_CACHE = {} return {} def _save_playstate(state: dict[str, dict[str, object]]) -> None: global _PLAYSTATE_CACHE _PLAYSTATE_CACHE = state path = _playstate_path() try: payload = json.dumps(state, 
ensure_ascii=False, sort_keys=True) except Exception: return try: if xbmcvfs: directory = os.path.dirname(path) if directory and not xbmcvfs.exists(directory): xbmcvfs.mkdirs(directory) handle = xbmcvfs.File(path, "w") handle.write(payload) handle.close() else: with open(path, "w", encoding="utf-8") as handle: handle.write(payload) except Exception: return def _get_playstate(key: str) -> dict[str, object]: return dict(_load_playstate().get(key, {}) or {}) def _set_playstate(key: str, value: dict[str, object]) -> None: state = _load_playstate() if value: state[key] = dict(value) else: state.pop(key, None) _save_playstate(state) def _apply_playstate_to_info(info_labels: dict[str, object], playstate: dict[str, object]) -> dict[str, object]: info_labels = dict(info_labels or {}) watched = bool(playstate.get("watched") or False) resume_position = playstate.get("resume_position") resume_total = playstate.get("resume_total") if watched: info_labels["playcount"] = 1 info_labels.pop("resume_position", None) info_labels.pop("resume_total", None) else: try: pos = int(resume_position) if resume_position is not None else 0 tot = int(resume_total) if resume_total is not None else 0 except Exception: pos, tot = 0, 0 if pos > 0 and tot > 0: info_labels["resume_position"] = pos info_labels["resume_total"] = tot return info_labels def _time_label(seconds: int) -> str: try: seconds = int(seconds or 0) except Exception: seconds = 0 if seconds <= 0: return "" hours = seconds // 3600 minutes = (seconds % 3600) // 60 secs = seconds % 60 if hours > 0: return f"{hours:02d}:{minutes:02d}:{secs:02d}" return f"{minutes:02d}:{secs:02d}" def _label_with_playstate(label: str, playstate: dict[str, object]) -> str: watched = bool(playstate.get("watched") or False) if watched: return f"✓ {label}" resume_pos = playstate.get("resume_position") try: pos = int(resume_pos) if resume_pos is not None else 0 except Exception: pos = 0 if pos > 0: return f"↩ {_time_label(pos)} {label}" return label def 
def _get_setting_string(setting_id: str) -> str:
    """Return the add-on setting *setting_id* as text ("" when unavailable).

    ``getSettingString`` is preferred. When it is missing, or rejects the
    setting with ``TypeError`` (Kodi does this for settings that are not of
    string type), the generic ``getSetting`` is used instead, so that e.g.
    numeric settings can still be read as text.
    """
    if xbmcaddon is None:
        return ""
    addon = _get_addon()
    if addon is None:
        return ""
    for getter_name in ("getSettingString", "getSetting"):
        getter = getattr(addon, getter_name, None)
        if not callable(getter):
            continue
        try:
            return str(getter(setting_id) or "")
        except TypeError:
            # Wrong setting type for this getter: try the next, more generic one.
            continue
    return ""


def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Return the add-on setting *setting_id* as bool, or *default*.

    Kodi can silently report ``False`` for unknown settings. So that new
    settings with ``default=True`` work, an empty raw value always yields
    *default*. If ``getSettingBool`` is missing or rejects the setting with
    ``TypeError``, the raw text is parsed instead (true/1/yes/on and
    false/0/no/off, case-insensitive).
    """
    if xbmcaddon is None:
        return default
    addon = _get_addon()
    if addon is None:
        return default

    # Raw text value; None means "no generic getter available".
    raw: str | None = None
    raw_getter = getattr(addon, "getSetting", None)
    if callable(raw_getter):
        try:
            raw = str(raw_getter(setting_id) or "").strip()
        except TypeError:
            raw = ""

    typed_getter = getattr(addon, "getSettingBool", None)
    if callable(typed_getter):
        if raw == "":
            return default
        try:
            return bool(typed_getter(setting_id))
        except TypeError:
            # Not a boolean-typed setting: fall through to text parsing.
            pass

    if raw is None:
        return default
    lowered = raw.lower()
    if lowered in {"true", "1", "yes", "on"}:
        return True
    if lowered in {"false", "0", "no", "off"}:
        return False
    return default
getattr(tag, "setEpisode", None) if callable(setter) and episode not in (None, "", 0, "0"): setter(int(episode)) # type: ignore[arg-type] if rating not in (None, "", 0, "0"): try: rating_f = float(rating) # type: ignore[arg-type] except Exception: rating_f = 0.0 if rating_f: set_rating = getattr(tag, "setRating", None) if callable(set_rating): try: if votes not in (None, "", 0, "0"): set_rating(rating_f, int(votes), "tmdb") # type: ignore[misc] else: set_rating(rating_f) # type: ignore[misc] except Exception: try: set_rating(rating_f, int(votes or 0), "tmdb", True) # type: ignore[misc] except Exception: pass if duration not in (None, "", 0, "0"): try: duration_i = int(duration) # type: ignore[arg-type] except Exception: duration_i = 0 if duration_i: set_duration = getattr(tag, "setDuration", None) if callable(set_duration): try: set_duration(duration_i) except Exception: pass if playcount not in (None, "", 0, "0"): try: playcount_i = int(playcount) # type: ignore[arg-type] except Exception: playcount_i = 0 if playcount_i: set_playcount = getattr(tag, "setPlaycount", None) if callable(set_playcount): try: set_playcount(playcount_i) except Exception: pass try: pos = int(resume_position) if resume_position is not None else 0 tot = int(resume_total) if resume_total is not None else 0 except Exception: pos, tot = 0, 0 if pos > 0 and tot > 0: set_resume = getattr(tag, "setResumePoint", None) if callable(set_resume): try: set_resume(pos, tot) except Exception: try: set_resume(pos) # type: ignore[misc] except Exception: pass if cast: set_cast = getattr(tag, "setCast", None) actor_cls = getattr(xbmc, "Actor", None) if callable(set_cast) and actor_cls is not None: actors = [] for index, member in enumerate(cast[:30]): try: actors.append(actor_cls(member.name, member.role, index, member.thumb)) except Exception: try: actors.append(actor_cls(member.name, member.role)) except Exception: continue try: set_cast(actors) except Exception: pass elif callable(set_cast): cast_dicts = 
def _get_log_path(filename: str) -> str:
    """Return the full path for *filename* inside the add-on's ``logs`` folder.

    Inside Kodi the folder lives in the add-on profile directory and is
    created on demand. Without ``xbmcaddon``/``xbmcvfs`` the file is placed
    next to this module instead.
    """
    if xbmcaddon and xbmcvfs:
        # Reuse the cached Addon instance instead of creating one per call.
        addon = _get_addon() or xbmcaddon.Addon()
        profile = xbmcvfs.translatePath(addon.getAddonInfo("profile"))
        log_dir = os.path.join(profile, "logs")
        # xbmcvfs.exists() expects folder paths to end with a separator;
        # without it the folder is reported missing on every call.
        if not xbmcvfs.exists(os.path.join(log_dir, "")):
            xbmcvfs.mkdirs(log_dir)
        return os.path.join(log_dir, filename)
    return os.path.join(os.path.dirname(__file__), filename)


def _tmdb_file_log(message: str) -> None:
    """Append *message* with a UTC timestamp to the TMDB log file.

    The log path is resolved once and kept in ``_TMDB_LOG_PATH``. Each line
    has the form ``<ISO timestamp>Z<TAB><message>``. Logging is best effort:
    if the regular ``open`` fails, ``xbmcvfs.File`` is tried; if that fails
    too, the message is dropped silently.
    """
    global _TMDB_LOG_PATH
    if _TMDB_LOG_PATH is None:
        _TMDB_LOG_PATH = _get_log_path("tmdb.log")

    # datetime.utcnow() is deprecated; build the same "....Z" text from an
    # aware UTC timestamp instead.
    from datetime import timezone

    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    line = f"{timestamp}\t{message}\n"
    try:
        with open(_TMDB_LOG_PATH, "a", encoding="utf-8") as handle:
            handle.write(line)
    except Exception:
        if xbmcvfs is None:
            return
        try:
            handle = xbmcvfs.File(_TMDB_LOG_PATH, "a")
            try:
                handle.write(line)
            finally:
                # Close the Kodi file handle even when writing fails.
                handle.close()
        except Exception:
            return
cache_key = f"{language}|{flags}|{title_key}" cached = _TMDB_CACHE.get(cache_key) if cached is not None: info, art = cached # Cast wird nicht in _TMDB_CACHE gehalten (weil es ListItem.setCast betrifft), daher separat cachen: cast_cached = _TMDB_CAST_CACHE.get(cache_key, []) return info, art, list(cast_cached) info_labels: dict[str, str] = {"title": title} art: dict[str, str] = {} cast: list[TmdbCastMember] = [] query = (title or "").strip() api_key = _get_setting_string("tmdb_api_key").strip() log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) if api_key: try: log_fn = _tmdb_file_log if (log_requests or log_responses) else None # Einige Plugins liefern Titel wie "… – Der Film". Für TMDB ist oft der Basistitel besser. candidates: list[str] = [] if query: candidates.append(query) simplified = re.sub(r"\s*[-–]\s*der\s+film\s*$", "", query, flags=re.IGNORECASE).strip() if simplified and simplified not in candidates: candidates.append(simplified) meta = None is_tv = False for candidate in candidates: meta = lookup_tv_show( title=candidate, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, include_cast=show_cast, ) if meta: is_tv = True break if not meta: for candidate in candidates: movie = lookup_movie( title=candidate, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, include_cast=show_cast, ) if movie: meta = movie break except Exception as exc: try: _tmdb_file_log(f"TMDB ERROR lookup_failed title={title!r} error={exc!r}") except Exception: pass _log(f"TMDB Meta fehlgeschlagen: {exc}", xbmc.LOGDEBUG) meta = None if meta: # Nur TV-IDs cachen (für Staffel-/Episoden-Lookups); Movie-IDs würden dort fehlschlagen. 
if is_tv: _TMDB_ID_CACHE[title_key] = int(getattr(meta, "tmdb_id", 0) or 0) info_labels.setdefault("mediatype", "tvshow") else: info_labels.setdefault("mediatype", "movie") if show_plot and getattr(meta, "plot", ""): info_labels["plot"] = getattr(meta, "plot", "") runtime_minutes = int(getattr(meta, "runtime_minutes", 0) or 0) if runtime_minutes > 0 and not is_tv: info_labels["duration"] = str(runtime_minutes * 60) rating = getattr(meta, "rating", 0.0) or 0.0 votes = getattr(meta, "votes", 0) or 0 if show_rating and rating: # Kodi akzeptiert je nach Version float oder string; wir bleiben bei strings wie im restlichen Code. info_labels["rating"] = str(rating) if show_votes and votes: info_labels["votes"] = str(votes) if show_art and getattr(meta, "poster", ""): poster = getattr(meta, "poster", "") art.update({"thumb": poster, "poster": poster, "icon": poster}) if show_fanart and getattr(meta, "fanart", ""): fanart = getattr(meta, "fanart", "") if fanart: art.update({"fanart": fanart, "landscape": fanart}) if show_cast: cast = list(getattr(meta, "cast", []) or []) elif log_requests or log_responses: _tmdb_file_log(f"TMDB MISS title={title!r}") _TMDB_CACHE[cache_key] = (info_labels, art) _TMDB_CAST_CACHE[cache_key] = list(cast) return info_labels, art, list(cast) async def _tmdb_labels_and_art_bulk_async( titles: list[str], ) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]: titles = [str(t).strip() for t in (titles or []) if t and str(t).strip()] if not titles: return {} unique_titles: list[str] = list(dict.fromkeys(titles)) limit = _tmdb_prefetch_concurrency() semaphore = asyncio.Semaphore(limit) async def fetch_one(title: str): async with semaphore: return title, await asyncio.to_thread(_tmdb_labels_and_art, title) tasks = [fetch_one(title) for title in unique_titles] results = await asyncio.gather(*tasks, return_exceptions=True) mapped: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} for entry in results: if 
isinstance(entry, Exception): continue try: title, payload = entry except Exception: continue if isinstance(title, str) and isinstance(payload, tuple) and len(payload) == 3: mapped[title] = payload # type: ignore[assignment] return mapped def _tmdb_labels_and_art_bulk( titles: list[str], ) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]: return _run_async(_tmdb_labels_and_art_bulk_async(titles)) def _tmdb_episode_labels_and_art(*, title: str, season_label: str, episode_label: str) -> tuple[dict[str, str], dict[str, str]]: title_key = (title or "").strip().casefold() tmdb_id = _TMDB_ID_CACHE.get(title_key) if not tmdb_id: _tmdb_labels_and_art(title) tmdb_id = _TMDB_ID_CACHE.get(title_key) if not tmdb_id: return {"title": episode_label}, {} season_number = _extract_first_int(season_label) episode_number = _extract_first_int(episode_label) if season_number is None or episode_number is None: return {"title": episode_label}, {} language = _get_setting_string("tmdb_language").strip() or "de-DE" show_plot = _get_setting_bool("tmdb_show_plot", default=True) show_art = _get_setting_bool("tmdb_show_art", default=True) flags = f"p{int(show_plot)}a{int(show_art)}" season_key = (tmdb_id, season_number, language, flags) cached_season = _TMDB_SEASON_CACHE.get(season_key) if cached_season is None: api_key = _get_setting_string("tmdb_api_key").strip() if not api_key: return {"title": episode_label}, {} log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) log_fn = _tmdb_file_log if (log_requests or log_responses) else None try: season_meta = lookup_tv_season( tmdb_id=tmdb_id, season_number=season_number, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, ) except Exception as exc: if log_fn: log_fn(f"TMDB ERROR season_lookup_failed tmdb_id={tmdb_id} season={season_number} error={exc!r}") season_meta = None mapped: dict[int, tuple[dict[str, str], 
dict[str, str]]] = {} if season_meta: for ep_no, ep in season_meta.items(): info: dict[str, str] = {"title": f"Episode {ep_no}"} if show_plot and ep.plot: info["plot"] = ep.plot if getattr(ep, "runtime_minutes", 0): info["duration"] = str(int(getattr(ep, "runtime_minutes", 0)) * 60) art: dict[str, str] = {} if show_art and ep.thumb: art = {"thumb": ep.thumb} mapped[ep_no] = (info, art) _TMDB_SEASON_CACHE[season_key] = mapped cached_season = mapped return cached_season.get(episode_number, ({"title": episode_label}, {})) def _tmdb_episode_cast(*, title: str, season_label: str, episode_label: str) -> list[TmdbCastMember]: show_episode_cast = _get_setting_bool("tmdb_show_episode_cast", default=False) if not show_episode_cast: return [] title_key = (title or "").strip().casefold() tmdb_id = _TMDB_ID_CACHE.get(title_key) if not tmdb_id: _tmdb_labels_and_art(title) tmdb_id = _TMDB_ID_CACHE.get(title_key) if not tmdb_id: return [] season_number = _extract_first_int(season_label) episode_number = _extract_first_int(episode_label) if season_number is None or episode_number is None: return [] language = _get_setting_string("tmdb_language").strip() or "de-DE" cache_key = (tmdb_id, season_number, episode_number, language) cached = _TMDB_EPISODE_CAST_CACHE.get(cache_key) if cached is not None: return list(cached) api_key = _get_setting_string("tmdb_api_key").strip() if not api_key: _TMDB_EPISODE_CAST_CACHE[cache_key] = [] return [] log_requests = _get_setting_bool("tmdb_log_requests", default=False) log_responses = _get_setting_bool("tmdb_log_responses", default=False) log_fn = _tmdb_file_log if (log_requests or log_responses) else None try: cast = fetch_tv_episode_credits( tmdb_id=tmdb_id, season_number=season_number, episode_number=episode_number, api_key=api_key, language=language, log=log_fn, log_responses=log_responses, ) except Exception as exc: if log_fn: log_fn( f"TMDB ERROR episode_credits_failed tmdb_id={tmdb_id} season={season_number} episode={episode_number} 
def _add_directory_item(
    handle: int,
    label: str,
    action: str,
    params: dict[str, str] | None = None,
    *,
    is_folder: bool = True,
    info_labels: dict[str, str] | None = None,
    art: dict[str, str] | None = None,
    cast: list[TmdbCastMember] | None = None,
) -> None:
    """Add one entry (folder or playable item) to the Kodi directory listing.

    The entry's URL is this plugin's base URL (``sys.argv[0]``) plus a query
    string built from *action* and *params*. Non-folder items are flagged as
    playable. Metadata, cast and artwork are applied on a best-effort basis.
    """
    query: dict[str, str] = dict(action=action)
    query.update(params or {})
    url = "?".join((sys.argv[0], urlencode(query)))

    item = xbmcgui.ListItem(label=label)
    if not is_folder:
        try:
            item.setProperty("IsPlayable", "true")
        except Exception:
            pass

    _apply_video_info(item, info_labels, cast)

    art_setter = getattr(item, "setArt", None) if art else None
    if callable(art_setter):
        try:
            art_setter(art)
        except Exception:
            pass

    xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
def _show_plugin_search(plugin_name: str) -> None:
    """Ask for a search term and show the matching titles of one plugin.

    An unknown plugin triggers a notification and falls back to the root
    menu; an empty input returns to the plugin's own menu.
    """
    name = (plugin_name or "").strip()
    if not _discover_plugins().get(name):
        xbmcgui.Dialog().notification("Suche", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
        _show_root_menu()
        return

    _log(f"Plugin-Suche gestartet: {name}")
    entered = xbmcgui.Dialog().input(f"{name}: Titel eingeben", type=xbmcgui.INPUT_ALPHANUM)
    term = entered.strip()
    if term:
        _log(f"Plugin-Suchbegriff ({name}): {term}", xbmc.LOGDEBUG)
        _show_plugin_search_results(name, term)
    else:
        _log("Plugin-Suche abgebrochen (leere Eingabe).", xbmc.LOGDEBUG)
        _show_plugin_menu(name)
3000) xbmcplugin.endOfDirectory(handle) return results = [str(t).strip() for t in (results or []) if t and str(t).strip()] results.sort(key=lambda value: value.casefold()) tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {} if results: with _busy_dialog(): tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results)) for title in results: info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title)) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) merged_info = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title}, is_folder=not direct_play, info_labels=merged_info, art=art, cast=cast, ) xbmcplugin.endOfDirectory(handle) def _import_plugin_module(path: Path) -> ModuleType: spec = importlib.util.spec_from_file_location(path.stem, path) if spec is None or spec.loader is None: raise ImportError(f"Modul-Spezifikation fuer {path.name} fehlt.") module = importlib.util.module_from_spec(spec) sys.modules[spec.name] = module try: spec.loader.exec_module(module) except Exception: sys.modules.pop(spec.name, None) raise return module def _discover_plugins() -> dict[str, BasisPlugin]: """Laedt alle Plugins aus `plugins/*.py` und cached Instanzen im RAM.""" global _PLUGIN_CACHE if _PLUGIN_CACHE is not None: return _PLUGIN_CACHE # Plugins werden dynamisch aus `plugins/*.py` geladen, damit Integrationen getrennt # entwickelt und bei Fehlern isoliert 
deaktiviert werden koennen. plugins: dict[str, BasisPlugin] = {} if not PLUGIN_DIR.exists(): _PLUGIN_CACHE = plugins return plugins for file_path in sorted(PLUGIN_DIR.glob("*.py")): if file_path.name.startswith("_"): continue try: module = _import_plugin_module(file_path) except Exception as exc: xbmc.log(f"Plugin-Datei {file_path.name} konnte nicht geladen werden: {exc}", xbmc.LOGWARNING) continue plugin_classes = [ obj for obj in module.__dict__.values() if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin ] for cls in plugin_classes: try: instance = cls() except Exception as exc: xbmc.log(f"Plugin {cls.__name__} konnte nicht geladen werden: {exc}", xbmc.LOGWARNING) continue if getattr(instance, "is_available", True) is False: reason = getattr(instance, "unavailable_reason", "Nicht verfuegbar.") xbmc.log(f"Plugin {cls.__name__} deaktiviert: {reason}", xbmc.LOGWARNING) continue plugins[instance.name] = instance _PLUGIN_CACHE = plugins return plugins def _run_async(coro): """Fuehrt eine Coroutine aus, auch wenn Kodi bereits einen Event-Loop hat.""" try: loop = asyncio.get_event_loop() except RuntimeError: loop = None if loop and loop.is_running(): temp_loop = asyncio.new_event_loop() try: return temp_loop.run_until_complete(coro) finally: temp_loop.close() return asyncio.run(coro) def _show_search() -> None: _log("Suche gestartet.") dialog = xbmcgui.Dialog() query = dialog.input("Serientitel eingeben", type=xbmcgui.INPUT_ALPHANUM).strip() if not query: _log("Suche abgebrochen (leere Eingabe).", xbmc.LOGDEBUG) _show_root_menu() return _log(f"Suchbegriff: {query}", xbmc.LOGDEBUG) _show_search_results(query) def _show_search_results(query: str) -> None: handle = _get_handle() _log(f"Suche nach Titeln: {query}") _set_content(handle, "tvshows") plugins = _discover_plugins() if not plugins: xbmcgui.Dialog().notification("Suche", "Keine Plugins gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return for 
def _show_single_movie_item(handle: int, plugin_name: str, title: str) -> None:
    """List *title* as one playable movie entry and finish the directory."""
    xbmcplugin.setPluginCategory(handle, title)
    _set_content(handle, "movies")
    playstate = _title_playstate(plugin_name, title)
    info_labels: dict[str, object] = {"title": title, "mediatype": "movie"}
    info_labels = _apply_playstate_to_info(info_labels, playstate)
    _add_directory_item(
        handle,
        _label_with_playstate(title, playstate),
        "play_movie",
        {"plugin": plugin_name, "title": title},
        is_folder=False,
        info_labels=info_labels,
    )
    xbmcplugin.endOfDirectory(handle)


def _show_seasons(plugin_name: str, title: str) -> None:
    """Render the season list of *title* for the plugin *plugin_name*.

    Movies are short-circuited to a single playable item: either because the
    plugin is "einschalten" with playback enabled in the settings, or because
    the plugin's optional ``is_movie(title)`` hook returns a truthy value.
    Otherwise one folder item per season is added, enriched with the plugin's
    optional ``metadata_for(title)`` result and, when a TMDB API key is
    configured, with a per-season TMDB summary (plot/poster).

    Args:
        plugin_name: Key of the plugin in the discovered plugin mapping.
        title: Title whose seasons are listed.
    """
    handle = _get_handle()
    _log(f"Staffeln laden: {plugin_name} / {title}")
    plugin = _discover_plugins().get(plugin_name)
    if plugin is None:
        xbmcgui.Dialog().notification("Staffeln", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    # "einschalten" delivers movies. With playback enabled, opening a title
    # shows exactly one playable item, without any additional network request
    # (otherwise Kodi may get stuck in its busy spinner).
    if (plugin_name or "").casefold() == "einschalten" and _get_setting_bool(
        "einschalten_enable_playback", default=False
    ):
        _show_single_movie_item(handle, plugin_name, title)
        return

    # Optional hook: plugins may tell quickly (without a detail request)
    # whether a title is a movie. Only the hook call itself is best-effort;
    # errors while building the listing are not swallowed, matching the
    # "einschalten" branch above.
    is_movie = getattr(plugin, "is_movie", None)
    if callable(is_movie):
        try:
            title_is_movie = bool(is_movie(title))
        except Exception as exc:
            _log(f"is_movie fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGDEBUG)
            title_is_movie = False
        if title_is_movie:
            _show_single_movie_item(handle, plugin_name, title)
            return

    title_info_labels: dict[str, str] | None = None
    title_art: dict[str, str] | None = None
    title_cast: list[TmdbCastMember] | None = None
    meta_getter = getattr(plugin, "metadata_for", None)
    if callable(meta_getter):
        try:
            with _busy_dialog():
                meta_labels, meta_art, meta_cast = meta_getter(title)
            if isinstance(meta_labels, dict):
                title_info_labels = {str(k): str(v) for k, v in meta_labels.items() if v}
            if isinstance(meta_art, dict):
                title_art = {str(k): str(v) for k, v in meta_art.items() if v}
            if isinstance(meta_cast, list):
                # Plugins may return cast in their own shape; best-effort only.
                title_cast = meta_cast  # type: ignore[assignment]  # noqa: PGH003
        except Exception as exc:
            # Plugin metadata is optional; keep going without it.
            _log(f"Metadaten laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGDEBUG)

    try:
        # Normalise to a list so len() works for None and lazy iterables too.
        seasons = list(plugin.seasons_for(title) or [])
    except Exception as exc:
        _log(f"Staffeln laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification("Staffeln", "Konnte Staffeln nicht laden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    count = len(seasons)
    suffix = "Staffel" if count == 1 else "Staffeln"
    xbmcplugin.setPluginCategory(handle, f"{title} ({count} {suffix})")
    _set_content(handle, "seasons")

    # Season metadata (plot/poster) optionally via TMDB. The result of this
    # call is discarded here.
    # NOTE(review): presumably it fills _TMDB_ID_CACHE for the loop below -
    # confirm against _tmdb_labels_and_art.
    _tmdb_labels_and_art(title)
    api_key = _get_setting_string("tmdb_api_key").strip()
    language = _get_setting_string("tmdb_language").strip() or "de-DE"
    show_plot = _get_setting_bool("tmdb_show_plot", default=True)
    show_art = _get_setting_bool("tmdb_show_art", default=True)
    flags = f"p{int(show_plot)}a{int(show_art)}"
    log_requests = _get_setting_bool("tmdb_log_requests", default=False)
    log_responses = _get_setting_bool("tmdb_log_responses", default=False)
    log_fn = _tmdb_file_log if (log_requests or log_responses) else None
    title_key = (title or "").strip().casefold()

    for season in seasons:
        info_labels: dict[str, str] | None = None
        art: dict[str, str] | None = None
        season_number = _extract_first_int(season)
        if api_key and season_number is not None:
            tmdb_id = _TMDB_ID_CACHE.get(title_key, 0)
            cache_key = (tmdb_id, season_number, language, flags)
            cached = _TMDB_SEASON_SUMMARY_CACHE.get(cache_key)
            # Only query TMDB when nothing is cached and the show id is known.
            if cached is None and tmdb_id:
                try:
                    meta = lookup_tv_season_summary(
                        tmdb_id=tmdb_id,
                        season_number=season_number,
                        api_key=api_key,
                        language=language,
                        log=log_fn,
                        log_responses=log_responses,
                    )
                except Exception as exc:
                    if log_fn:
                        log_fn(f"TMDB ERROR season_summary_failed tmdb_id={tmdb_id} season={season_number} error={exc!r}")
                    meta = None
                labels = {"title": season}
                art_map: dict[str, str] = {}
                if meta:
                    if show_plot and meta.plot:
                        labels["plot"] = meta.plot
                    if show_art and meta.poster:
                        art_map = {"thumb": meta.poster, "poster": meta.poster}
                # A failed lookup is cached as well (title only, no art).
                cached = (labels, art_map)
                _TMDB_SEASON_SUMMARY_CACHE[cache_key] = cached
            if cached is not None:
                info_labels, art = cached

        # Season-specific labels override the title-level plugin metadata.
        merged_labels = dict(title_info_labels or {})
        merged_labels.update(info_labels or {})
        season_state = _season_playstate(plugin_name, title, season)
        merged_labels = _apply_playstate_to_info(dict(merged_labels), season_state)

        # Season art overrides title art key by key.
        merged_art: dict[str, str] | None = art
        if title_art:
            merged_art = dict(title_art)
            if isinstance(art, dict):
                merged_art.update({k: str(v) for k, v in art.items() if v})

        _add_directory_item(
            handle,
            _label_with_playstate(season, season_state),
            "episodes",
            {"plugin": plugin_name, "title": title, "season": season},
            is_folder=True,
            info_labels=merged_labels or None,
            art=merged_art,
            cast=title_cast,
        )
    xbmcplugin.endOfDirectory(handle)
def _show_genre_sources() -> None:
    """List every plugin that provides genre browsing as a folder item.

    A plugin counts as a genre source only when its class overrides both
    ``BasisPlugin.genres`` and ``BasisPlugin.titles_for_genre``.
    """
    handle = _get_handle()
    _log("Genre-Quellen laden.")
    source_names = [
        plugin_name
        for plugin_name, plugin in _discover_plugins().items()
        if plugin.__class__.genres is not BasisPlugin.genres
        and plugin.__class__.titles_for_genre is not BasisPlugin.titles_for_genre
    ]

    if not source_names:
        xbmcgui.Dialog().notification("Genres", "Keine Genre-Quellen gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    for plugin_name in source_names:
        _add_directory_item(
            handle,
            f"Genres [{plugin_name}]",
            "genres",
            {"plugin": plugin_name},
            is_folder=True,
        )
    xbmcplugin.endOfDirectory(handle)


def _show_genres(plugin_name: str) -> None:
    """List the genres offered by the plugin *plugin_name* as folder items.

    When the plugin exposes a callable ``titles_for_genre_page``, each genre
    opens the paginated title list on page 1; otherwise it opens the
    "genre_series" action.

    Args:
        plugin_name: Key of the plugin in the discovered plugin mapping.
    """
    handle = _get_handle()
    _log(f"Genres laden: {plugin_name}")
    plugin = _discover_plugins().get(plugin_name)
    if plugin is None:
        xbmcgui.Dialog().notification("Genres", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    try:
        # Normalise so a plugin returning None or a lazy iterable still works.
        genres = list(plugin.genres() or [])
    except Exception as exc:
        _log(f"Genres konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
        xbmcgui.Dialog().notification("Genres", "Genres konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    # The paging capability does not depend on the genre: look it up once.
    supports_paging = callable(getattr(plugin, "titles_for_genre_page", None))
    action = "genre_titles_page" if supports_paging else "genre_series"

    for genre in genres:
        params = {"plugin": plugin_name, "genre": genre}
        if supports_paging:
            params["page"] = "1"
        _add_directory_item(handle, genre, action, params, is_folder=True)
    xbmcplugin.endOfDirectory(handle)
None: page = min(page, max(1, total_pages)) xbmcplugin.setPluginCategory(handle, f"{genre} ({page}/{total_pages})") else: xbmcplugin.setPluginCategory(handle, f"{genre} ({page})") _set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows") if page > 1: _add_directory_item( handle, "Vorherige Seite", "genre_titles_page", {"plugin": plugin_name, "genre": genre, "page": str(page - 1)}, is_folder=True, ) try: titles = list(paging_getter(genre, page) or []) except Exception as exc: _log(f"Genre-Seite konnte nicht geladen werden ({plugin_name}/{genre} p{page}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return titles = [str(t).strip() for t in titles if t and str(t).strip()] titles.sort(key=lambda value: value.casefold()) show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False) if titles: if show_tmdb: with _busy_dialog(): tmdb_prefetched = _tmdb_labels_and_art_bulk(titles) for title in titles: info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title)) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "tvshow") if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow": info_labels.setdefault("tvshowtitle", title) playstate = _title_playstate(plugin_name, title) info_labels = _apply_playstate_to_info(dict(info_labels), playstate) display_label = _label_with_duration(title, info_labels) display_label = _label_with_playstate(display_label, playstate) direct_play = bool( plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) ) _add_directory_item( handle, display_label, "play_movie" if direct_play else "seasons", {"plugin": plugin_name, "title": title}, is_folder=not direct_play, info_labels=info_labels, art=art, cast=cast, ) else: for title in titles: playstate = _title_playstate(plugin_name, 
def _title_group_key(title: str) -> str:
    """Return the alphabetical grouping key for *title*.

    The first digit or letter of the stripped title decides the key:

    * a digit yields ``"0-9"``;
    * a letter yields its upper-cased base letter, with diacritics removed
      ("Ä" -> "A", "É" -> "E") and "ß" mapped to "S";
    * a letter without an ASCII base (e.g. "Ø", Cyrillic) is returned
      upper-cased as is;
    * a title without any digit or letter yields ``"#"``.

    Args:
        title: Title to classify; ``None`` or empty is treated as no title.

    Returns:
        ``"0-9"``, ``"#"`` or an upper-cased letter.
    """
    # Function-scope import keeps this block self-contained.
    import unicodedata

    for char in (title or "").strip():
        if char.isdigit():
            return "0-9"
        if char.isalpha():
            # casefold() expands "ß" to "ss"; NFKD splits accented letters
            # into base letter + combining mark, so the first ASCII letter
            # of the result is the base letter to group by.
            folded = char.casefold()
            decomposed = unicodedata.normalize("NFKD", folded)
            for part in decomposed:
                if part.isascii() and part.isalpha():
                    return part.upper()
            return folded[:1].upper()
    return "#"


def _genre_title_groups() -> list[tuple[str, str]]:
    """Return the ``(label, group_code)`` pairs used to split genre titles."""
    codes = ("A-E", "F-J", "K-O", "P-T", "U-Z", "0-9")
    return [(code, code) for code in codes]


def _group_matches(group_code: str, title: str) -> bool:
    """Tell whether *title* belongs to the group identified by *group_code*.

    Args:
        group_code: One of the codes from ``_genre_title_groups``.
        title: Title to test.

    Returns:
        True when the title's grouping key lies in the group; False for
        unknown group codes and for titles keyed ``"#"``.
    """
    key = _title_group_key(title)
    if group_code == "0-9":
        return key == "0-9"
    if key in ("0-9", "#"):
        return False
    letter_ranges = {
        "A-E": ("A", "E"),
        "F-J": ("F", "J"),
        "K-O": ("K", "O"),
        "P-T": ("P", "T"),
        "U-Z": ("U", "Z"),
    }
    bounds = letter_ranges.get(group_code)
    if bounds is None:
        return False
    low, high = bounds
    return low <= key <= high
def _show_genre_series(plugin_name: str, genre: str) -> None:
    """List the alphabetical title groups of *genre* as folder items."""
    handle = _get_handle()
    xbmcplugin.setPluginCategory(handle, genre)
    for group_label, code in _genre_title_groups():
        params = {"plugin": plugin_name, "genre": genre, "group": code}
        _add_directory_item(handle, group_label, "genre_series_group", params, is_folder=True)
    xbmcplugin.endOfDirectory(handle)


def _parse_positive_int(value: str, *, default: int = 1) -> int:
    """Parse *value* as an integer greater than zero.

    Returns *default* when the value is falsy, cannot be parsed, or is not
    positive.
    """
    try:
        number = int(str(value or "").strip())
    except Exception:
        return default
    if number > 0:
        return number
    return default


def _popular_genre_label(plugin: BasisPlugin) -> str | None:
    """Return the plugin's stripped ``POPULAR_GENRE_LABEL``.

    Returns None when the attribute is missing, not a string, or blank.
    """
    candidate = getattr(plugin, "POPULAR_GENRE_LABEL", None)
    if not isinstance(candidate, str):
        return None
    stripped = candidate.strip()
    return stripped or None


def _plugin_has_capability(plugin: BasisPlugin, capability: str) -> bool:
    """Tell whether *plugin* declares *capability*.

    A callable ``capabilities`` attribute is authoritative: its result is
    turned into a set and checked for membership, and any failure counts as
    "not supported". Plugins without such a callable are only credited with
    "popular_series", and only when they define a non-blank
    ``POPULAR_GENRE_LABEL``.
    """
    capabilities_fn = getattr(plugin, "capabilities", None)
    if not callable(capabilities_fn):
        # Backwards compatibility: popular series via POPULAR_GENRE_LABEL.
        return capability == "popular_series" and _popular_genre_label(plugin) is not None

    try:
        declared = capabilities_fn()
    except Exception:
        declared = set()
    try:
        return capability in set(declared or [])
    except Exception:
        return False


def _plugins_with_popular() -> list[tuple[str, BasisPlugin, str]]:
    """Return ``(name, plugin, label)`` for every plugin with popular series.

    The label is the plugin's popular-genre label, or "" when it has none.
    """
    return [
        (plugin_name, plugin, _popular_genre_label(plugin) or "")
        for plugin_name, plugin in _discover_plugins().items()
        if _plugin_has_capability(plugin, "popular_series")
    ]
def _show_new_titles(plugin_name: str, page: int = 1) -> None:
    """Render the "new titles" listing of the plugin *plugin_name*.

    Two modes are supported:

    * The plugin exposes a callable ``new_titles_page``: the plugin pages
      itself, and a "next page" entry is shown only when its optional
      ``new_titles_has_more(page)`` returns a truthy value.
    * Otherwise the full ``new_titles()`` list is fetched, sorted and paged
      locally in chunks of ten.

    Args:
        plugin_name: Key of the plugin in the discovered plugin mapping.
        page: 1-based page number; values below 1 are clamped to 1.
    """
    handle = _get_handle()
    page_size = 10
    page = max(1, int(page or 1))

    plugin_name = (plugin_name or "").strip()
    plugin = _discover_plugins().get(plugin_name)
    if plugin is None or not _plugin_has_capability(plugin, "new_titles"):
        xbmcgui.Dialog().notification("Neue Titel", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    getter = getattr(plugin, "new_titles", None)
    if not callable(getter):
        xbmcgui.Dialog().notification("Neue Titel", "Nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
        xbmcplugin.endOfDirectory(handle)
        return

    paging_getter = getattr(plugin, "new_titles_page", None)
    has_more_getter = getattr(plugin, "new_titles_has_more", None)
    is_einschalten = plugin_name.casefold() == "einschalten"
    content_type = "movies" if is_einschalten else "tvshows"
    # Stays None in plugin-paged mode, where the page count is unknown.
    total_pages: int | None = None

    if callable(paging_getter):
        xbmcplugin.setPluginCategory(handle, f"Neue Titel [{plugin_name}] ({page})")
        _set_content(handle, content_type)
        if page > 1:
            _add_directory_item(
                handle,
                "Vorherige Seite",
                "new_titles",
                {"plugin": plugin_name, "page": str(page - 1)},
                is_folder=True,
            )
        try:
            page_items = list(paging_getter(page) or [])
        except Exception as exc:
            _log(f"Neue Titel konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING)
            xbmcgui.Dialog().notification("Neue Titel", "Titel konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
            xbmcplugin.endOfDirectory(handle)
            return
        page_items = [str(t).strip() for t in page_items if t and str(t).strip()]
        page_items.sort(key=lambda value: value.casefold())
    else:
        try:
            titles = list(getter() or [])
        except Exception as exc:
            _log(f"Neue Titel konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
            xbmcgui.Dialog().notification("Neue Titel", "Titel konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
            xbmcplugin.endOfDirectory(handle)
            return

        titles = [str(t).strip() for t in titles if t and str(t).strip()]
        titles.sort(key=lambda value: value.casefold())
        total = len(titles)
        if total == 0:
            xbmcgui.Dialog().notification(
                "Neue Titel",
                "Keine Titel gefunden (Basis-URL/Index prüfen).",
                xbmcgui.NOTIFICATION_INFO,
                4000,
            )
        total_pages = max(1, (total + page_size - 1) // page_size)
        page = min(page, total_pages)
        xbmcplugin.setPluginCategory(handle, f"Neue Titel [{plugin_name}] ({page}/{total_pages})")
        _set_content(handle, content_type)

        if total_pages > 1 and page > 1:
            _add_directory_item(
                handle,
                "Vorherige Seite",
                "new_titles",
                {"plugin": plugin_name, "page": str(page - 1)},
                is_folder=True,
            )

        start = (page - 1) * page_size
        page_items = titles[start:start + page_size]

    if page_items:
        # Identical for every title on the page, so evaluate the setting once.
        direct_play = bool(
            is_einschalten and _get_setting_bool("einschalten_enable_playback", default=False)
        )
        action = "play_movie" if direct_play else "seasons"

        if _get_setting_bool("tmdb_genre_metadata", default=False):
            with _busy_dialog():
                tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
            for title in page_items:
                # Fall back to a single lookup only for titles the bulk
                # prefetch did not return (dict.get's default would be
                # evaluated for every title).
                entry = tmdb_prefetched.get(title)
                if entry is None:
                    entry = _tmdb_labels_and_art(title)
                info_labels, art, cast = entry
                info_labels = dict(info_labels or {})
                info_labels.setdefault("mediatype", "movie")
                playstate = _title_playstate(plugin_name, title)
                info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
                display_label = _label_with_duration(title, info_labels)
                display_label = _label_with_playstate(display_label, playstate)
                _add_directory_item(
                    handle,
                    display_label,
                    action,
                    {"plugin": plugin_name, "title": title},
                    is_folder=not direct_play,
                    info_labels=info_labels,
                    art=art,
                    cast=cast,
                )
        else:
            for title in page_items:
                playstate = _title_playstate(plugin_name, title)
                _add_directory_item(
                    handle,
                    _label_with_playstate(title, playstate),
                    action,
                    {"plugin": plugin_name, "title": title},
                    is_folder=not direct_play,
                    info_labels=_apply_playstate_to_info({"title": title}, playstate),
                )

    show_next = False
    if callable(paging_getter) and callable(has_more_getter):
        try:
            show_next = bool(has_more_getter(page))
        except Exception:
            show_next = False
    elif total_pages is not None:
        show_next = total_pages > 1 and page < total_pages

    if show_next:
        _add_directory_item(
            handle,
            "Nächste Seite",
            "new_titles",
            {"plugin": plugin_name, "page": str(page + 1)},
            is_folder=True,
        )
    xbmcplugin.endOfDirectory(handle)
S{season_number:02d}E{episode_number:02d}" if airdate: label = f"{label} ({airdate})" label = _label_with_playstate(label, playstate) info_labels: dict[str, object] = { "title": f"{title} - S{season_number:02d}E{episode_number:02d}", "tvshowtitle": title, "season": season_number, "episode": episode_number, "mediatype": "episode", } info_labels = _apply_playstate_to_info(info_labels, playstate) _add_directory_item( handle, label, "play_episode_url", { "plugin": plugin_name, "title": title, "season": str(season_number), "episode": str(episode_number), "url": url, }, is_folder=False, info_labels=info_labels, ) xbmcplugin.endOfDirectory(handle) def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page: int = 1) -> None: handle = _get_handle() page_size = 10 page = max(1, int(page or 1)) try: titles = _get_genre_titles(plugin_name, genre) except Exception as exc: _log(f"Genre-Serien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Genres", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcplugin.endOfDirectory(handle) return filtered = [title for title in titles if _group_matches(group_code, title)] total = len(filtered) total_pages = max(1, (total + page_size - 1) // page_size) page = min(page, total_pages) xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page}/{total_pages})") if total_pages > 1 and page > 1: _add_directory_item( handle, "Vorherige Seite", "genre_series_group", {"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page - 1)}, is_folder=True, ) start = (page - 1) * page_size end = start + page_size page_items = filtered[start:end] show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False) if page_items: if show_tmdb: with _busy_dialog(): tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items) for title in page_items: info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title)) info_labels 
def _open_settings() -> None:
    """Open the Kodi settings dialog of this add-on.

    Raises:
        RuntimeError: When ``xbmcaddon`` is unavailable (outside Kodi).
    """
    if xbmcaddon is None:  # pragma: no cover - outside Kodi
        raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).")
    xbmcaddon.Addon().openSettings()


def _extract_first_int(value: str) -> int | None:
    """Return the first run of digits in *value* as an int, or None."""
    found = re.search(r"(\d+)", value or "")
    if found is None:
        return None
    try:
        number = int(found.group(1))
    except Exception:
        return None
    return number


def _duration_label(duration_seconds: int) -> str:
    """Format a duration in seconds as ``"HH:MM Laufzeit"``.

    Returns "" when the value cannot be converted to an int or is not
    positive. Seconds beyond full minutes are dropped.
    """
    try:
        seconds = int(duration_seconds or 0)
    except Exception:
        seconds = 0
    if seconds <= 0:
        return ""
    # seconds is positive here, so both results are non-negative.
    hours, minutes = divmod(seconds // 60, 60)
    return f"{hours:02d}:{minutes:02d} Laufzeit"


def _label_with_duration(label: str, info_labels: dict[str, str] | None) -> str:
    """Return *label* unchanged; *info_labels* is accepted but not used."""
    return label
def _track_playback_and_update_state(key: str) -> None:
    """Follow the running playback and store its final play state under *key*.

    The function waits up to 30 polls of 0.5 s for video playback to start,
    then polls position and total time once per second until playback ends.
    From the last known position it derives a state dict ("last_position",
    "resume_total", "percent" and, where applicable, "watched" and
    "resume_position") and stores it via ``_set_playstate``. A position at or
    above ``WATCHED_THRESHOLD`` counts as watched and resets the resume
    position to 0.

    When *key* splits on tabs into four parts (plugin, title, season,
    episode), the same state is additionally stored for the title as a whole
    and, if a season is given, for that season.

    Nothing is stored when *key* is empty, playback never starts, Kodi
    requests an abort while waiting, or the total time is unknown.

    Args:
        key: Play-state key of the item being played.
    """
    if not key:
        return

    # Function-scope import keeps this block self-contained.
    import time

    monitor = xbmc.Monitor() if xbmc is not None and hasattr(xbmc, "Monitor") else None
    player = xbmc.Player()

    def _pause(seconds: float) -> bool:
        """Wait *seconds*; return True when Kodi requested an abort."""
        if monitor is not None:
            return bool(monitor.waitForAbort(seconds))
        # Without a monitor the polling loops must still wait, otherwise the
        # start-up wait ends instantly and the tracking loop busy-spins.
        time.sleep(seconds)
        return False

    # Wait for playback start.
    started = False
    for _ in range(30):
        try:
            if player.isPlayingVideo():
                started = True
                break
        except Exception:
            pass
        if _pause(0.5):
            return
    if not started:
        return

    last_pos = 0.0
    total = 0.0
    while True:
        try:
            if not player.isPlayingVideo():
                break
            last_pos = float(player.getTime() or 0.0)
            total = float(player.getTotalTime() or 0.0)
        except Exception:
            # Player errors are ignored; the last good values are kept.
            pass
        if _pause(1.0):
            return

    if total <= 0.0:
        return
    percent = max(0.0, min(1.0, last_pos / total))
    state: dict[str, object] = {
        "last_position": int(last_pos),
        "resume_total": int(total),
        "percent": percent,
    }
    if percent >= WATCHED_THRESHOLD:
        state["watched"] = True
        state["resume_position"] = 0
    elif last_pos > 0:
        state["watched"] = False
        state["resume_position"] = int(last_pos)
    _set_playstate(key, state)

    # Additionally store the state in aggregated form so that title and
    # season lists can show "watched/resume" (for movies and series alike).
    try:
        parts = str(key).split("\t")
        if len(parts) == 4:
            plugin_name, title, season, _episode = (part.strip() for part in parts[:3] + [""])
            if plugin_name and title:
                _set_playstate(_playstate_key(plugin_name=plugin_name, title=title, season="", episode=""), state)
                if season:
                    _set_playstate(_playstate_key(plugin_name=plugin_name, title=title, season=season, episode=""), state)
    except Exception as exc:
        # Aggregation is best-effort; the per-item state is already stored.
        xbmc.log(f"[ViewIt] Playstate-Aggregation fehlgeschlagen: {exc}", xbmc.LOGDEBUG)
Exception as exc: _log(f"Hoster laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) selected_hoster: str | None = None if available_hosters: if len(available_hosters) == 1: selected_hoster = available_hosters[0] else: selected_index = xbmcgui.Dialog().select("Hoster wählen", available_hosters) if selected_index is None or selected_index < 0: _log("Play abgebrochen (kein Hoster gewählt).", xbmc.LOGDEBUG) return selected_hoster = available_hosters[selected_index] # Manche Plugins erlauben (optional) eine temporaere Einschränkung auf einen Hoster. preferred_setter = getattr(plugin, "set_preferred_hosters", None) restore_hosters: list[str] | None = None if selected_hoster and callable(preferred_setter): current = getattr(plugin, "_preferred_hosters", None) if isinstance(current, list): restore_hosters = list(current) preferred_setter([selected_hoster]) try: link = plugin.stream_link_for(title, season, episode) if not link: _log("Kein Stream-Link gefunden.", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Play", "Kein Stream-Link gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return _log(f"Stream-Link: {link}", xbmc.LOGDEBUG) final_link = plugin.resolve_stream_link(link) or link finally: if restore_hosters is not None and callable(preferred_setter): preferred_setter(restore_hosters) _log(f"Finaler Link: {final_link}", xbmc.LOGDEBUG) season_number = _extract_first_int(season) episode_number = _extract_first_int(episode) if season_number is not None and episode_number is not None: display_title = f"{title} - S{season_number:02d}E{episode_number:02d}" else: display_title = title info_labels, art, cast = _tmdb_labels_and_art(title) display_title = _label_with_duration(display_title, info_labels) _play_final_link( final_link, display_title=display_title, info_labels=info_labels, art=art, cast=cast, resolve_handle=resolve_handle, ) _track_playback_and_update_state( _playstate_key(plugin_name=plugin_name, title=title, season=season, episode=episode) ) def 
_play_episode_url( plugin_name: str, *, title: str, season_number: int, episode_number: int, episode_url: str, resolve_handle: int | None = None, ) -> None: season_label = f"Staffel {season_number}" if season_number > 0 else "" episode_label = f"Episode {episode_number}" if episode_number > 0 else "" _log(f"Play (URL) anfordern: {plugin_name} / {title} / {season_label} / {episode_label} / {episode_url}") plugin = _discover_plugins().get(plugin_name) if plugin is None: xbmcgui.Dialog().notification("Play", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return available_hosters: list[str] = [] hoster_getter = getattr(plugin, "available_hosters_for_url", None) if callable(hoster_getter): try: with _busy_dialog(): available_hosters = list(hoster_getter(episode_url) or []) except Exception as exc: _log(f"Hoster laden fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING) selected_hoster: str | None = None if available_hosters: if len(available_hosters) == 1: selected_hoster = available_hosters[0] else: selected_index = xbmcgui.Dialog().select("Hoster wählen", available_hosters) if selected_index is None or selected_index < 0: _log("Play abgebrochen (kein Hoster gewählt).", xbmc.LOGDEBUG) return selected_hoster = available_hosters[selected_index] preferred_setter = getattr(plugin, "set_preferred_hosters", None) restore_hosters: list[str] | None = None if selected_hoster and callable(preferred_setter): current = getattr(plugin, "_preferred_hosters", None) if isinstance(current, list): restore_hosters = list(current) preferred_setter([selected_hoster]) try: link_getter = getattr(plugin, "stream_link_for_url", None) if not callable(link_getter): xbmcgui.Dialog().notification("Play", "Nicht unterstützt.", xbmcgui.NOTIFICATION_INFO, 3000) return link = link_getter(episode_url) if not link: _log("Kein Stream-Link gefunden.", xbmc.LOGWARNING) xbmcgui.Dialog().notification("Play", "Kein Stream-Link gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) return 
_log(f"Stream-Link: {link}", xbmc.LOGDEBUG) final_link = plugin.resolve_stream_link(link) or link finally: if restore_hosters is not None and callable(preferred_setter): preferred_setter(restore_hosters) display_title = f"{title} - S{season_number:02d}E{episode_number:02d}" if season_number and episode_number else title info_labels, art, cast = _tmdb_labels_and_art(title) info_labels = dict(info_labels or {}) info_labels.setdefault("mediatype", "episode") info_labels.setdefault("tvshowtitle", title) if season_number > 0: info_labels["season"] = str(season_number) if episode_number > 0: info_labels["episode"] = str(episode_number) display_title = _label_with_duration(display_title, info_labels) _play_final_link( final_link, display_title=display_title, info_labels=info_labels, art=art, cast=cast, resolve_handle=resolve_handle, ) _track_playback_and_update_state( _playstate_key(plugin_name=plugin_name, title=title, season=season_label, episode=episode_label) ) def _parse_params() -> dict[str, str]: """Parst Kodi-Plugin-Parameter aus `sys.argv[2]`.""" if len(sys.argv) <= 2 or not sys.argv[2]: return {} raw_params = parse_qs(sys.argv[2].lstrip("?"), keep_blank_values=True) return {key: values[0] for key, values in raw_params.items()} def run() -> None: params = _parse_params() action = params.get("action") _log(f"Action: {action}", xbmc.LOGDEBUG) if action == "search": _show_search() elif action == "plugin_menu": _show_plugin_menu(params.get("plugin", "")) elif action == "plugin_search": _show_plugin_search(params.get("plugin", "")) elif action == "genre_sources": _show_genre_sources() elif action == "genres": _show_genres(params.get("plugin", "")) elif action == "new_titles": _show_new_titles( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "latest_episodes": _show_latest_episodes( params.get("plugin", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "genre_series": _show_genre_series( 
params.get("plugin", ""), params.get("genre", ""), ) elif action == "genre_titles_page": _show_genre_titles_page( params.get("plugin", ""), params.get("genre", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "genre_series_group": _show_genre_series_group( params.get("plugin", ""), params.get("genre", ""), params.get("group", ""), _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "popular": _show_popular( params.get("plugin") or None, _parse_positive_int(params.get("page", "1"), default=1), ) elif action == "settings": _open_settings() elif action == "seasons": _show_seasons(params.get("plugin", ""), params.get("title", "")) elif action == "episodes": _show_episodes( params.get("plugin", ""), params.get("title", ""), params.get("season", ""), ) elif action == "play_episode": _play_episode( params.get("plugin", ""), params.get("title", ""), params.get("season", ""), params.get("episode", ""), resolve_handle=_get_handle(), ) elif action == "play_movie": plugin_name = params.get("plugin", "") title = params.get("title", "") # Einschalten liefert Filme (keine Staffeln/Episoden). Für Playback nutzen wir: # -> Stream -> . if (plugin_name or "").casefold() == "einschalten": _play_episode( plugin_name, title, "Stream", title, resolve_handle=_get_handle(), ) else: _play_episode( plugin_name, title, "Film", "Stream", resolve_handle=_get_handle(), ) elif action == "play_episode_url": _play_episode_url( params.get("plugin", ""), title=params.get("title", ""), season_number=_parse_positive_int(params.get("season", "0"), default=0), episode_number=_parse_positive_int(params.get("episode", "0"), default=0), episode_url=params.get("url", ""), resolve_handle=_get_handle(), ) elif action == "play": link = params.get("url", "") if link: _play_final_link(link, resolve_handle=_get_handle()) else: _show_root_menu() if __name__ == "__main__": run()