"""Einschalten Plugin. Optionales Debugging wie bei Serienstream: - URL-Logging - HTML-Dumps - On-Screen URL-Info """ from __future__ import annotations import json import re from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Set from urllib.parse import urlencode, urljoin, urlsplit try: # pragma: no cover - optional dependency (Kodi dependency) import requests except ImportError as exc: # pragma: no cover requests = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None try: # pragma: no cover - optional Kodi helpers import xbmcaddon # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow running outside Kodi xbmcaddon = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url ADDON_ID = "plugin.video.viewit" SETTING_BASE_URL = "einschalten_base_url" GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_einschalten" SETTING_DUMP_HTML = "dump_html_einschalten" SETTING_SHOW_URL_INFO = "show_url_info_einschalten" SETTING_LOG_ERRORS = "log_errors_einschalten" DEFAULT_BASE_URL = "https://einschalten.in" DEFAULT_INDEX_PATH = "/" DEFAULT_NEW_TITLES_PATH = "/movies/new" DEFAULT_SEARCH_PATH = "/search" DEFAULT_GENRES_PATH = "/genres" DEFAULT_WATCH_PATH_TEMPLATE = "/api/movies/{id}/watch" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/json;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None: if not callable(callback): return try: callback(str(message or ""), None if percent is None else int(percent)) except Exception: return @dataclass(frozen=True) class MovieItem: id: int title: str release_date: str = "" poster_path: str = "" vote_average: float | None = None collection_id: int | None = None @dataclass(frozen=True) class MovieDetail: id: int title: str tagline: str = "" overview: str = "" release_date: str = "" runtime_minutes: int | None = None poster_path: str = "" backdrop_path: str = "" vote_average: float | None = None vote_count: int | None = None homepage: str = "" imdb_id: str = "" wikidata_id: str = "" genres: List[str] | None = None def _normalize_search_text(value: str) -> str: value = (value or "").casefold() value = re.sub(r"[^a-z0-9]+", " ", value) value = re.sub(r"\s+", " ", value).strip() return value def _matches_query(query: str, *, title: str) -> bool: normalized_query = _normalize_search_text(query) if not normalized_query: return False haystack = f" {_normalize_search_text(title)} " return f" {normalized_query} " in haystack def _filter_movies_by_title(query: str, movies: List[MovieItem]) -> List[MovieItem]: query = (query or "").strip() if not query: return [] return [movie for movie in movies if _matches_query(query, title=movie.title)] def _get_setting_text(setting_id: str, *, default: str = "") -> str: if xbmcaddon is None: return default try: addon = xbmcaddon.Addon(ADDON_ID) getter = getattr(addon, "getSettingString", None) if getter is not None: return str(getter(setting_id) or "").strip() return str(addon.getSetting(setting_id) or "").strip() except Exception: return default def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool: return get_setting_bool(ADDON_ID, setting_id, default=default) def _ensure_requests() -> None: if requests is None: raise RuntimeError(f"requests ist nicht verfuegbar: {REQUESTS_IMPORT_ERROR}") def _extract_ng_state_payload(html: str) -> Dict[str, Any]: """Extrahiert JSON aus ``.""" html = html or "" # Regex ist hier ausreichend und vermeidet bs4-Abhängigkeit. match = re.search( r']*id=["\\\']ng-state["\\\'][^>]*>(.*?)', html, flags=re.IGNORECASE | re.DOTALL, ) if not match: return {} raw = (match.group(1) or "").strip() if not raw: return {} try: data = json.loads(raw) except Exception: return {} return data if isinstance(data, dict) else {} def _notify_url(url: str) -> None: notify_url( ADDON_ID, heading="Einschalten", url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO, plugin_setting_id=SETTING_SHOW_URL_INFO, ) def _log_url(url: str, *, kind: str = "VISIT") -> None: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="einschalten_urls.log", url=url, kind=kind, ) def _log_debug_line(message: str) -> None: try: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="einschalten_debug.log", url=message, kind="DEBUG", ) except Exception: pass def _log_titles(items: list[MovieItem], *, context: str) -> None: if not items: return try: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="einschalten_titles.log", url=f"{context}:count={len(items)}", kind="TITLE", ) for item in items: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="einschalten_titles.log", url=f"{context}:id={item.id} title={item.title}", kind="TITLE", ) except Exception: pass def _log_response_html(url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, plugin_setting_id=SETTING_DUMP_HTML, url=url, body=body, filename_prefix="einschalten_response", ) def _log_error(message: str) -> None: log_error( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS, plugin_setting_id=SETTING_LOG_ERRORS, log_filename="einschalten_errors.log", message=message, ) def _u_matches(value: Any, expected_path: str) -> bool: raw = (value or "").strip() if not raw: return False if raw == expected_path: return True try: if "://" in raw: path = urlsplit(raw).path or "" else: path = raw.split("?", 1)[0].split("#", 1)[0] if path == expected_path: return True except Exception: pass return raw.endswith(expected_path) def _parse_ng_state_movies(payload: Dict[str, Any]) -> List[MovieItem]: movies: List[MovieItem] = [] for value in (payload or {}).values(): if not isinstance(value, dict): continue # In ng-state payload, `u` (URL) is a sibling of `b` (body), not nested inside `b`. if not _u_matches(value.get("u"), "/api/movies"): continue block = value.get("b") if not isinstance(block, dict): continue data = block.get("data") if not isinstance(data, list): continue for item in data: if not isinstance(item, dict): continue try: movie_id = int(item.get("id")) except Exception: continue title = str(item.get("title") or "").strip() if not title: continue vote_average = item.get("voteAverage") try: vote_average_f = float(vote_average) if vote_average is not None else None except Exception: vote_average_f = None collection_id = item.get("collectionId") try: collection_id_i = int(collection_id) if collection_id is not None else None except Exception: collection_id_i = None movies.append( MovieItem( id=movie_id, title=title, release_date=str(item.get("releaseDate") or ""), poster_path=str(item.get("posterPath") or ""), vote_average=vote_average_f, collection_id=collection_id_i, ) ) return movies def _parse_ng_state_movies_with_pagination(payload: Dict[str, Any]) -> tuple[List[MovieItem], bool | None, int | None]: """Parses ng-state for `u: "/api/movies"` where `b` contains `{data:[...], pagination:{...}}`. Returns: (movies, has_more, current_page) """ movies: List[MovieItem] = [] has_more: bool | None = None current_page: int | None = None for value in (payload or {}).values(): if not isinstance(value, dict): continue if not _u_matches(value.get("u"), "/api/movies"): continue block = value.get("b") if not isinstance(block, dict): continue pagination = block.get("pagination") if isinstance(pagination, dict): if "hasMore" in pagination: has_more = bool(pagination.get("hasMore") is True) try: current_page = int(pagination.get("currentPage")) if pagination.get("currentPage") is not None else None except Exception: current_page = None data = block.get("data") if not isinstance(data, list): continue for item in data: if not isinstance(item, dict): continue try: movie_id = int(item.get("id")) except Exception: continue title = str(item.get("title") or "").strip() if not title: continue vote_average = item.get("voteAverage") try: vote_average_f = float(vote_average) if vote_average is not None else None except Exception: vote_average_f = None collection_id = item.get("collectionId") try: collection_id_i = int(collection_id) if collection_id is not None else None except Exception: collection_id_i = None movies.append( MovieItem( id=movie_id, title=title, release_date=str(item.get("releaseDate") or ""), poster_path=str(item.get("posterPath") or ""), vote_average=vote_average_f, collection_id=collection_id_i, ) ) # Stop after first matching block (genre pages should only have one). break return movies, has_more, current_page def _parse_ng_state_search_results(payload: Dict[str, Any]) -> List[MovieItem]: movies: List[MovieItem] = [] for value in (payload or {}).values(): if not isinstance(value, dict): continue if not _u_matches(value.get("u"), "/api/search"): continue block = value.get("b") if not isinstance(block, dict): continue data = block.get("data") if not isinstance(data, list): continue for item in data: if not isinstance(item, dict): continue try: movie_id = int(item.get("id")) except Exception: continue title = str(item.get("title") or "").strip() if not title: continue vote_average = item.get("voteAverage") try: vote_average_f = float(vote_average) if vote_average is not None else None except Exception: vote_average_f = None collection_id = item.get("collectionId") try: collection_id_i = int(collection_id) if collection_id is not None else None except Exception: collection_id_i = None movies.append( MovieItem( id=movie_id, title=title, release_date=str(item.get("releaseDate") or ""), poster_path=str(item.get("posterPath") or ""), vote_average=vote_average_f, collection_id=collection_id_i, ) ) return movies def _parse_ng_state_movie_detail(payload: Dict[str, Any], *, movie_id: int) -> MovieDetail | None: movie_id = int(movie_id or 0) if movie_id <= 0: return None expected_u = f"/api/movies/{movie_id}" for value in (payload or {}).values(): if not isinstance(value, dict): continue if not _u_matches(value.get("u"), expected_u): continue block = value.get("b") if not isinstance(block, dict): continue try: parsed_id = int(block.get("id")) except Exception: continue if parsed_id != movie_id: continue title = str(block.get("title") or "").strip() if not title: continue runtime = block.get("runtime") try: runtime_i = int(runtime) if runtime is not None else None except Exception: runtime_i = None vote_average = block.get("voteAverage") try: vote_average_f = float(vote_average) if vote_average is not None else None except Exception: vote_average_f = None vote_count = block.get("voteCount") try: vote_count_i = int(vote_count) if vote_count is not None else None except Exception: vote_count_i = None genres_raw = block.get("genres") genres: List[str] | None = None if isinstance(genres_raw, list): names: List[str] = [] for g in genres_raw: if isinstance(g, dict): name = str(g.get("name") or "").strip() if name: names.append(name) genres = names return MovieDetail( id=movie_id, title=title, tagline=str(block.get("tagline") or "").strip(), overview=str(block.get("overview") or "").strip(), release_date=str(block.get("releaseDate") or "").strip(), runtime_minutes=runtime_i, poster_path=str(block.get("posterPath") or "").strip(), backdrop_path=str(block.get("backdropPath") or "").strip(), vote_average=vote_average_f, vote_count=vote_count_i, homepage=str(block.get("homepage") or "").strip(), imdb_id=str(block.get("imdbId") or "").strip(), wikidata_id=str(block.get("wikidataId") or "").strip(), genres=genres, ) return None def _parse_ng_state_genres(payload: Dict[str, Any]) -> Dict[str, int]: """Parses ng-state for `u: "/api/genres"` where `b` is a list of {id,name}.""" genres: Dict[str, int] = {} for value in (payload or {}).values(): if not isinstance(value, dict): continue if not _u_matches(value.get("u"), "/api/genres"): continue block = value.get("b") if not isinstance(block, list): continue for item in block: if not isinstance(item, dict): continue name = str(item.get("name") or "").strip() if not name: continue try: gid = int(item.get("id")) except Exception: continue if gid > 0: genres[name] = gid return genres class EinschaltenPlugin(BasisPlugin): """Metadata-Plugin für eine autorisierte Quelle.""" name = "Einschalten" version = "1.0.0" def __init__(self) -> None: self.is_available = REQUESTS_AVAILABLE self.unavailable_reason = None if REQUESTS_AVAILABLE else f"requests fehlt: {REQUESTS_IMPORT_ERROR}" self._session = None self._id_by_title: Dict[str, int] = {} self._detail_html_by_id: Dict[int, str] = {} self._detail_by_id: Dict[int, MovieDetail] = {} self._genre_id_by_name: Dict[str, int] = {} self._genre_has_more_by_id_page: Dict[tuple[int, int], bool] = {} self._new_titles_has_more_by_page: Dict[int, bool] = {} def _get_session(self): _ensure_requests() if self._session is None: self._session = requests.Session() return self._session def _http_get_text(self, url: str, *, timeout: int = 20) -> tuple[str, str]: _log_url(url, kind="GET") _notify_url(url) sess = self._get_session() response = None try: response = sess.get(url, headers=HEADERS, timeout=timeout) response.raise_for_status() final_url = (response.url or url) if response is not None else url body = (response.text or "") if response is not None else "" _log_url(final_url, kind="OK") _log_response_html(final_url, body) return final_url, body finally: if response is not None: try: response.close() except Exception: pass def _http_get_json(self, url: str, *, timeout: int = 20) -> tuple[str, Any]: final_url, body = self._http_get_text(url, timeout=timeout) try: payload = json.loads(body or "{}") except Exception: payload = {} return final_url, payload def _get_base_url(self) -> str: base = _get_setting_text(SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip() return base.rstrip("/") def _index_url(self) -> str: base = self._get_base_url() if not base: return "" path = DEFAULT_INDEX_PATH return urljoin(base + "/", path.lstrip("/")) def _new_titles_url(self) -> str: base = self._get_base_url() if not base: return "" path = DEFAULT_NEW_TITLES_PATH return urljoin(base + "/", path.lstrip("/")) def _genres_url(self) -> str: base = self._get_base_url() if not base: return "" path = DEFAULT_GENRES_PATH return urljoin(base + "/", path.lstrip("/")) def _api_genres_url(self) -> str: base = self._get_base_url() if not base: return "" return urljoin(base + "/", "api/genres") def _search_url(self, query: str) -> str: base = self._get_base_url() if not base: return "" path = DEFAULT_SEARCH_PATH url = urljoin(base + "/", path.lstrip("/")) return f"{url}?{urlencode({'query': query})}" def _api_movies_url(self, *, with_genres: int, page: int = 1) -> str: base = self._get_base_url() if not base: return "" params: Dict[str, str] = {"withGenres": str(int(with_genres))} if page and int(page) > 1: params["page"] = str(int(page)) return urljoin(base + "/", "api/movies") + f"?{urlencode(params)}" def _genre_page_url(self, *, genre_id: int, page: int = 1) -> str: """Genre title pages are rendered server-side and embed the movie list in ng-state. Example: - `/genres/` contains ng-state with `u: "/api/movies"` and `b.data` + `b.pagination`. """ base = self._get_base_url() if not base: return "" genre_root = self._genres_url().rstrip("/") if not genre_root: return "" page = max(1, int(page or 1)) url = urljoin(genre_root + "/", str(int(genre_id))) if page > 1: url = f"{url}?{urlencode({'page': str(page)})}" return url def _movie_detail_url(self, movie_id: int) -> str: base = self._get_base_url() if not base: return "" return urljoin(base + "/", f"movies/{int(movie_id)}") def _watch_url(self, movie_id: int) -> str: base = self._get_base_url() if not base: return "" template = DEFAULT_WATCH_PATH_TEMPLATE try: path = template.format(id=int(movie_id)) except Exception: path = DEFAULT_WATCH_PATH_TEMPLATE.format(id=int(movie_id)) return urljoin(base + "/", path.lstrip("/")) def _ensure_title_id(self, title: str) -> int | None: title = (title or "").strip() if not title: return None cached = self._id_by_title.get(title) if isinstance(cached, int) and cached > 0: return cached # Fallback: scan index ng-state again to rebuild mapping. for movie in self._load_movies(): if movie.title == title: self._id_by_title[title] = movie.id return movie.id # Kodi startet das Plugin pro Navigation neu -> RAM-Cache geht verloren. # Für Titel, die nicht auf der Index-Seite sind (z.B. /movies/new), lösen wir die ID # über die Suchseite auf, die ebenfalls `id` + `title` im ng-state liefert. try: normalized = title.casefold().strip() for movie in self._fetch_search_movies(title): if (movie.title or "").casefold().strip() == normalized: self._id_by_title[title] = movie.id return movie.id except Exception: pass return None def _fetch_movie_detail(self, movie_id: int) -> str: movie_id = int(movie_id or 0) if movie_id <= 0: return "" cached = self._detail_html_by_id.get(movie_id) if isinstance(cached, str) and cached: return cached url = self._movie_detail_url(movie_id) if not url: return "" try: _, body = self._http_get_text(url, timeout=20) self._detail_html_by_id[movie_id] = body return body except Exception as exc: _log_error(f"GET {url} failed: {exc}") return "" def _fetch_watch_payload(self, movie_id: int) -> dict[str, object]: movie_id = int(movie_id or 0) if movie_id <= 0: return {} url = self._watch_url(movie_id) if not url: return {} try: _, data = self._http_get_json(url, timeout=20) return data except Exception as exc: _log_error(f"GET {url} failed: {exc}") return {} def _watch_stream_url(self, movie_id: int) -> str: payload = self._fetch_watch_payload(movie_id) stream_url = payload.get("streamUrl") return str(stream_url).strip() if isinstance(stream_url, str) and stream_url.strip() else "" def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]: """Optional hook for the UI layer (default.py) to attach metadata/art without TMDB.""" title = (title or "").strip() movie_id = self._ensure_title_id(title) if movie_id is None: return {}, {}, None detail = self._detail_by_id.get(movie_id) if detail is None: html = self._fetch_movie_detail(movie_id) payload = _extract_ng_state_payload(html) parsed = _parse_ng_state_movie_detail(payload, movie_id=movie_id) if parsed is not None: self._detail_by_id[movie_id] = parsed detail = parsed info: dict[str, str] = {"mediatype": "movie", "title": title} art: dict[str, str] = {} if detail is None: return info, art, None if detail.overview: info["plot"] = detail.overview if detail.tagline: info["tagline"] = detail.tagline if detail.release_date: info["premiered"] = detail.release_date if len(detail.release_date) >= 4 and detail.release_date[:4].isdigit(): info["year"] = detail.release_date[:4] if detail.runtime_minutes is not None and detail.runtime_minutes > 0: info["duration"] = str(int(detail.runtime_minutes) * 60) if detail.vote_average is not None: info["rating"] = str(detail.vote_average) if detail.vote_count is not None: info["votes"] = str(detail.vote_count) if detail.genres: info["genre"] = " / ".join(detail.genres) base = self._get_base_url() if base: if detail.poster_path: poster = urljoin(base + "/", f"api/image/poster/{detail.poster_path.lstrip('/')}") art.update({"thumb": poster, "poster": poster}) if detail.backdrop_path: backdrop = urljoin(base + "/", f"api/image/backdrop/{detail.backdrop_path.lstrip('/')}") art.setdefault("fanart", backdrop) art.setdefault("landscape", backdrop) return info, art, None def _fetch_index_movies(self) -> List[MovieItem]: url = self._index_url() if not url: return [] try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) return _parse_ng_state_movies(payload) except Exception: return [] def _fetch_new_titles_movies(self) -> List[MovieItem]: # "Neue Filme" lives at `/movies/new` and embeds the list in ng-state (`u: "/api/movies"`). url = self._new_titles_url() if not url: return [] try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) movies = _parse_ng_state_movies(payload) _log_debug_line(f"parse_ng_state_movies:count={len(movies)}") if movies: _log_titles(movies, context="new_titles") return movies return [] except Exception: return [] def _fetch_new_titles_movies_page(self, page: int) -> List[MovieItem]: page = max(1, int(page or 1)) url = self._new_titles_url() if not url: return [] if page > 1: url = f"{url}?{urlencode({'page': str(page)})}" try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) movies, has_more, current_page = _parse_ng_state_movies_with_pagination(payload) _log_debug_line(f"parse_ng_state_movies_page:page={page} count={len(movies)}") if has_more is not None: self._new_titles_has_more_by_page[page] = bool(has_more) elif current_page is not None and int(current_page) != page: self._new_titles_has_more_by_page[page] = False if movies: _log_titles(movies, context=f"new_titles_page={page}") return movies self._new_titles_has_more_by_page[page] = False return [] except Exception: return [] def new_titles_page(self, page: int) -> List[str]: """Paged variant: returns titles for `/movies/new?page=`.""" if not REQUESTS_AVAILABLE: return [] if not self._get_base_url(): return [] page = max(1, int(page or 1)) movies = self._fetch_new_titles_movies_page(page) titles: List[str] = [] seen: set[str] = set() for movie in movies: if movie.title in seen: continue seen.add(movie.title) self._id_by_title[movie.title] = movie.id titles.append(movie.title) return titles def new_titles_has_more(self, page: int) -> bool: """Tells the UI whether `/movies/new` has a next page after `page`.""" page = max(1, int(page or 1)) cached = self._new_titles_has_more_by_page.get(page) if cached is not None: return bool(cached) # Load page to fill cache. _ = self._fetch_new_titles_movies_page(page) return bool(self._new_titles_has_more_by_page.get(page, False)) def _fetch_search_movies(self, query: str) -> List[MovieItem]: query = (query or "").strip() if not query: return [] # Parse ng-state from /search page HTML. url = self._search_url(query) if not url: return [] try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) results = _parse_ng_state_search_results(payload) return _filter_movies_by_title(query, results) except Exception: return [] def _load_movies(self) -> List[MovieItem]: return self._fetch_index_movies() def _ensure_genre_index(self) -> None: if self._genre_id_by_name: return # Prefer direct JSON API (simpler): GET /api/genres -> [{"id":..,"name":..}, ...] api_url = self._api_genres_url() if api_url: try: _, payload = self._http_get_json(api_url, timeout=20) if isinstance(payload, list): parsed: Dict[str, int] = {} for item in payload: if not isinstance(item, dict): continue name = str(item.get("name") or "").strip() if not name: continue try: gid = int(item.get("id")) except Exception: continue if gid > 0: parsed[name] = gid if parsed: self._genre_id_by_name.clear() self._genre_id_by_name.update(parsed) return except Exception: pass # Fallback: parse ng-state from HTML /genres page. url = self._genres_url() if not url: return try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) parsed = _parse_ng_state_genres(payload) if parsed: self._genre_id_by_name.clear() self._genre_id_by_name.update(parsed) except Exception: return async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]: if not REQUESTS_AVAILABLE: return [] query = (query or "").strip() if not query: return [] if not self._get_base_url(): return [] _emit_progress(progress_callback, "Einschalten Suche", 15) movies = self._fetch_search_movies(query) if not movies: _emit_progress(progress_callback, "Fallback: Index filtern", 45) movies = _filter_movies_by_title(query, self._load_movies()) _emit_progress(progress_callback, f"Treffer verarbeiten ({len(movies)})", 75) titles: List[str] = [] seen: set[str] = set() for movie in movies: if movie.title in seen: continue seen.add(movie.title) self._id_by_title[movie.title] = movie.id titles.append(movie.title) titles.sort(key=lambda value: value.casefold()) _emit_progress(progress_callback, f"Fertig: {len(titles)} Treffer", 95) return titles def genres(self) -> List[str]: if not REQUESTS_AVAILABLE: return [] if not self._get_base_url(): return [] self._ensure_genre_index() return sorted(self._genre_id_by_name.keys(), key=lambda value: value.casefold()) def titles_for_genre(self, genre: str) -> List[str]: # Backwards compatible (first page only); paging handled via titles_for_genre_page(). titles = self.titles_for_genre_page(genre, 1) titles.sort(key=lambda value: value.casefold()) return titles def titles_for_genre_page(self, genre: str, page: int) -> List[str]: if not REQUESTS_AVAILABLE: return [] genre = (genre or "").strip() if not genre: return [] if not self._get_base_url(): return [] self._ensure_genre_index() genre_id = self._genre_id_by_name.get(genre) if not genre_id: return [] # Do NOT use `/api/movies?withGenres=...` directly: on some deployments it returns # a mismatched/unfiltered dataset. Instead parse the server-rendered genre page # `/genres/` which embeds the correct data in ng-state. url = self._genre_page_url(genre_id=int(genre_id), page=max(1, int(page or 1))) if not url: return [] try: _, body = self._http_get_text(url, timeout=20) payload = _extract_ng_state_payload(body) except Exception: return [] if not isinstance(payload, dict): return [] movies, has_more, current_page = _parse_ng_state_movies_with_pagination(payload) page = max(1, int(page or 1)) if has_more is not None: self._genre_has_more_by_id_page[(int(genre_id), page)] = bool(has_more) elif current_page is not None and int(current_page) != page: # Defensive: if the page param wasn't honored, avoid showing "next". self._genre_has_more_by_id_page[(int(genre_id), page)] = False titles: List[str] = [] seen: set[str] = set() for movie in movies: title = (movie.title or "").strip() if not title or title in seen: continue seen.add(title) if movie.id > 0: self._id_by_title[title] = int(movie.id) titles.append(title) return titles def genre_has_more(self, genre: str, page: int) -> bool: """Optional: tells the UI whether a genre has more pages after `page`.""" genre = (genre or "").strip() if not genre: return False self._ensure_genre_index() genre_id = self._genre_id_by_name.get(genre) if not genre_id: return False page = max(1, int(page or 1)) cached = self._genre_has_more_by_id_page.get((int(genre_id), page)) if cached is not None: return bool(cached) # If the page wasn't loaded yet, load it (fills the cache) and then report. _ = self.titles_for_genre_page(genre, page) return bool(self._genre_has_more_by_id_page.get((int(genre_id), page), False)) def seasons_for(self, title: str) -> List[str]: # Beim Öffnen eines Titels: Detailseite anhand der ID abrufen (HTML) und cachen. title = (title or "").strip() if not title: return [] movie_id = self._ensure_title_id(title) if movie_id is not None: self._fetch_movie_detail(movie_id) # Playback: expose a single "Stream" folder (inside: 1 playable item = Filmtitel). return ["Stream"] def episodes_for(self, title: str, season: str) -> List[str]: season = (season or "").strip() if season.casefold() == "stream": title = (title or "").strip() return [title] if title else [] return [] def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: title = (title or "").strip() season = (season or "").strip() episode = (episode or "").strip() # Backwards compatible: # - old: Film / Stream # - new: Stream / if not title: return None if season.casefold() == "film" and episode.casefold() == "stream": pass elif season.casefold() == "stream" and (episode == title or episode.casefold() == "stream"): pass else: return None movie_id = self._ensure_title_id(title) if movie_id is None: return None stream_url = self._watch_stream_url(movie_id) return stream_url or None def resolve_stream_link(self, link: str) -> Optional[str]: try: from resolveurl_backend import resolve as resolve_with_resolveurl except Exception: resolve_with_resolveurl = None if callable(resolve_with_resolveurl): return resolve_with_resolveurl(link) or link return link def capabilities(self) -> Set[str]: return {"new_titles", "genres"} def new_titles(self) -> List[str]: if not REQUESTS_AVAILABLE: return [] if not self._get_base_url(): return [] # Backwards compatible: first page only. UI uses paging via `new_titles_page`. return self.new_titles_page(1) # Alias für die automatische Plugin-Erkennung. Plugin = EinschaltenPlugin