From 58da71572374587dd2657bfa737d61d5e2c51da2 Mon Sep 17 00:00:00 2001 From: "itdrui.de" Date: Wed, 4 Mar 2026 22:29:49 +0100 Subject: [PATCH] =?UTF-8?q?dev:=20bump=20to=200.1.71-dev=20=E2=80=93=20neu?= =?UTF-8?q?e=20Plugins=20(Moflix,=20KKiste,=20HDFilme,=20Netzkino),=20Seri?= =?UTF-8?q?enStream=20A-Z,=20VidHide-Fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- addon/default.py | 22 +- addon/plugins/hdfilme_plugin.py | 289 ++++++++++ addon/plugins/kkiste_plugin.py | 367 +++++++++++++ addon/plugins/moflix_plugin.py | 755 +++++++++++++++++++++++++++ addon/plugins/netzkino_plugin.py | 251 +++++++++ addon/plugins/serienstream_plugin.py | 68 ++- tests/test_moflix_plugin.py | 711 +++++++++++++++++++++++++ 7 files changed, 2460 insertions(+), 3 deletions(-) create mode 100644 addon/plugins/hdfilme_plugin.py create mode 100644 addon/plugins/kkiste_plugin.py create mode 100644 addon/plugins/moflix_plugin.py create mode 100644 addon/plugins/netzkino_plugin.py create mode 100644 tests/test_moflix_plugin.py diff --git a/addon/default.py b/addon/default.py index 9592e1f..8a5cc1e 100644 --- a/addon/default.py +++ b/addon/default.py @@ -4086,7 +4086,16 @@ def _play_episode( 4500, ) return - final_link = resolved_link or link + if not resolved_link: + _log("Stream konnte nicht aufgeloest werden.", xbmc.LOGWARNING) + xbmcgui.Dialog().notification( + "Wiedergabe", + "Stream konnte nicht aufgeloest werden.", + xbmcgui.NOTIFICATION_INFO, + 3000, + ) + return + final_link = resolved_link final_link = normalize_resolved_stream_url(final_link, source_url=link) err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err) and final_link.strip() == link.strip(): @@ -4217,7 +4226,16 @@ def _play_episode_url( 4500, ) return - final_link = resolved_link or link + if not resolved_link: + _log("Stream konnte nicht aufgeloest werden.", xbmc.LOGWARNING) + xbmcgui.Dialog().notification( + "Wiedergabe", + "Stream konnte nicht aufgeloest werden.", + xbmcgui.NOTIFICATION_INFO, + 3000, + ) + return + final_link = resolved_link final_link = normalize_resolved_stream_url(final_link, source_url=link) err = _resolveurl_last_error() if _is_cloudflare_challenge_error(err) and final_link.strip() == link.strip(): diff --git a/addon/plugins/hdfilme_plugin.py b/addon/plugins/hdfilme_plugin.py new file mode 100644 index 0000000..37f916c --- /dev/null +++ b/addon/plugins/hdfilme_plugin.py @@ -0,0 +1,289 @@ +"""HDFilme Plugin für ViewIT. + +HTML-Scraping von hdfilme.garden. +Filme und Serien, Hoster-Auflösung via ResolveURL. + +Hinweis: Die Domain ändert sich gelegentlich – als DOMAIN-Konstante konfigurierbar. +""" + +from __future__ import annotations + +import re +from typing import Any, Callable, List, Optional +from urllib.parse import quote_plus + +try: # pragma: no cover + import requests +except ImportError as exc: # pragma: no cover + requests = None + REQUESTS_AVAILABLE = False + REQUESTS_IMPORT_ERROR = exc +else: + REQUESTS_AVAILABLE = True + REQUESTS_IMPORT_ERROR = None + +from plugin_interface import BasisPlugin + +# --------------------------------------------------------------------------- +# Konstanten +# --------------------------------------------------------------------------- + +DOMAIN = "hdfilme.garden" +BASE_URL = "https://" + DOMAIN +DEFAULT_TIMEOUT = 20 + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", + "Referer": BASE_URL + "/", +} + +_URL_SEARCH = BASE_URL + "/index.php?do=search&subaction=search&story={query}" +_URL_NEW = BASE_URL + "/kinofilme-online/" +_URL_SERIES = BASE_URL + "/serienstream-deutsch/" + +# HTML-Parsing-Muster +_RE_ENTRIES = re.compile( + r'
]*>([^<]+).*?data-src="([^"]+)', + re.DOTALL, +) +_RE_EPISODES = re.compile(r'>([^<]+)') +_RE_HOSTERS = re.compile(r'link="([^"]+)"') +_RE_THUMB_STANDALONE = re.compile(r'data-src="([^"]+)"') + +_SKIP_HOSTERS = {"youtube", "dropload"} + +ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] + + +# --------------------------------------------------------------------------- +# Plugin-Klasse +# --------------------------------------------------------------------------- + +class HDFilmePlugin(BasisPlugin): + """HDFilme Integration für ViewIT (hdfilme.garden).""" + + name = "HDFilme" + + def __init__(self) -> None: + # title → Detail-Page-URL + self._title_to_url: dict[str, str] = {} + # title → (plot, poster, fanart) + self._title_meta: dict[str, tuple[str, str, str]] = {} + # title → True wenn Serie + self._is_series: dict[str, bool] = {} + + # ------------------------------------------------------------------ + # Verfügbarkeit + # ------------------------------------------------------------------ + + @property + def is_available(self) -> bool: + return REQUESTS_AVAILABLE + + @property + def unavailable_reason(self) -> str: + if REQUESTS_AVAILABLE: + return "" + return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}" + + # ------------------------------------------------------------------ + # HTTP + # ------------------------------------------------------------------ + + def _get_session(self): # type: ignore[return] + from http_session_pool import get_requests_session + return get_requests_session("hdfilme", headers=HEADERS) + + def _get_html(self, url: str) -> str: + session = self._get_session() + response = None + try: + response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) + response.raise_for_status() + return response.text + except Exception: + return "" + finally: + if response is not None: + try: + response.close() + except Exception: + pass + + # ------------------------------------------------------------------ + # Interne Hilfsmethoden + # ------------------------------------------------------------------ + + def _parse_entries(self, html: str) -> List[str]: + """Parst Ergebnisseite und cached Einträge. Gibt Titelliste zurück.""" + titles: list[str] = [] + for m in _RE_ENTRIES.finditer(html): + raw_url, raw_title, raw_thumb = m.group(1), m.group(2), m.group(3) + title = raw_title.strip() + if not title: + continue + + # Absolute URL sicherstellen + url = raw_url.strip() + if url.startswith("/"): + url = BASE_URL + url + if not url.startswith("http"): + continue + + thumb = raw_thumb.strip() + if thumb.startswith("/"): + thumb = BASE_URL + thumb + + is_series = "taffel" in title # "Staffel" (xStream-Konvention) + self._title_to_url[title] = url + self._is_series[title] = is_series + self._title_meta[title] = ("", thumb, "") + titles.append(title) + return titles + + def _get_hoster_links(self, html: str, episode: str = "") -> List[str]: + """Extrahiert Hoster-URLs aus HTML, optional nach Episode gefiltert.""" + search_area = html + if episode: + # Episode-Abschnitt isolieren + m = re.search(re.escape(episode) + r"<.*?", html, re.DOTALL) + if m: + search_area = m.group(0) + + links: list[str] = [] + for m in _RE_HOSTERS.finditer(search_area): + link = m.group(1).strip() + if not link: + continue + if link.startswith("//"): + link = "https:" + link + name = link.split("//")[-1].split(".")[0].lower() + if name in _SKIP_HOSTERS: + continue + links.append(link) + return links + + # ------------------------------------------------------------------ + # Pflicht-Methoden + # ------------------------------------------------------------------ + + async def search_titles( + self, query: str, progress_callback: ProgressCallback = None + ) -> List[str]: + query = (query or "").strip() + if not query or not REQUESTS_AVAILABLE: + return [] + url = _URL_SEARCH.format(query=quote_plus(query)) + html = self._get_html(url) + if not html: + return [] + # Suche filtert clientseitig nach Titel + q_lower = query.lower() + all_titles = self._parse_entries(html) + return [t for t in all_titles if q_lower in t.lower()] + + def seasons_for(self, title: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + if self._is_series.get(title): + # Staffelnummer aus Titel ableiten, falls vorhanden + m = re.search(r"Staffel\s*(\d+)", title, re.IGNORECASE) + if m: + return [f"Staffel {m.group(1)}"] + return ["Staffel 1"] + return ["Film"] + + def episodes_for(self, title: str, season: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + + if season == "Film": + return [title] + + url = self._title_to_url.get(title, "") + if not url: + return [] + + html = self._get_html(url) + if not html: + return [title] + + episodes = _RE_EPISODES.findall(html) + return [ep.strip() for ep in episodes if ep.strip()] or [title] + + # ------------------------------------------------------------------ + # Stream + # ------------------------------------------------------------------ + + def stream_link_for( + self, title: str, season: str, episode: str + ) -> Optional[str]: + title = (title or "").strip() + url = self._title_to_url.get(title, "") + if not url: + return None + + html = self._get_html(url) + if not html: + return None + + # Für Serien: nach Episode-Abschnitt filtern (wenn episode != title) + ep_filter = "" if (season == "Film" or episode == title) else episode + links = self._get_hoster_links(html, ep_filter) + return links[0] if links else None + + def resolve_stream_link(self, link: str) -> Optional[str]: + link = (link or "").strip() + if not link: + return None + try: + from plugin_helpers import resolve_via_resolveurl + return resolve_via_resolveurl(link, fallback_to_link=False) + except Exception: + return None + + # ------------------------------------------------------------------ + # Metadaten + # ------------------------------------------------------------------ + + def metadata_for( + self, title: str + ) -> tuple[dict[str, str], dict[str, str], list | None]: + title = (title or "").strip() + if not title: + return {}, {}, None + + info: dict[str, str] = {"title": title} + art: dict[str, str] = {} + + cached = self._title_meta.get(title) + if cached: + plot, poster, fanart = cached + if plot: + info["plot"] = plot + if poster: + art["thumb"] = poster + art["poster"] = poster + if fanart: + art["fanart"] = fanart + + return info, art, None + + # ------------------------------------------------------------------ + # Browsing + # ------------------------------------------------------------------ + + def latest_titles(self, page: int = 1) -> List[str]: + html = self._get_html(_URL_NEW) + return self._parse_entries(html) if html else [] + + def popular_series(self) -> List[str]: + html = self._get_html(_URL_SERIES) + return self._parse_entries(html) if html else [] + + def capabilities(self) -> set[str]: + return {"latest_titles", "popular_series"} diff --git a/addon/plugins/kkiste_plugin.py b/addon/plugins/kkiste_plugin.py new file mode 100644 index 0000000..fd7a7e6 --- /dev/null +++ b/addon/plugins/kkiste_plugin.py @@ -0,0 +1,367 @@ +"""KKiste Plugin für ViewIT. + +Nutzt die JSON-REST-API von kkiste.eu. +Filme und Serien mit TMDB-Thumbnails – kein HTML-Scraping. + +Serien-Besonderheit: Auf KKiste ist jede Staffel ein eigener Eintrag +(z.B. "Breaking Bad - Staffel 1"). Die Suche liefert alle passenden +Staffel-Einträge direkt. +""" + +from __future__ import annotations + +import re +from typing import Any, Callable, List, Optional +from urllib.parse import quote_plus + +try: # pragma: no cover + import requests +except ImportError as exc: # pragma: no cover + requests = None + REQUESTS_AVAILABLE = False + REQUESTS_IMPORT_ERROR = exc +else: + REQUESTS_AVAILABLE = True + REQUESTS_IMPORT_ERROR = None + +from plugin_interface import BasisPlugin + +# --------------------------------------------------------------------------- +# Konstanten +# --------------------------------------------------------------------------- + +DOMAIN = "kkiste.eu" +BASE_URL = "https://" + DOMAIN +DEFAULT_TIMEOUT = 20 + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", + "Referer": BASE_URL + "/", + "Origin": BASE_URL, +} + +# Sprache: 2=Deutsch, 3=Englisch, all=alle +_LANG = "2" +_THUMB_BASE = "https://image.tmdb.org/t/p/w300" + +_URL_BROWSE = BASE_URL + "/data/browse/?lang={lang}&type={type}&order_by={order}&page={page}" +_URL_SEARCH = BASE_URL + "/data/browse/?lang={lang}&order_by=new&page=1&limit=0" +_URL_GENRE = BASE_URL + "/data/browse/?lang={lang}&type=movies&order_by=Trending&genre={genre}&page=1" +_URL_WATCH = BASE_URL + "/data/watch/?_id={id}" + +GENRE_SLUGS: dict[str, str] = { + "Action": "Action", + "Animation": "Animation", + "Biographie": "Biographie", + "Dokumentation": "Dokumentation", + "Drama": "Drama", + "Familie": "Familie", + "Fantasy": "Fantasy", + "Horror": "Horror", + "Komödie": "Komödie", + "Krimi": "Krimi", + "Mystery": "Mystery", + "Romantik": "Romantik", + "Science-Fiction": "Sci-Fi", + "Thriller": "Thriller", + "Western": "Western", +} + +ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] + + +# --------------------------------------------------------------------------- +# Plugin-Klasse +# --------------------------------------------------------------------------- + +class KKistePlugin(BasisPlugin): + """KKiste Integration für ViewIT (kkiste.eu). + + Jede Staffel einer Serie ist auf KKiste ein eigenständiger API-Eintrag. + """ + + name = "KKiste" + + def __init__(self) -> None: + # title → watch-URL (/data/watch/?_id=X) + self._title_to_watch_url: dict[str, str] = {} + # title → (plot, poster, fanart) + self._title_meta: dict[str, tuple[str, str, str]] = {} + # title → True wenn "Staffel"/"Season" im Titel + self._is_series: dict[str, bool] = {} + # title → Staffelnummer (aus "Staffel N" extrahiert) + self._season_nr: dict[str, int] = {} + + # ------------------------------------------------------------------ + # Verfügbarkeit + # ------------------------------------------------------------------ + + @property + def is_available(self) -> bool: + return REQUESTS_AVAILABLE + + @property + def unavailable_reason(self) -> str: + if REQUESTS_AVAILABLE: + return "" + return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}" + + # ------------------------------------------------------------------ + # HTTP + # ------------------------------------------------------------------ + + def _get_session(self): # type: ignore[return] + from http_session_pool import get_requests_session + return get_requests_session("kkiste", headers=HEADERS) + + def _get_json(self, url: str) -> dict | list | None: + session = self._get_session() + response = None + try: + response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) + response.raise_for_status() + return response.json() + except Exception: + return None + finally: + if response is not None: + try: + response.close() + except Exception: + pass + + # ------------------------------------------------------------------ + # Interne Hilfsmethoden + # ------------------------------------------------------------------ + + def _cache_entry(self, movie: dict) -> str: + """Cached einen API-Eintrag und gibt den Titel zurück ('' = überspringen).""" + title = str(movie.get("title") or "").strip() + if not title or "_id" not in movie: + return "" + + movie_id = str(movie["_id"]) + self._title_to_watch_url[title] = _URL_WATCH.format(id=movie_id) + + # Serie erkennen + is_series = "Staffel" in title or "Season" in title + self._is_series[title] = is_series + if is_series: + m = re.search(r"(?:Staffel|Season)\s*(\d+)", title, re.IGNORECASE) + if m: + self._season_nr[title] = int(m.group(1)) + + # Metadaten + poster = "" + for key in ("poster_path_season", "poster_path"): + if movie.get(key): + poster = _THUMB_BASE + str(movie[key]) + break + fanart = _THUMB_BASE + str(movie["backdrop_path"]) if movie.get("backdrop_path") else "" + plot = str(movie.get("storyline") or movie.get("overview") or "") + + self._title_meta[title] = (plot, poster, fanart) + return title + + def _browse(self, content_type: str, order: str = "Trending") -> List[str]: + url = _URL_BROWSE.format(lang=_LANG, type=content_type, order=order, page=1) + data = self._get_json(url) + if not isinstance(data, dict): + return [] + return [ + t for movie in (data.get("movies") or []) + if isinstance(movie, dict) and (t := self._cache_entry(movie)) + ] + + # ------------------------------------------------------------------ + # Pflicht-Methoden + # ------------------------------------------------------------------ + + async def search_titles( + self, query: str, progress_callback: ProgressCallback = None + ) -> List[str]: + query = (query or "").strip() + if not query or not REQUESTS_AVAILABLE: + return [] + + # KKiste: limit=0 lädt alle Titel, client-seitige Filterung + url = _URL_SEARCH.format(lang=_LANG) + data = self._get_json(url) + if not isinstance(data, dict): + return [] + + q_lower = query.lower() + titles: list[str] = [] + for movie in (data.get("movies") or []): + if not isinstance(movie, dict) or "_id" not in movie: + continue + raw_title = str(movie.get("title") or "").strip() + if not raw_title or q_lower not in raw_title.lower(): + continue + t = self._cache_entry(movie) + if t: + titles.append(t) + return titles + + def seasons_for(self, title: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + + is_series = self._is_series.get(title) + if is_series: + season_nr = self._season_nr.get(title, 1) + return [f"Staffel {season_nr}"] + # Film (oder unbekannt → Film-Fallback) + return ["Film"] + + def episodes_for(self, title: str, season: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + + # Film + if season == "Film": + return [title] + + # Serie: Episodenliste aus /data/watch/ laden + watch_url = self._title_to_watch_url.get(title, "") + if not watch_url: + return [] + + data = self._get_json(watch_url) + if not isinstance(data, dict): + return [] + + episode_nrs: set[int] = set() + for stream in (data.get("streams") or []): + if not isinstance(stream, dict): + continue + e = stream.get("e") + if e is not None: + try: + episode_nrs.add(int(e)) + except (ValueError, TypeError): + pass + + if not episode_nrs: + # Keine Episoden-Nummern → als Film behandeln + return [title] + + return [f"Episode {nr}" for nr in sorted(episode_nrs)] + + # ------------------------------------------------------------------ + # Stream + # ------------------------------------------------------------------ + + def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: + title = (title or "").strip() + watch_url = self._title_to_watch_url.get(title, "") + if not watch_url: + return None + + data = self._get_json(watch_url) + if not isinstance(data, dict): + return None + + streams = data.get("streams") or [] + + if season == "Film": + # Film: Stream ohne Episode-Nummer bevorzugen + for stream in streams: + if isinstance(stream, dict) and stream.get("e") is None: + src = str(stream.get("stream") or "").strip() + if src: + return src + # Fallback: irgendeinen Stream + for stream in streams: + if isinstance(stream, dict): + src = str(stream.get("stream") or "").strip() + if src: + return src + else: + # Serie: Episodennummer extrahieren und matchen + m = re.search(r"\d+", episode or "") + if not m: + return None + ep_nr = int(m.group()) + for stream in streams: + if not isinstance(stream, dict): + continue + try: + if int(stream.get("e") or -1) == ep_nr: + src = str(stream.get("stream") or "").strip() + if src: + return src + except (ValueError, TypeError): + pass + return None + + def resolve_stream_link(self, link: str) -> Optional[str]: + link = (link or "").strip() + if not link: + return None + try: + from plugin_helpers import resolve_via_resolveurl + return resolve_via_resolveurl(link, fallback_to_link=False) + except Exception: + return None + + # ------------------------------------------------------------------ + # Metadaten + # ------------------------------------------------------------------ + + def metadata_for( + self, title: str + ) -> tuple[dict[str, str], dict[str, str], list | None]: + title = (title or "").strip() + if not title: + return {}, {}, None + + info: dict[str, str] = {"title": title} + art: dict[str, str] = {} + + cached = self._title_meta.get(title) + if cached: + plot, poster, fanart = cached + if plot: + info["plot"] = plot + if poster: + art["thumb"] = poster + art["poster"] = poster + if fanart: + art["fanart"] = fanart + art["landscape"] = fanart + + return info, art, None + + # ------------------------------------------------------------------ + # Browsing + # ------------------------------------------------------------------ + + def popular_series(self) -> List[str]: + return self._browse("tvseries", "views") + + def latest_titles(self, page: int = 1) -> List[str]: + return self._browse("movies", "new") + + def genres(self) -> List[str]: + return sorted(GENRE_SLUGS.keys()) + + def titles_for_genre(self, genre: str) -> List[str]: + slug = GENRE_SLUGS.get(genre, "") + if not slug: + return [] + url = _URL_GENRE.format(lang=_LANG, genre=quote_plus(slug)) + data = self._get_json(url) + if not isinstance(data, dict): + return [] + return [ + t for movie in (data.get("movies") or []) + if isinstance(movie, dict) and (t := self._cache_entry(movie)) + ] + + def capabilities(self) -> set[str]: + return {"popular_series", "latest_titles", "genres"} diff --git a/addon/plugins/moflix_plugin.py b/addon/plugins/moflix_plugin.py new file mode 100644 index 0000000..7ca7255 --- /dev/null +++ b/addon/plugins/moflix_plugin.py @@ -0,0 +1,755 @@ +"""Moflix-Stream Plugin für ViewIT. + +Nutzt die JSON-REST-API von moflix-stream.xyz. +Kein HTML-Parsing nötig – alle Daten kommen als JSON. +""" + +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, Any, Callable, List, Optional +from urllib.parse import quote, quote_plus, urlparse + +try: # pragma: no cover - optional dependency + import requests +except ImportError as exc: # pragma: no cover + requests = None + REQUESTS_AVAILABLE = False + REQUESTS_IMPORT_ERROR = exc +else: + REQUESTS_AVAILABLE = True + REQUESTS_IMPORT_ERROR = None + +from plugin_interface import BasisPlugin + +if TYPE_CHECKING: # pragma: no cover + from requests import Session as RequestsSession +else: # pragma: no cover + RequestsSession = Any + +ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] + +# --------------------------------------------------------------------------- +# Konstanten +# --------------------------------------------------------------------------- + +ADDON_ID = "plugin.video.viewit" +BASE_URL = "https://moflix-stream.xyz" +DEFAULT_TIMEOUT = 20 + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", + "Connection": "keep-alive", + "Referer": BASE_URL + "/", +} + +# Separate Header-Definition für VidHide-Requests (moflix-stream.click) +# Separater Browser-UA verhindert UA-basierte Blockierung durch VidHide +_VIDHIDE_HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", + "Connection": "keep-alive", + "Referer": BASE_URL + "/", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "cross-site", +} + +# Hoster-Domains, die erfahrungsgemäß 403 liefern oder kein ResolveURL-Support haben +_VIDEO_SKIP_DOMAINS: frozenset[str] = frozenset({ + "gupload.xyz", + "veev.to", +}) + +# Hoster-Domains, die direkt über eine eigene API auflösbar sind (bevorzugen) +_VIDEO_PREFER_DOMAINS: frozenset[str] = frozenset({ + "vidara.to", +}) + +_URL_SEARCH = BASE_URL + "/api/v1/search/{q1}?query={q2}&limit=8" +_URL_CHANNEL = BASE_URL + "/api/v1/channel/{slug}?channelType=channel&restriction=&paginate=simple" +_URL_TITLE = ( + BASE_URL + "/api/v1/titles/{id}" + "?load=images,genres,productionCountries,keywords,videos,primaryVideo,seasons,compactCredits" +) +_URL_EPISODES = BASE_URL + "/api/v1/titles/{id}/seasons/{s}/episodes?perPage=100&query=&page=1" +_URL_EPISODE = ( + BASE_URL + "/api/v1/titles/{id}/seasons/{s}/episodes/{e}" + "?load=videos,compactCredits,primaryVideo" +) + +# Genre-Slugs (hardcodiert, da keine Genre-API vorhanden) +GENRE_SLUGS: dict[str, str] = { + "Action": "action", + "Animation": "animation", + "Dokumentation": "dokumentation", + "Drama": "drama", + "Familie": "top-kids-liste", + "Fantasy": "fantasy", + "Horror": "horror", + "Komödie": "comedy", + "Krimi": "crime", + "Liebesfilm": "romance", + "Science-Fiction": "science-fiction", + "Thriller": "thriller", +} + +# Collections (Slugs aus dem offiziellen xStream-Plugin) +COLLECTION_SLUGS: dict[str, str] = { + "American Pie Complete Collection": "the-american-pie-collection", + "Bud Spencer & Terence Hill": "bud-spencer-terence-hill-collection", + "DC Superhelden Collection": "the-dc-universum-collection", + "Mission: Impossible Collection": "the-mission-impossible-collection", + "Fast & Furious Collection": "fast-furious-movie-collection", + "Halloween Collection": "halloween-movie-collection", + "Herr der Ringe Collection": "der-herr-der-ringe-collection", + "James Bond Collection": "the-james-bond-collection", + "Jason Bourne Collection": "the-jason-bourne-collection", + "Jurassic Park Collection": "the-jurassic-park-collection", + "Kinder & Familienfilme": "top-kids-liste", + "Marvel Cinematic Universe": "the-marvel-cinematic-universe-collection", + "Olsenbande Collection": "the-olsenbande-collection", + "Planet der Affen Collection": "the-planet-der-affen-collection", + "Rocky Collection": "rocky-the-knockout-collection", + "Star Trek Kinofilm Collection": "the-star-trek-movies-collection", + "Star Wars Collection": "the-star-wars-collection", + "Stirb Langsam Collection": "stirb-langsam-collection", + "X-Men Collection": "x-men-collection", +} + + +# --------------------------------------------------------------------------- +# Hilfsfunktionen (Modul-Ebene) +# --------------------------------------------------------------------------- + +def _extract_first_number(label: str) -> int | None: + """Extrahiert erste Ganzzahl aus einem Label. 'Staffel 2' → 2.""" + m = re.search(r"\d+", label or "") + return int(m.group()) if m else None + + +def _normalize_video_name(name: str, src: str) -> str: + """Normalisiert den Hoster-Namen eines Video-Objekts. + + 'Mirror-HDCloud' → Domain aus src; 'VidCloud-720' → 'VidCloud' + """ + name = (name or "").strip() + if name.lower().startswith("mirror"): + parsed = urlparse(src or "") + host = parsed.netloc or "" + return host.split(".")[0].capitalize() if host else name + return name.split("-")[0].strip() or name + + +def _safe_str(value: object) -> str: + """Konvertiert einen Wert sicher zu String, None → ''.""" + if value is None: + return "" + return str(value).strip() + + +def _unpack_packer(packed_js: str) -> str: + """Entpackt Dean Edwards p.a.c.k.e.r. JavaScript. + + Format: + eval(function(p,a,c,k,e,d){...}('code',base,count,'k1|k2|...'.split('|'),0,0)) + + Findet die gepackte Zeichenkette, die Basis und den Schlüssel-String, + konvertiert jeden Token (base-N → Index) und ersetzt ihn durch das + jeweilige Schlüsselwort. + """ + m = re.search( + r"'((?:[^'\\]|\\.){20,})'\s*,\s*(\d+)\s*,\s*\d+\s*,\s*" + r"'((?:[^'\\]|\\.)*)'\s*\.split\s*\(\s*'\|'\s*\)", + packed_js, + ) + if not m: + return packed_js + + packed = m.group(1).replace("\\'", "'").replace("\\\\", "\\") + base = int(m.group(2)) + keys = m.group(3).split("|") + + _digits = "0123456789abcdefghijklmnopqrstuvwxyz" + + def _unbase(s: str) -> int: + result = 0 + for ch in s: + if ch not in _digits: + raise ValueError(f"Not a base-{base} digit: {ch!r}") + result = result * base + _digits.index(ch) + return result + + def _replace(m2: re.Match) -> str: # type: ignore[type-arg] + token = m2.group(0) + try: + idx = _unbase(token) + replacement = keys[idx] if idx < len(keys) else "" + return replacement if replacement else token + except (ValueError, IndexError): + return token + + return re.sub(r"\b\w+\b", _replace, packed) + + +# --------------------------------------------------------------------------- +# Plugin-Klasse +# --------------------------------------------------------------------------- + +class MoflixPlugin(BasisPlugin): + """Moflix-Stream Integration für ViewIT. + + Verwendet die offizielle JSON-REST-API – kein HTML-Scraping. + """ + + name = "Moflix" + + def __init__(self) -> None: + # title (str) → vollständige API-URL /api/v1/titles/{id} + self._title_to_url: dict[str, str] = {} + # title → (plot, poster_url, fanart_url) + self._title_meta: dict[str, tuple[str, str, str]] = {} + # title → True wenn Serie, False wenn Film + self._is_series: dict[str, bool] = {} + # (title, season_nr) → Moflix-API-ID (ändert sich pro Staffel!) + self._season_api_ids: dict[tuple[str, int], str] = {} + # (title, season_nr) → Liste der Episode-Labels + self._episode_labels: dict[tuple[str, int], list[str]] = {} + + # ------------------------------------------------------------------ + # Verfügbarkeit + # ------------------------------------------------------------------ + + @property + def is_available(self) -> bool: + return REQUESTS_AVAILABLE + + @property + def unavailable_reason(self) -> str: + if REQUESTS_AVAILABLE: + return "" + return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}" + + # ------------------------------------------------------------------ + # HTTP + # ------------------------------------------------------------------ + + def _get_session(self) -> RequestsSession: + from http_session_pool import get_requests_session + return get_requests_session("moflix", headers=HEADERS) + + def _get_json(self, url: str, headers: dict | None = None) -> dict | list | None: + """GET-Request, gibt geparste JSON-Antwort zurück oder None bei Fehler.""" + session = self._get_session() + response = None + try: + response = session.get(url, headers=headers or HEADERS, timeout=DEFAULT_TIMEOUT) + response.raise_for_status() + return response.json() + except Exception: + return None + finally: + if response is not None: + try: + response.close() + except Exception: + pass + + def _get_html( + self, + url: str, + headers: dict | None = None, + fresh_session: bool = False, + ) -> str | None: + """GET-Request, gibt den Response-Text (HTML) zurück oder None bei Fehler. + + fresh_session=True: eigene requests.Session (keine gecachten Cookies/State). + """ + response = None + try: + if fresh_session: + import requests as _req + session = _req.Session() + else: + session = self._get_session() + + req_headers = headers or { + **HEADERS, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + } + response = session.get(url, headers=req_headers, timeout=DEFAULT_TIMEOUT) + response.raise_for_status() + return response.text + except Exception: + return None + finally: + if response is not None: + try: + response.close() + except Exception: + pass + + # ------------------------------------------------------------------ + # Interne Hilfsmethoden + # ------------------------------------------------------------------ + + def _cache_channel_entry(self, entry: dict) -> str: + """Cached einen Kanal/Sucheintrag und gibt den Titel zurück (oder '' zum Überspringen).""" + title = _safe_str(entry.get("name")) + if not title: + return "" + api_id = _safe_str(entry.get("id")) + if not api_id: + return "" + + self._title_to_url[title] = _URL_TITLE.format(id=api_id) + is_series = bool(entry.get("is_series", False)) + self._is_series[title] = is_series + + plot = _safe_str(entry.get("description")) + poster = _safe_str(entry.get("poster")) + fanart = _safe_str(entry.get("backdrop")) + self._title_meta[title] = (plot, poster, fanart) + return title + + def _titles_from_channel(self, slug: str, page: int = 1) -> list[str]: + """Lädt Titel eines Moflix-Channels (Kategorie/Genre/Collection).""" + url = _URL_CHANNEL.format(slug=slug) + if page > 1: + url = f"{url}&page={page}" + data = self._get_json(url) + if not isinstance(data, dict): + return [] + entries = [] + try: + entries = data["channel"]["content"]["data"] + except (KeyError, TypeError): + return [] + titles: list[str] = [] + for entry in (entries or []): + if not isinstance(entry, dict): + continue + t = self._cache_channel_entry(entry) + if t: + titles.append(t) + return titles + + def _ensure_title_url(self, title: str) -> str: + """Gibt die gecachte API-URL für einen Titel zurück, oder ''.""" + return self._title_to_url.get(title, "") + + def _resolve_title(self, title: str) -> None: + """Cache-Miss-Fallback: Titel per Such-API nachschlagen und cachen. + + Wird aufgerufen wenn der In-Memory-Cache leer ist (z.B. nach einem + neuen Kodi-Addon-Aufruf, der eine frische Plugin-Instanz erzeugt). + """ + q1 = quote(title) + q2 = quote_plus(title) + url = _URL_SEARCH.format(q1=q1, q2=q2) + data = self._get_json(url) + if not isinstance(data, dict): + return + for entry in (data.get("results") or []): + if not isinstance(entry, dict): + continue + if _safe_str(entry.get("name")) == title: + self._cache_channel_entry(entry) + return + + # ------------------------------------------------------------------ + # Pflicht-Methoden + # ------------------------------------------------------------------ + + async def search_titles( + self, + query: str, + progress_callback: ProgressCallback = None, + ) -> List[str]: + query = (query or "").strip() + if not query or not REQUESTS_AVAILABLE: + return [] + + q1 = quote(query) + q2 = quote_plus(query) + url = _URL_SEARCH.format(q1=q1, q2=q2) + data = self._get_json(url) + if not isinstance(data, dict): + return [] + + results = data.get("results") or [] + titles: list[str] = [] + for entry in results: + if not isinstance(entry, dict): + continue + # Personen überspringen + if "person" in _safe_str(entry.get("model_type")): + continue + t = self._cache_channel_entry(entry) + if t: + titles.append(t) + return titles + + def seasons_for(self, title: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + + # Film: direkt zum Stream + if self._is_series.get(title) is False: + return ["Film"] + + url = self._ensure_title_url(title) + if not url: + self._resolve_title(title) + url = self._ensure_title_url(title) + if not url: + return [] + + data = self._get_json(url) + if not isinstance(data, dict): + return [] + + seasons_raw = [] + try: + seasons_raw = data["seasons"]["data"] + except (KeyError, TypeError): + pass + + if not seasons_raw: + # Kein Staffel-Daten → Film-Fallback + return ["Film"] + + # Nach Staffelnummer sortieren + seasons_raw = sorted(seasons_raw, key=lambda s: int(s.get("number", 0) or 0)) + + labels: list[str] = [] + for season in seasons_raw: + if not isinstance(season, dict): + continue + nr = season.get("number") + api_id = _safe_str(season.get("title_id")) + if nr is None or not api_id: + continue + try: + season_nr = int(nr) + except (ValueError, TypeError): + continue + self._season_api_ids[(title, season_nr)] = api_id + labels.append(f"Staffel {season_nr}") + + return labels + + def episodes_for(self, title: str, season: str) -> List[str]: + title = (title or "").strip() + season = (season or "").strip() + if not title or not season: + return [] + + # Film: Episode = Titel selbst + if season == "Film": + return [title] + + season_nr = _extract_first_number(season) + if season_nr is None: + return [] + + # Cache-Hit + cached = self._episode_labels.get((title, season_nr)) + if cached is not None: + return cached + + api_id = self._season_api_ids.get((title, season_nr), "") + if not api_id: + # Staffeln nachladen falls noch nicht gecacht + self.seasons_for(title) + api_id = self._season_api_ids.get((title, season_nr), "") + if not api_id: + return [] + + url = _URL_EPISODES.format(id=api_id, s=season_nr) + data = self._get_json(url) + if not isinstance(data, dict): + return [] + + episodes_raw = [] + try: + episodes_raw = data["pagination"]["data"] + except (KeyError, TypeError): + pass + + labels: list[str] = [] + for ep in (episodes_raw or []): + if not isinstance(ep, dict): + continue + # Episoden ohne Video überspringen + if ep.get("primary_video") is None: + continue + ep_nr_raw = ep.get("episode_number") + ep_name = _safe_str(ep.get("name")) + try: + ep_nr = int(ep_nr_raw or 0) + except (ValueError, TypeError): + continue + if ep_nr <= 0: + continue + + label = f"Episode {ep_nr}" + if ep_name: + label = f"{label} – {ep_name}" + + labels.append(label) + + self._episode_labels[(title, season_nr)] = labels + return labels + + # ------------------------------------------------------------------ + # Stream + # ------------------------------------------------------------------ + + def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: + title = (title or "").strip() + season = (season or "").strip() + + if season == "Film": + return self._stream_link_for_movie(title) + + season_nr = _extract_first_number(season) + episode_nr = _extract_first_number(episode) + if season_nr is None or episode_nr is None: + return None + + # Season-API-ID ermitteln (mit Cache-Miss-Fallback) + api_id = self._season_api_ids.get((title, season_nr), "") + if not api_id: + self.seasons_for(title) + api_id = self._season_api_ids.get((title, season_nr), "") + if not api_id: + return None + + # Episoden-Detail laden – enthält videos[] mit src-URLs + url = _URL_EPISODE.format(id=api_id, s=season_nr, e=episode_nr) + data = self._get_json(url) + if not isinstance(data, dict): + return None + videos = (data.get("episode") or {}).get("videos") or [] + return self._best_src_from_videos(videos) + + def _stream_link_for_movie(self, title: str) -> Optional[str]: + """Wählt den besten src-Link eines Films aus den API-Videos.""" + url = self._ensure_title_url(title) + if not url: + self._resolve_title(title) + url = self._ensure_title_url(title) + if not url: + return None + data = self._get_json(url) + if not isinstance(data, dict): + return None + videos = (data.get("title") or {}).get("videos") or [] + return self._best_src_from_videos(videos) + + def _best_src_from_videos(self, videos: object) -> Optional[str]: + """Wählt die beste src-URL aus einer videos[]-Liste. + + Priorisiert bekannte auflösbare Hoster (vidara.to), + überspringt Domains die erfahrungsgemäß 403 liefern. + """ + preferred: list[str] = [] + fallback: list[str] = [] + for v in (videos if isinstance(videos, list) else []): + if not isinstance(v, dict): + continue + src = _safe_str(v.get("src")) + if not src or "youtube" in src.lower(): + continue + domain = urlparse(src).netloc.lstrip("www.") + if domain in _VIDEO_SKIP_DOMAINS: + continue + if domain in _VIDEO_PREFER_DOMAINS: + preferred.append(src) + else: + fallback.append(src) + candidates = preferred + fallback + return candidates[0] if candidates else None + + def _resolve_vidara(self, filecode: str) -> Optional[str]: + """Löst einen vidara.to-Filecode über die vidara-API auf → HLS-URL.""" + api_url = f"https://vidara.to/api/stream?filecode={filecode}" + vidara_headers = { + **HEADERS, + "Referer": f"https://vidara.to/e/{filecode}", + "Origin": "https://vidara.to", + } + data = self._get_json(api_url, headers=vidara_headers) + if not isinstance(data, dict): + return None + return _safe_str(data.get("streaming_url")) or None + + def _resolve_vidhide(self, embed_url: str) -> Optional[str]: + """Löst einen VidHide-Embed-Link (moflix-stream.click) auf → HLS-URL. + + Verwendet eine frische Session mit echtem Chrome-UA um UA-basierte + Blockierungen zu umgehen. Entpackt p.a.c.k.e.r.-JS und extrahiert + den HLS-Stream aus links.hls4/hls3/hls2. + """ + # Frische Session (NICHT die gecachte "moflix"-Session) mit VidHide-Headers + html = self._get_html(embed_url, headers=_VIDHIDE_HEADERS, fresh_session=True) + if not html or "eval(function(p,a,c,k,e" not in html: + return None + + unpacked = _unpack_packer(html) + + # Priorität: hls4 > hls3 > hls2 + for hls_key in ("hls4", "hls3", "hls2"): + m = re.search(rf'"{hls_key}"\s*:\s*"(https://[^"]+)"', unpacked) + if m: + url = m.group(1) + if url: + # Kodi braucht Referer + UA als Header-Suffix damit der CDN die HLS-URL akzeptiert + from urllib.parse import urlencode + headers = urlencode({ + "Referer": embed_url, + "User-Agent": _VIDHIDE_HEADERS["User-Agent"], + }) + return f"{url}|{headers}" + return None + + def resolve_stream_link(self, link: str) -> Optional[str]: + link = (link or "").strip() + if not link: + return None + + # vidara.to: direkt über eigene API auflösen + vidara_m = re.search(r'vidara\.to/e/([A-Za-z0-9_-]+)', link) + if vidara_m: + resolved = self._resolve_vidara(vidara_m.group(1)) + if resolved: + return resolved + + # VidHide (moflix-stream.click): zuerst ResolveURL probieren (FileLions-Modul + # nutzt Kodis libcurl mit anderem TLS-Fingerprint), dann eigenen Resolver + if "moflix-stream.click" in link: + try: + from plugin_helpers import resolve_via_resolveurl + resolved = resolve_via_resolveurl(link, fallback_to_link=False) + if resolved: + return resolved + except Exception: + pass + # Fallback: eigener p.a.c.k.e.r. Resolver + resolved = self._resolve_vidhide(link) + if resolved: + return resolved + return None + + # Fallback: ResolveURL (ohne Link-Fallback – lieber None als unauflösbaren Link) + try: + from plugin_helpers import resolve_via_resolveurl + return resolve_via_resolveurl(link, fallback_to_link=False) + except Exception: + return None + + # ------------------------------------------------------------------ + # Metadaten + # ------------------------------------------------------------------ + + def metadata_for( + self, title: str + ) -> tuple[dict[str, str], dict[str, str], list[object] | None]: + title = (title or "").strip() + if not title: + return {}, {}, None + + info: dict[str, str] = {"title": title} + art: dict[str, str] = {} + + # Cache-Hit + cached = self._title_meta.get(title) + if cached: + plot, poster, fanart = cached + if plot: + info["plot"] = plot + if poster: + art["thumb"] = poster + art["poster"] = poster + if fanart: + art["fanart"] = fanart + art["landscape"] = fanart + if "plot" in info or art: + return info, art, None + + # API-Abruf + url = self._ensure_title_url(title) + if not url: + return info, art, None + + data = self._get_json(url) + if not isinstance(data, dict): + return info, art, None + + title_obj = data.get("title") or {} + plot = _safe_str(title_obj.get("description")) + poster = _safe_str(title_obj.get("poster")) + fanart = _safe_str(title_obj.get("backdrop")) + rating_raw = title_obj.get("rating") + year_raw = _safe_str(title_obj.get("release_date")) + + if plot: + info["plot"] = plot + if rating_raw is not None: + try: + info["rating"] = str(float(rating_raw)) + except (ValueError, TypeError): + pass + if year_raw and len(year_raw) >= 4: + info["year"] = year_raw[:4] + + if poster: + art["thumb"] = poster + art["poster"] = poster + if fanart: + art["fanart"] = fanart + art["landscape"] = fanart + + # Cachen + self._title_meta[title] = (plot, poster, fanart) + + return info, art, None + + # ------------------------------------------------------------------ + # Browsing-Features + # ------------------------------------------------------------------ + + def popular_series(self) -> List[str]: + return self._titles_from_channel("series") + + def latest_titles(self, page: int = 1) -> List[str]: + return self._titles_from_channel("now-playing", page=page) + + def genres(self) -> List[str]: + return sorted(GENRE_SLUGS.keys()) + + def titles_for_genre(self, genre: str) -> List[str]: + return self.titles_for_genre_page(genre, 1) + + def titles_for_genre_page(self, genre: str, page: int = 1) -> List[str]: + slug = GENRE_SLUGS.get(genre, "") + if not slug: + return [] + return self._titles_from_channel(slug, page=page) + + def collections(self) -> List[str]: + return sorted(COLLECTION_SLUGS.keys()) + + def titles_for_collection(self, collection: str, page: int = 1) -> List[str]: + slug = COLLECTION_SLUGS.get(collection, "") + if not slug: + return [] + return self._titles_from_channel(slug, page=page) + + def capabilities(self) -> set[str]: + return {"popular_series", "latest_titles", "collections", "genres"} diff --git a/addon/plugins/netzkino_plugin.py b/addon/plugins/netzkino_plugin.py new file mode 100644 index 0000000..f2639e3 --- /dev/null +++ b/addon/plugins/netzkino_plugin.py @@ -0,0 +1,251 @@ +"""NetzkKino Plugin für ViewIT. + +Nutzt die öffentliche JSON-API von Netzkino. +Nur Filme, keine Serien. Direkte MP4-Streams – kein ResolveURL nötig. +Legal und kostenlos. +""" + +from __future__ import annotations + +from typing import Any, Callable, List, Optional + +try: # pragma: no cover + import requests +except ImportError as exc: # pragma: no cover + requests = None + REQUESTS_AVAILABLE = False + REQUESTS_IMPORT_ERROR = exc +else: + REQUESTS_AVAILABLE = True + REQUESTS_IMPORT_ERROR = None + +from plugin_interface import BasisPlugin + +# --------------------------------------------------------------------------- +# Konstanten +# --------------------------------------------------------------------------- + +DEFAULT_TIMEOUT = 20 + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", +} + +_API_BASE = "https://api.netzkino.de.simplecache.net/capi-2.0a" +_STREAM_BASE = "https://pmd.netzkino-seite.netzkino.de" + +_URL_SEARCH = _API_BASE + "/search?q={query}&d=www&l=de-DE" +_URL_CATEGORY = _API_BASE + "/categories/{slug}.json?d=www&l=de-DE" + +# Slug → Anzeigename +CATEGORIES: dict[str, str] = { + "highlights": "Highlights", + "neue-filme": "Neue Filme", + "alle-filme": "Alle Filme", + "action": "Action", + "animation": "Animation", + "dokumentarfilm": "Dokumentation", + "drama": "Drama", + "fantasy": "Fantasy", + "horror": "Horror", + "komodie": "Komödie", + "krimi-thriller": "Krimi & Thriller", + "romantik": "Romantik", + "sci-fi": "Science-Fiction", +} + +ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] + + +# --------------------------------------------------------------------------- +# Plugin-Klasse +# --------------------------------------------------------------------------- + +class NetzkinoPlugin(BasisPlugin): + """NetzkKino Integration für ViewIT. + + Alle Titel sind Filme (keine Serien). Streams sind direkte MP4-URLs. + """ + + name = "NetzkKino" + + def __init__(self) -> None: + # title → direkte MP4-URL + self._title_to_stream: dict[str, str] = {} + # title → (plot, poster, fanart) + self._title_meta: dict[str, tuple[str, str, str]] = {} + + # ------------------------------------------------------------------ + # Verfügbarkeit + # ------------------------------------------------------------------ + + @property + def is_available(self) -> bool: + return REQUESTS_AVAILABLE + + @property + def unavailable_reason(self) -> str: + if REQUESTS_AVAILABLE: + return "" + return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}" + + # ------------------------------------------------------------------ + # HTTP + # ------------------------------------------------------------------ + + def _get_session(self): # type: ignore[return] + from http_session_pool import get_requests_session + return get_requests_session("netzkino", headers=HEADERS) + + def _get_json(self, url: str) -> dict | list | None: + session = self._get_session() + response = None + try: + response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) + response.raise_for_status() + return response.json() + except Exception: + return None + finally: + if response is not None: + try: + response.close() + except Exception: + pass + + # ------------------------------------------------------------------ + # Interne Hilfsmethoden + # ------------------------------------------------------------------ + + def _build_stream_url(self, streaming_id: str) -> str: + return f"{_STREAM_BASE}/{streaming_id}.mp4" + + def _cache_post(self, post: dict) -> str: + """Cached einen API-Post und gibt den Titel zurück ('' = überspringen).""" + title = str(post.get("title") or "").strip() + if not title: + return "" + + # Stream-URL aus custom_fields.Streaming[0] + custom = post.get("custom_fields") or {} + streaming_ids = custom.get("Streaming") or [] + if not streaming_ids or not streaming_ids[0]: + return "" + + stream_url = self._build_stream_url(str(streaming_ids[0])) + self._title_to_stream[title] = stream_url + + # Metadaten + plot = str(post.get("content") or "").strip() + # Poster: thumbnail + poster = str(post.get("thumbnail") or "").strip() + # Fanart: featured_img_all[0] + fanart_list = custom.get("featured_img_all") or [] + fanart = str(fanart_list[0]).strip() if fanart_list and fanart_list[0] else "" + + self._title_meta[title] = (plot, poster, fanart) + return title + + def _load_posts(self, url: str) -> List[str]: + data = self._get_json(url) + if not isinstance(data, dict): + return [] + titles: list[str] = [] + for post in (data.get("posts") or []): + if not isinstance(post, dict): + continue + t = self._cache_post(post) + if t: + titles.append(t) + return titles + + # ------------------------------------------------------------------ + # Pflicht-Methoden + # ------------------------------------------------------------------ + + async def search_titles( + self, query: str, progress_callback: ProgressCallback = None + ) -> List[str]: + query = (query or "").strip() + if not query or not REQUESTS_AVAILABLE: + return [] + from urllib.parse import quote_plus + url = _URL_SEARCH.format(query=quote_plus(query)) + return self._load_posts(url) + + def seasons_for(self, title: str) -> List[str]: + # NetzkKino hat ausschließlich Filme + return ["Film"] + + def episodes_for(self, title: str, season: str) -> List[str]: + title = (title or "").strip() + if not title: + return [] + # Nur eine Episode: der Film selbst + return [title] + + # ------------------------------------------------------------------ + # Stream + # ------------------------------------------------------------------ + + def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: + title = (title or "").strip() + return self._title_to_stream.get(title) + + def resolve_stream_link(self, link: str) -> Optional[str]: + # Direkte MP4-URL – keine Auflösung nötig + link = (link or "").strip() + return link if link else None + + # ------------------------------------------------------------------ + # Metadaten + # ------------------------------------------------------------------ + + def metadata_for( + self, title: str + ) -> tuple[dict[str, str], dict[str, str], list | None]: + title = (title or "").strip() + if not title: + return {}, {}, None + + info: dict[str, str] = {"title": title} + art: dict[str, str] = {} + + cached = self._title_meta.get(title) + if cached: + plot, poster, fanart = cached + if plot: + info["plot"] = plot + if poster: + art["thumb"] = poster + art["poster"] = poster + if fanart: + art["fanart"] = fanart + art["landscape"] = fanart + + return info, art, None + + # ------------------------------------------------------------------ + # Browsing + # ------------------------------------------------------------------ + + def latest_titles(self, page: int = 1) -> List[str]: + url = _URL_CATEGORY.format(slug="neue-filme") + return self._load_posts(url) + + def genres(self) -> List[str]: + # Gibt die Anzeigenamen zurück (sortiert, Browsing-Kategorien) + return sorted(CATEGORIES.values()) + + def titles_for_genre(self, genre: str) -> List[str]: + # Slug aus Anzeigename rückauflösen + slug = next((s for s, n in CATEGORIES.items() if n == genre), "") + if not slug: + return [] + url = _URL_CATEGORY.format(slug=slug) + return self._load_posts(url) + + def capabilities(self) -> set[str]: + return {"latest_titles", "genres"} diff --git a/addon/plugins/serienstream_plugin.py b/addon/plugins/serienstream_plugin.py index f45aa16..8cb6514 100644 --- a/addon/plugins/serienstream_plugin.py +++ b/addon/plugins/serienstream_plugin.py @@ -1138,6 +1138,8 @@ class SerienstreamPlugin(BasisPlugin): self._genre_page_entries_cache: dict[tuple[str, int], list[SeriesResult]] = {} self._genre_page_has_more_cache: dict[tuple[str, int], bool] = {} self._popular_cache: Optional[list[SeriesResult]] = None + self._alpha_letters_cache: Optional[list[str]] = None + self._alpha_page_count_cache: dict[str, int] = {} self._requests_available = REQUESTS_AVAILABLE self._default_preferred_hosters: list[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: list[str] = list(self._default_preferred_hosters) @@ -1370,7 +1372,7 @@ class SerienstreamPlugin(BasisPlugin): def capabilities(self) -> set[str]: """Meldet unterstützte Features für Router-Menüs.""" - return {"popular_series", "genres", "latest_episodes"} + return {"popular_series", "genres", "latest_episodes", "alpha"} def popular_series(self) -> list[str]: """Liefert die Titel der beliebten Serien (Quelle: `/beliebte-serien`).""" @@ -1396,6 +1398,70 @@ class SerienstreamPlugin(BasisPlugin): self._remember_series_result(entry.title, entry.url, entry.description) return [entry.title for entry in entries if entry.title] + def alpha_index(self) -> list[str]: + """Liefert alle Buchstaben aus dem A-Z-Katalog (/serien).""" + if not self._requests_available: + return [] + if self._alpha_letters_cache is not None: + return list(self._alpha_letters_cache) + try: + soup = _get_soup(_get_base_url() +"/serien") + except Exception: + return [] + letters = [] + for a in soup.select("nav.alphabet-bar a.alphabet-link[href]"): + letter = a.get_text(strip=True) + if letter: + letters.append(letter) + self._alpha_letters_cache = letters + return list(letters) + + def alpha_page_count(self, letter: str) -> int: + """Gibt die Anzahl der Seiten für einen Buchstaben zurück.""" + letter = (letter or "").strip() + if not letter: + return 1 + if letter in self._alpha_page_count_cache: + return self._alpha_page_count_cache[letter] + try: + soup = _get_soup(_get_base_url() +f"/katalog/{letter}") + except Exception: + return 1 + page_nums = [] + for a in soup.select(".pagination a[href]"): + m = re.search(r"page=(\d+)", a.get("href", "")) + if m: + page_nums.append(int(m.group(1))) + count = max(page_nums) if page_nums else 1 + self._alpha_page_count_cache[letter] = count + return count + + def titles_for_alpha_page(self, letter: str, page: int = 1) -> list[str]: + """Liefert Serientitel für den angegebenen Buchstaben und Seitenindex.""" + letter = (letter or "").strip() + if not letter or not self._requests_available: + return [] + page = max(1, int(page or 1)) + url = _get_base_url() +f"/katalog/{letter}" + if page > 1: + url = url + f"?page={page}" + try: + soup = _get_soup(url) + except Exception: + return [] + seen: set[str] = set() + titles: list[str] = [] + for a in soup.select("a[href*='/serie/']"): + href = (a.get("href") or "").strip() + title = a.get_text(strip=True) + if not href or not title or href in seen: + continue + seen.add(href) + full_url = href if href.startswith("http") else _get_base_url() +href + self._remember_series_result(title, full_url, "") + titles.append(title) + return titles + @staticmethod def _title_group_key(title: str) -> str: raw = (title or "").strip() diff --git a/tests/test_moflix_plugin.py b/tests/test_moflix_plugin.py new file mode 100644 index 0000000..2c03ad6 --- /dev/null +++ b/tests/test_moflix_plugin.py @@ -0,0 +1,711 @@ +"""Tests für das Moflix-Stream-Plugin. + +Mockt _get_json() auf Instance-Ebene um reale HTTP-Requests zu vermeiden. +Testet u.a. den Cross-Invocation-Cache-Miss-Bug (leere Instanz ohne Vorsuche). +""" + +import asyncio + +from addon.plugins.moflix_plugin import MoflixPlugin, GENRE_SLUGS, COLLECTION_SLUGS, _unpack_packer + +# --------------------------------------------------------------------------- +# JSON-Fixtures (realistische Moflix-API-Antworten) +# --------------------------------------------------------------------------- + +SEARCH_RESPONSE = { + "results": [ + { + "id": "123", + "name": "Breaking Bad", + "is_series": True, + "description": "Chemie-Lehrer wird Drogenboss.", + "poster": "https://cdn.example.com/bb.jpg", + "backdrop": "https://cdn.example.com/bb-bg.jpg", + "model_type": "title", + }, + { + "id": "456", + "name": "Inception", + "is_series": False, + "description": "Ein Traum im Traum.", + "poster": "https://cdn.example.com/inc.jpg", + "backdrop": "https://cdn.example.com/inc-bg.jpg", + "model_type": "title", + }, + # Personen-Eintrag – soll übersprungen werden + {"id": "789", "name": "Christopher Nolan", "model_type": "person"}, + ] +} + +TITLE_RESPONSE_SERIES = { + "title": { + "id": "123", + "name": "Breaking Bad", + "description": "Chemie-Lehrer wird Drogenboss.", + "poster": "https://cdn.example.com/bb.jpg", + "backdrop": "https://cdn.example.com/bb-bg.jpg", + "rating": 9.5, + "release_date": "2008-01-20", + }, + "seasons": { + "data": [ + {"number": 2, "title_id": "1002"}, # absichtlich unsortiert + {"number": 1, "title_id": "1001"}, + ] + }, +} + +TITLE_RESPONSE_MOVIE = { + "title": { + "id": "456", + "name": "Inception", + "description": "Ein Traum im Traum.", + "poster": "https://cdn.example.com/inc.jpg", + "backdrop": "https://cdn.example.com/inc-bg.jpg", + "rating": 8.8, + "release_date": "2010-07-15", + "videos": [ + # gupload.xyz wird übersprungen (_VIDEO_SKIP_DOMAINS) + {"quality": "1080p", "src": "https://gupload.xyz/data/e/deadbeef", "name": "Mirror 1"}, + # vidara.to wird bevorzugt + {"quality": "1080p", "src": "https://vidara.to/e/inc7testXYZ", "name": "Mirror 2"}, + ], + }, + "seasons": {"data": []}, +} + +EPISODES_RESPONSE = { + "pagination": { + "data": [ + {"episode_number": 1, "name": "Pilot", "primary_video": {"id": 1}}, + {"episode_number": 2, "name": "Cat's in the Bag", "primary_video": {"id": 2}}, + # primary_video=None → überspringen + {"episode_number": 3, "name": "Kein Video", "primary_video": None}, + ] + } +} + +# Episoden-Detail-Response (für stream_link_for, enthält videos[] mit src-URLs) +EPISODE_DETAIL_RESPONSE = { + "episode": { + "videos": [ + # gupload.xyz wird übersprungen + {"quality": "1080p", "src": "https://gupload.xyz/data/e/ep1hash", "name": "Mirror 1"}, + # vidara.to wird bevorzugt → dieser src wird zurückgegeben + {"quality": "1080p", "src": "https://vidara.to/e/ep1vidara", "name": "Mirror 2"}, + # YouTube → immer überspringen + {"quality": None, "src": "https://youtube.com/watch?v=abc", "name": "Trailer"}, + ] + } +} + +VIDARA_STREAM_RESPONSE = { + "filecode": "ep1vidara", + "streaming_url": "https://cdn.example.com/hls/ep1/master.m3u8", + "subtitles": None, + "thumbnail": "https://cdn.example.com/thumb.jpg", + "title": "", +} + +# Minimales HTML mit p.a.c.k.e.r.-obfuskiertem JS (VidHide-Format). +# Packed-String kodiert: +# var links={"hls2":"https://cdn.example.com/hls/test/master.m3u8"}; +# jwplayer("vplayer").setup({sources:[{file:links.hls2,type:"hls"}]}); +# mit base=36 und keywords: var|links|hls2|jwplayer|vplayer|setup|sources|file|type +VIDHIDE_HTML = ( + "" +) + +CHANNEL_RESPONSE = { + "channel": { + "content": { + "data": [ + { + "id": "100", + "name": "Squid Game", + "is_series": True, + "description": "Spiele.", + "poster": "https://cdn.example.com/sq.jpg", + "backdrop": "", + }, + { + "id": "200", + "name": "The Crown", + "is_series": True, + "description": "", + "poster": "", + "backdrop": "", + }, + ] + } + } +} + + +# --------------------------------------------------------------------------- +# Hilfsfunktion: URL-basiertes Mock-Routing +# --------------------------------------------------------------------------- + +def make_json_router(**routes): + """Erzeugt eine _get_json-Mock, die URL-abhängig antwortet. + + Schlüssel = Substring der URL, Wert = zurückzugebende JSON-Daten. + Reihenfolge: spezifischere Schlüssel zuerst übergeben (dict-Reihenfolge). + """ + def _router(url, headers=None): + for key, response in routes.items(): + if key in url: + return response + return None + return _router + + +# --------------------------------------------------------------------------- +# Tests: search_titles +# --------------------------------------------------------------------------- + +def test_search_titles_returns_names(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE) + titles = asyncio.run(plugin.search_titles("breaking")) + assert "Breaking Bad" in titles + assert "Inception" in titles + # Person-Eintrag darf nicht auftauchen + assert "Christopher Nolan" not in titles + + +def test_search_populates_cache(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE) + asyncio.run(plugin.search_titles("breaking")) + # URL-Cache + assert "Breaking Bad" in plugin._title_to_url + assert "/api/v1/titles/123" in plugin._title_to_url["Breaking Bad"] + # is_series-Cache + assert plugin._is_series["Breaking Bad"] is True + assert plugin._is_series["Inception"] is False + # Metadaten-Cache + assert plugin._title_meta["Breaking Bad"][0] == "Chemie-Lehrer wird Drogenboss." + assert plugin._title_meta["Inception"][1] == "https://cdn.example.com/inc.jpg" + + +def test_search_empty_query_returns_empty(): + plugin = MoflixPlugin() + titles = asyncio.run(plugin.search_titles("")) + assert titles == [] + + +# --------------------------------------------------------------------------- +# Tests: seasons_for +# --------------------------------------------------------------------------- + +def test_seasons_for_series_after_search(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_SERIES, + )) + asyncio.run(plugin.search_titles("breaking")) + seasons = plugin.seasons_for("Breaking Bad") + # Staffeln korrekt sortiert + assert seasons == ["Staffel 1", "Staffel 2"] + + +def test_seasons_for_film_returns_film(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE) + asyncio.run(plugin.search_titles("inception")) + seasons = plugin.seasons_for("Inception") + assert seasons == ["Film"] + + +def test_seasons_for_caches_season_api_ids(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_SERIES, + )) + asyncio.run(plugin.search_titles("breaking")) + plugin.seasons_for("Breaking Bad") + assert plugin._season_api_ids[("Breaking Bad", 1)] == "1001" + assert plugin._season_api_ids[("Breaking Bad", 2)] == "1002" + + +def test_seasons_for_cache_miss_triggers_resolve(monkeypatch): + """Bug-Regression: seasons_for() ohne Vorsuche (leere Instanz = Kodi-Neuaufruf). + + _resolve_title() muss automatisch eine Suche starten und den Cache befüllen. + """ + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_SERIES, + )) + # KEIN asyncio.run(search_titles(...)) – simuliert leere Instanz + seasons = plugin.seasons_for("Breaking Bad") + assert seasons == ["Staffel 1", "Staffel 2"] + + +def test_seasons_for_unknown_title_returns_empty(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: {"results": []}) + seasons = plugin.seasons_for("Unbekannter Titel XYZ") + assert seasons == [] + + +# --------------------------------------------------------------------------- +# Tests: episodes_for +# --------------------------------------------------------------------------- + +def test_episodes_for_series(monkeypatch): + plugin = MoflixPlugin() + # "/titles/123" matcht nur die Titel-Detail-URL (id=123), nicht die Episoden-URL (id=1001) + monkeypatch.setattr(plugin, "_get_json", make_json_router( + **{"search": SEARCH_RESPONSE, "/titles/123": TITLE_RESPONSE_SERIES, "episodes": EPISODES_RESPONSE} + )) + asyncio.run(plugin.search_titles("breaking")) + plugin.seasons_for("Breaking Bad") + episodes = plugin.episodes_for("Breaking Bad", "Staffel 1") + assert episodes == ["Episode 1 – Pilot", "Episode 2 – Cat's in the Bag"] + # Episode ohne primary_video (Nr. 3) darf nicht enthalten sein + assert len(episodes) == 2 + + +def test_episodes_for_film_returns_title(): + plugin = MoflixPlugin() + result = plugin.episodes_for("Inception", "Film") + assert result == ["Inception"] + + +def test_episodes_cache_hit(monkeypatch): + """Zweiter episodes_for()-Aufruf darf keine neuen _get_json-Calls auslösen.""" + call_count = {"n": 0} + def counting_router(url, headers=None): + call_count["n"] += 1 + return make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_SERIES, + episodes=EPISODES_RESPONSE, + )(url) + + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", counting_router) + asyncio.run(plugin.search_titles("breaking")) + plugin.seasons_for("Breaking Bad") + plugin.episodes_for("Breaking Bad", "Staffel 1") + calls_after_first = call_count["n"] + + # Zweiter Aufruf – kein neuer HTTP-Call + plugin.episodes_for("Breaking Bad", "Staffel 1") + assert call_count["n"] == calls_after_first + + +# --------------------------------------------------------------------------- +# Tests: stream_link_for +# --------------------------------------------------------------------------- + +def test_stream_link_for_episode_returns_vidara_src(monkeypatch): + """stream_link_for() für Episode gibt vidara.to-URL aus episode.videos[] zurück.""" + plugin = MoflixPlugin() + # Reihenfolge: spezifischere Keys zuerst + # "episodes/1" matcht die Detail-URL .../episodes/1?... + # "episodes" matcht die Listen-URL .../episodes?... + monkeypatch.setattr(plugin, "_get_json", make_json_router( + **{ + "search": SEARCH_RESPONSE, + "/titles/123": TITLE_RESPONSE_SERIES, + "episodes/1": EPISODE_DETAIL_RESPONSE, + "episodes": EPISODES_RESPONSE, + } + )) + asyncio.run(plugin.search_titles("breaking")) + plugin.seasons_for("Breaking Bad") + plugin.episodes_for("Breaking Bad", "Staffel 1") + link = plugin.stream_link_for("Breaking Bad", "Staffel 1", "Episode 1 – Pilot") + # gupload.xyz wird übersprungen, vidara.to bevorzugt + assert link == "https://vidara.to/e/ep1vidara" + + +def test_stream_link_for_episode_cache_miss(monkeypatch): + """stream_link_for() funktioniert auch ohne Vorsuche (leere Instanz).""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + **{ + "search": SEARCH_RESPONSE, + "/titles/123": TITLE_RESPONSE_SERIES, + "episodes/1": EPISODE_DETAIL_RESPONSE, + "episodes": EPISODES_RESPONSE, + } + )) + link = plugin.stream_link_for("Breaking Bad", "Staffel 1", "Episode 1 – Pilot") + assert link == "https://vidara.to/e/ep1vidara" + + +def test_stream_link_for_movie(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_MOVIE, + )) + asyncio.run(plugin.search_titles("inception")) + link = plugin.stream_link_for("Inception", "Film", "Inception") + # gupload.xyz übersprungen, vidara.to bevorzugt + assert link == "https://vidara.to/e/inc7testXYZ" + + +def test_stream_link_for_movie_cache_miss(monkeypatch): + """Film-Stream auch ohne Vorsuche (leere Instanz via _resolve_title).""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_MOVIE, + )) + link = plugin.stream_link_for("Inception", "Film", "Inception") + assert link == "https://vidara.to/e/inc7testXYZ" + + +# --------------------------------------------------------------------------- +# Tests: _best_src_from_videos +# --------------------------------------------------------------------------- + +def test_best_src_prefers_vidara_over_fallback(): + plugin = MoflixPlugin() + videos = [ + {"src": "https://moflix-stream.link/e/abc", "quality": "1080p"}, + {"src": "https://vidara.to/e/xyz789", "quality": "1080p"}, + ] + assert plugin._best_src_from_videos(videos) == "https://vidara.to/e/xyz789" + + +def test_best_src_skips_gupload(): + plugin = MoflixPlugin() + videos = [ + {"src": "https://gupload.xyz/data/e/hash", "quality": "1080p"}, + {"src": "https://moflix-stream.link/e/abc", "quality": "1080p"}, + ] + # gupload übersprungen, moflix-stream.link als Fallback + assert plugin._best_src_from_videos(videos) == "https://moflix-stream.link/e/abc" + + +def test_best_src_skips_youtube(): + plugin = MoflixPlugin() + videos = [ + {"src": "https://youtube.com/watch?v=xyz", "quality": None}, + {"src": "https://vidara.to/e/real123", "quality": "1080p"}, + ] + assert plugin._best_src_from_videos(videos) == "https://vidara.to/e/real123" + + +def test_best_src_all_skipped_returns_none(): + plugin = MoflixPlugin() + videos = [ + {"src": "https://gupload.xyz/data/e/hash"}, + {"src": "https://youtube.com/watch?v=xyz"}, + ] + assert plugin._best_src_from_videos(videos) is None + + +def test_best_src_empty_returns_none(): + plugin = MoflixPlugin() + assert plugin._best_src_from_videos([]) is None + assert plugin._best_src_from_videos(None) is None # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- +# Tests: resolve_stream_link / _resolve_vidara +# --------------------------------------------------------------------------- + +def test_resolve_stream_link_vidara_returns_hls(monkeypatch): + """resolve_stream_link() ruft vidara.to-API auf und gibt streaming_url zurück.""" + plugin = MoflixPlugin() + + def mock_get_json(url, headers=None): + if "vidara.to" in url: + return VIDARA_STREAM_RESPONSE + return None + + monkeypatch.setattr(plugin, "_get_json", mock_get_json) + result = plugin.resolve_stream_link("https://vidara.to/e/ep1vidara") + assert result == "https://cdn.example.com/hls/ep1/master.m3u8" + + +def test_resolve_stream_link_vidara_api_fails_returns_none(monkeypatch): + """Wenn vidara-API None zurückgibt und ResolveURL nicht klappt → None.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: None) + result = plugin.resolve_stream_link("https://vidara.to/e/broken123") + # Weder vidara-API noch ResolveURL → None (kein unauflösbarer Link) + assert result is None + + +def test_resolve_stream_link_non_vidhide_tries_resolveurl(monkeypatch): + """Für sonstige URLs wird ResolveURL aufgerufen; ohne Installation → None.""" + plugin = MoflixPlugin() + result = plugin.resolve_stream_link("https://moflix-stream.link/e/somefilm") + # Ohne ResolveURL-Installation → None + assert result is None + + +# --------------------------------------------------------------------------- +# Tests: Channel-Browse (popular, genre, collection) +# --------------------------------------------------------------------------- + +def test_popular_series_returns_titles(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE) + titles = plugin.popular_series() + assert titles == ["Squid Game", "The Crown"] + # Cache muss befüllt sein + assert "Squid Game" in plugin._title_to_url + + +def test_channel_empty_response_returns_empty(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: None) + assert plugin.popular_series() == [] + assert plugin.latest_titles() == [] + + +def test_channel_malformed_response_returns_empty(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: {"channel": {}}) + assert plugin.popular_series() == [] + + +def test_titles_for_genre(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE) + titles = plugin.titles_for_genre("Action") + assert "Squid Game" in titles + + +def test_titles_for_unknown_genre_returns_empty(): + plugin = MoflixPlugin() + assert plugin.titles_for_genre("Unbekanntes Genre XYZ") == [] + + +def test_titles_for_collection(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE) + titles = plugin.titles_for_collection("James Bond Collection") + assert "Squid Game" in titles + + +# --------------------------------------------------------------------------- +# Tests: genres / collections / capabilities +# --------------------------------------------------------------------------- + +def test_genres_returns_sorted_list(): + plugin = MoflixPlugin() + genres = plugin.genres() + assert genres == sorted(GENRE_SLUGS.keys()) + assert "Action" in genres + assert "Horror" in genres + + +def test_collections_returns_sorted_list(): + plugin = MoflixPlugin() + colls = plugin.collections() + assert colls == sorted(COLLECTION_SLUGS.keys()) + assert "James Bond Collection" in colls + + +def test_capabilities(): + plugin = MoflixPlugin() + caps = plugin.capabilities() + assert "popular_series" in caps + assert "latest_titles" in caps + assert "genres" in caps + assert "collections" in caps + + +# --------------------------------------------------------------------------- +# Tests: metadata_for +# --------------------------------------------------------------------------- + +def test_metadata_from_cache(monkeypatch): + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE) + asyncio.run(plugin.search_titles("breaking")) + + # Metadaten-Abruf darf jetzt keinen neuen HTTP-Call auslösen + call_count = {"n": 0} + def no_call(url, headers=None): + call_count["n"] += 1 + return None + monkeypatch.setattr(plugin, "_get_json", no_call) + + info, art, _ = plugin.metadata_for("Breaking Bad") + assert info.get("plot") == "Chemie-Lehrer wird Drogenboss." + assert art.get("poster") == "https://cdn.example.com/bb.jpg" + assert call_count["n"] == 0 # kein HTTP-Call + + +def test_metadata_api_fallback(monkeypatch): + """Metadaten werden via API geladen wenn nicht im Cache.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_json", make_json_router( + search=SEARCH_RESPONSE, + titles=TITLE_RESPONSE_SERIES, + )) + asyncio.run(plugin.search_titles("breaking")) + # Cache leeren um API-Fallback zu erzwingen + plugin._title_meta.clear() + + info, art, _ = plugin.metadata_for("Breaking Bad") + assert info.get("plot") == "Chemie-Lehrer wird Drogenboss." + assert "year" in info + assert info["year"] == "2008" + + +def test_metadata_unknown_title_returns_empty(): + plugin = MoflixPlugin() + info, art, streams = plugin.metadata_for("Unbekannt") + assert info == {"title": "Unbekannt"} + assert art == {} + assert streams is None + + +# --------------------------------------------------------------------------- +# Tests: _unpack_packer +# --------------------------------------------------------------------------- + +def test_unpack_packer_basic(): + """_unpack_packer() entpackt ein p.a.c.k.e.r.-Fragment korrekt.""" + packed = ( + "eval(function(p,a,c,k,e,d){return p}" + "('0 1={\"2\":\"https://cdn.example.com/hls/test/master.m3u8\"};'," + "36,3,'var|links|hls2'.split('|'),0,0))" + ) + result = _unpack_packer(packed) + assert 'var links={"hls2":"https://cdn.example.com/hls/test/master.m3u8"}' in result + + +def test_unpack_packer_preserves_url(): + """URLs in String-Literalen werden durch den Unpacker nicht korrumpiert.""" + packed = ( + "eval(function(p,a,c,k,e,d){return p}" + "('0 1={\"2\":\"https://cdn.example.com/hls/test/master.m3u8\"};'," + "36,3,'var|links|hls2'.split('|'),0,0))" + ) + result = _unpack_packer(packed) + assert "https://cdn.example.com/hls/test/master.m3u8" in result + + +def test_unpack_packer_no_match_returns_input(): + """Wenn kein p.a.c.k.e.r.-Muster gefunden wird, wird der Input unverändert zurückgegeben.""" + raw = "var x = 1; console.log(x);" + assert _unpack_packer(raw) == raw + + +def test_unpack_packer_full_vidhide_fixture(): + """Entpackt die VIDHIDE_HTML-Fixture und findet hls2-URL.""" + result = _unpack_packer(VIDHIDE_HTML) + assert '"hls2":"https://cdn.example.com/hls/test/master.m3u8"' in result + assert "jwplayer" in result + assert "links.hls2" in result + + +# --------------------------------------------------------------------------- +# Tests: _resolve_vidhide / resolve_stream_link (VidHide) +# --------------------------------------------------------------------------- + +def test_resolve_vidhide_extracts_hls_url(monkeypatch): + """_resolve_vidhide() gibt den hls2-Stream-Link mit Kodi-Header-Suffix zurück.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: VIDHIDE_HTML) + result = plugin._resolve_vidhide("https://moflix-stream.click/embed/kqocffe8ipcf") + assert result is not None + assert result.startswith("https://cdn.example.com/hls/test/master.m3u8|") + assert "Referer=" in result + assert "User-Agent=" in result + + +def test_resolve_vidhide_no_packer_returns_none(monkeypatch): + """_resolve_vidhide() gibt None zurück wenn kein p.a.c.k.e.r. in der Seite.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: "no packer here") + result = plugin._resolve_vidhide("https://moflix-stream.click/embed/abc") + assert result is None + + +def test_resolve_vidhide_html_fetch_fails_returns_none(monkeypatch): + """_resolve_vidhide() gibt None zurück wenn _get_html() fehlschlägt.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: None) + result = plugin._resolve_vidhide("https://moflix-stream.click/embed/abc") + assert result is None + + +def test_resolve_stream_link_vidhide_returns_hls(monkeypatch): + """resolve_stream_link() ruft _resolve_vidhide() auf und gibt HLS-URL mit Header-Suffix zurück.""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: VIDHIDE_HTML) + result = plugin.resolve_stream_link("https://moflix-stream.click/embed/kqocffe8ipcf") + assert result is not None + assert result.startswith("https://cdn.example.com/hls/test/master.m3u8|") + assert "Referer=" in result + assert "User-Agent=" in result + + +def test_resolve_stream_link_vidhide_fallback_on_failure(monkeypatch): + """Wenn VidHide-Resolver fehlschlägt, wird None zurückgegeben (kein unauflösbarer Link).""" + plugin = MoflixPlugin() + monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: None) + result = plugin.resolve_stream_link("https://moflix-stream.click/embed/broken") + # Kein VidHide-Ergebnis → None (Kodi zeigt "Kein Stream"-Dialog) + assert result is None + + +# --------------------------------------------------------------------------- +# Tests: _best_src_from_videos – moflix-stream.click nicht mehr übersprungen +# --------------------------------------------------------------------------- + +def test_best_src_vidhide_not_skipped(): + """moflix-stream.click ist nicht mehr in _VIDEO_SKIP_DOMAINS.""" + plugin = MoflixPlugin() + videos = [ + {"src": "https://moflix-stream.click/embed/abc123", "quality": "1080p"}, + ] + result = plugin._best_src_from_videos(videos) + assert result == "https://moflix-stream.click/embed/abc123" + + +def test_best_src_vidara_preferred_over_vidhide(): + """vidara.to hat Vorrang vor moflix-stream.click.""" + plugin = MoflixPlugin() + videos = [ + {"src": "https://moflix-stream.click/embed/abc123", "quality": "1080p"}, + {"src": "https://vidara.to/e/xyz789", "quality": "1080p"}, + ] + result = plugin._best_src_from_videos(videos) + assert result == "https://vidara.to/e/xyz789" + + +def test_stream_link_for_movie_vidhide_only(monkeypatch): + """Film mit nur moflix-stream.click Mirror: stream_link_for() gibt VidHide-src zurück.""" + plugin = MoflixPlugin() + plugin._title_to_url["The Bluff"] = "https://moflix-stream.xyz/api/v1/titles/789?load=videos" + plugin._is_series["The Bluff"] = False + + def mock_get_json(_url, _headers=None): + return { + "title": { + "videos": [ + {"quality": "1080p", "src": "https://moflix-stream.click/embed/kqocffe8ipcf", "name": "Mirror 1"}, + ], + }, + } + + monkeypatch.setattr(plugin, "_get_json", mock_get_json) + link = plugin.stream_link_for("The Bluff", "Film", "The Bluff") + assert link == "https://moflix-stream.click/embed/kqocffe8ipcf"