"""HTML-basierte Integration fuer eine Streaming-/Mediathek-Seite (Template). Dieses Plugin ist als Startpunkt gedacht, um eine eigene/autorisiert betriebene Seite mit einer HTML-Suche in ViewIt einzubinden. Hinweise: - Nutzt optional `requests` + `beautifulsoup4` (bs4). - `search_titles` liefert eine Trefferliste (Titel-Strings). - `seasons_for` / `episodes_for` können für Filme als Single-Season/Single-Episode modelliert werden (z.B. Staffel 1, Episode 1) oder komplett leer bleiben, solange nur Serien unterstützt werden. """ from __future__ import annotations from dataclasses import dataclass from datetime import datetime import hashlib import os import re import json from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional from urllib.parse import urlencode, urljoin try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None try: # pragma: no cover - optional Kodi helpers import xbmcaddon # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow running outside Kodi xbmcaddon = None xbmcvfs = None xbmcgui = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url from regex_patterns import DIGITS from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession = Any BeautifulSoupT = Any ADDON_ID = 
"plugin.video.viewit" SETTING_BASE_URL = "topstream_base_url" DEFAULT_BASE_URL = "https://topstreamfilm.live" GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_topstreamfilm" SETTING_DUMP_HTML = "dump_html_topstreamfilm" SETTING_SHOW_URL_INFO = "show_url_info_topstreamfilm" SETTING_LOG_ERRORS = "log_errors_topstreamfilm" DEFAULT_TIMEOUT = 20 DEFAULT_PREFERRED_HOSTERS = ["supervideo", "dropload", "voe"] MEINECLOUD_HOST = "meinecloud.click" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None: if not callable(callback): return try: callback(str(message or ""), None if percent is None else int(percent)) except Exception: return @dataclass(frozen=True) class SearchHit: """Interner Treffer mit Title + URL.""" title: str url: str description: str = "" poster: str = "" def _normalize_search_text(value: str) -> str: return _shared_normalize_search_text(value) def _matches_query(query: str, *, title: str, description: str) -> bool: _ = description return _shared_matches_query(query, title=title) def _strip_der_film_suffix(title: str) -> str: """Entfernt den Suffix 'Der Film' am Ende, z.B. 
'Star Trek – Der Film'.""" title = (title or "").strip() if not title: return "" title = re.sub(r"\s*[-–]\s*der\s+film\s*$", "", title, flags=re.IGNORECASE).strip() return title class TopstreamfilmPlugin(BasisPlugin): """Integration fuer eine HTML-basierte Suchseite.""" name = "Topstreamfilm" version = "1.0.0" def __init__(self) -> None: self._session: RequestsSession | None = None self._title_to_url: Dict[str, str] = {} self._genre_to_url: Dict[str, str] = {} self._movie_iframe_url: Dict[str, str] = {} self._movie_title_hint: set[str] = set() self._genre_last_page: Dict[str, int] = {} self._season_cache: Dict[str, List[str]] = {} self._episode_cache: Dict[tuple[str, str], List[str]] = {} self._episode_to_url: Dict[tuple[str, str, str], str] = {} self._episode_to_hosters: Dict[tuple[str, str, str], Dict[str, str]] = {} self._season_to_episode_numbers: Dict[tuple[str, str], List[int]] = {} self._episode_title_by_number: Dict[tuple[str, int, int], str] = {} self._detail_html_cache: Dict[str, str] = {} self._title_meta: Dict[str, tuple[str, str]] = {} self._popular_cache: List[str] | None = None self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: List[str] = list(self._default_preferred_hosters) self.is_available = REQUESTS_AVAILABLE self.unavailable_reason = None if REQUESTS_AVAILABLE else f"requests/bs4 fehlen: {REQUESTS_IMPORT_ERROR}" self._load_title_url_cache() self._load_genre_cache() def _cache_dir(self) -> str: if xbmcaddon and xbmcvfs: try: addon = xbmcaddon.Addon(ADDON_ID) profile = xbmcvfs.translatePath(addon.getAddonInfo("profile")) if not xbmcvfs.exists(profile): xbmcvfs.mkdirs(profile) return profile except Exception: pass return os.path.dirname(__file__) def _title_url_cache_path(self) -> str: return os.path.join(self._cache_dir(), "topstream_title_url_cache.json") def _load_title_url_cache(self) -> None: path = self._title_url_cache_path() try: if xbmcvfs and xbmcvfs.exists(path): handle = 
xbmcvfs.File(path) raw = handle.read() handle.close() elif os.path.exists(path): with open(path, "r", encoding="utf-8") as handle: raw = handle.read() else: return loaded = json.loads(raw or "{}") if isinstance(loaded, dict): # New format: {base_url: {title: url}} base_url = self._get_base_url() if base_url in loaded and isinstance(loaded.get(base_url), dict): loaded = loaded.get(base_url) or {} # Backwards compatible: {title: url} for title, url in (loaded or {}).items(): if isinstance(title, str) and isinstance(url, str) and title.strip() and url.strip(): self._title_to_url.setdefault(title.strip(), url.strip()) except Exception: return def _save_title_url_cache(self) -> None: path = self._title_url_cache_path() try: base_url = self._get_base_url() store: Dict[str, Dict[str, str]] = {} # merge with existing try: if xbmcvfs and xbmcvfs.exists(path): handle = xbmcvfs.File(path) existing_raw = handle.read() handle.close() elif os.path.exists(path): with open(path, "r", encoding="utf-8") as handle: existing_raw = handle.read() else: existing_raw = "" existing = json.loads(existing_raw or "{}") if isinstance(existing, dict): if all(isinstance(k, str) and isinstance(v, dict) for k, v in existing.items()): store = {k: dict(v) for k, v in existing.items()} # type: ignore[arg-type] except Exception: store = {} store[base_url] = dict(self._title_to_url) payload = json.dumps(store, ensure_ascii=False, sort_keys=True) except Exception: return try: if xbmcaddon and xbmcvfs: directory = os.path.dirname(path) if directory and not xbmcvfs.exists(directory): xbmcvfs.mkdirs(directory) handle = xbmcvfs.File(path, "w") try: handle.write(payload) finally: handle.close() else: with open(path, "w", encoding="utf-8") as handle: handle.write(payload) except Exception: return def _genre_cache_path(self) -> str: return os.path.join(self._cache_dir(), "topstream_genres_cache.json") def _load_genre_cache(self) -> None: path = self._genre_cache_path() try: if xbmcvfs and xbmcvfs.exists(path): 
handle = xbmcvfs.File(path) raw = handle.read() handle.close() elif os.path.exists(path): with open(path, "r", encoding="utf-8") as handle: raw = handle.read() else: return loaded = json.loads(raw or "{}") if isinstance(loaded, dict): base_url = self._get_base_url() mapping = loaded.get(base_url) if isinstance(mapping, dict): for genre, url in mapping.items(): if isinstance(genre, str) and isinstance(url, str) and genre.strip() and url.strip(): self._genre_to_url.setdefault(genre.strip(), url.strip()) except Exception: return def _save_genre_cache(self) -> None: path = self._genre_cache_path() try: base_url = self._get_base_url() store: Dict[str, Dict[str, str]] = {} try: if xbmcvfs and xbmcvfs.exists(path): handle = xbmcvfs.File(path) existing_raw = handle.read() handle.close() elif os.path.exists(path): with open(path, "r", encoding="utf-8") as handle: existing_raw = handle.read() else: existing_raw = "" existing = json.loads(existing_raw or "{}") if isinstance(existing, dict): if all(isinstance(k, str) and isinstance(v, dict) for k, v in existing.items()): store = {k: dict(v) for k, v in existing.items()} # type: ignore[arg-type] except Exception: store = {} store[base_url] = dict(self._genre_to_url) payload = json.dumps(store, ensure_ascii=False, sort_keys=True) except Exception: return try: if xbmcaddon and xbmcvfs: directory = os.path.dirname(path) if directory and not xbmcvfs.exists(directory): xbmcvfs.mkdirs(directory) handle = xbmcvfs.File(path, "w") try: handle.write(payload) finally: handle.close() else: with open(path, "w", encoding="utf-8") as handle: handle.write(payload) except Exception: return def _get_session(self) -> RequestsSession: if requests is None: raise RuntimeError(self.unavailable_reason or "requests nicht verfügbar.") if self._session is None: session = requests.Session() session.headers.update(HEADERS) self._session = session return self._session def _get_base_url(self) -> str: base = DEFAULT_BASE_URL if xbmcaddon is not None: try: 
addon = xbmcaddon.Addon(ADDON_ID) raw = (addon.getSetting(SETTING_BASE_URL) or "").strip() if raw: base = raw except Exception: pass base = (base or "").strip() if not base: return DEFAULT_BASE_URL if not base.startswith("http://") and not base.startswith("https://"): base = "https://" + base return base.rstrip("/") def _absolute_url(self, href: str) -> str: return urljoin(self._get_base_url() + "/", href or "") @staticmethod def _absolute_external_url(href: str, *, base: str = "") -> str: href = (href or "").strip() if not href: return "" if href.startswith("//"): return "https:" + href if href.startswith("http://") or href.startswith("https://"): return href if base: return urljoin(base if base.endswith("/") else base + "/", href) return href def _notify_url(self, url: str) -> None: notify_url( ADDON_ID, heading=self.name, url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO, plugin_setting_id=SETTING_SHOW_URL_INFO, ) def _log_url(self, url: str, *, kind: str = "VISIT") -> None: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="topstream_urls.log", url=url, kind=kind, ) def _log_response_html(self, url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, plugin_setting_id=SETTING_DUMP_HTML, url=url, body=body, filename_prefix="topstream_response", ) def _log_error(self, message: str) -> None: log_error( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS, plugin_setting_id=SETTING_LOG_ERRORS, log_filename="topstream_errors.log", message=message, ) def _popular_url(self) -> str: return self._absolute_url("/beliebte-filme-online.html") def popular_series(self) -> List[str]: """Liefert die "Meist gesehen"/"Beliebte Filme" Liste. Quelle: `/beliebte-filme-online.html` (TopStreamFilm Template). 
""" if self._popular_cache is not None: return list(self._popular_cache) if not REQUESTS_AVAILABLE or BeautifulSoup is None: self._popular_cache = [] return [] try: soup = self._get_soup(self._popular_url()) except Exception: self._popular_cache = [] return [] hits = self._parse_listing_titles(soup) titles: List[str] = [] seen: set[str] = set() for hit in hits: if not hit.title or hit.title in seen: continue seen.add(hit.title) self._title_to_url[hit.title] = hit.url self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster) titles.append(hit.title) if titles: self._save_title_url_cache() self._popular_cache = list(titles) return list(titles) def _parse_genres_from_home(self, soup: BeautifulSoupT) -> Dict[str, str]: genres: Dict[str, str] = {} if soup is None: return genres # Primär: im Header-Menü unter "KATEGORIEN" categories_anchor = None for anchor in soup.select("li.menu-item-has-children a"): text = (anchor.get_text(" ", strip=True) or "").strip().casefold() if text == "kategorien": categories_anchor = anchor break if categories_anchor is not None: try: parent = categories_anchor.find_parent("li") except Exception: parent = None if parent is not None: for anchor in parent.select("ul.sub-menu li.cat-item a[href]"): name = (anchor.get_text(" ", strip=True) or "").strip() href = (anchor.get("href") or "").strip() if not name or not href: continue genres[name] = self._absolute_url(href) # Fallback: allgemeine cat-item Links (falls Theme anders ist) if not genres: for anchor in soup.select("li.cat-item a[href]"): name = (anchor.get_text(" ", strip=True) or "").strip() href = (anchor.get("href") or "").strip() if not name or not href: continue genres[name] = self._absolute_url(href) return genres def _extract_first_int(self, value: str) -> Optional[int]: match = re.search(DIGITS, value or "") return int(match.group(1)) if match else None def _strip_links_text(self, node: Any) -> str: """Extrahiert den Text eines Nodes ohne Linktexte/URLs.""" if 
BeautifulSoup is None: return "" try: fragment = BeautifulSoup(str(node), "html.parser") for anchor in fragment.select("a"): anchor.extract() return (fragment.get_text(" ", strip=True) or "").strip() except Exception: return "" def _pick_image_from_node(self, node: Any) -> str: if node is None: return "" image = node.select_one("img") if image is None: return "" for attr in ("data-src", "src"): value = (image.get(attr) or "").strip() if value and "lazy_placeholder" not in value.casefold(): return self._absolute_external_url(value, base=self._get_base_url()) srcset = (image.get("data-srcset") or image.get("srcset") or "").strip() if srcset: first = srcset.split(",")[0].strip().split(" ", 1)[0].strip() if first: return self._absolute_external_url(first, base=self._get_base_url()) return "" def _store_title_meta(self, title: str, *, plot: str = "", poster: str = "") -> None: title = (title or "").strip() if not title: return old_plot, old_poster = self._title_meta.get(title, ("", "")) merged_plot = (plot or old_plot or "").strip() merged_poster = (poster or old_poster or "").strip() self._title_meta[title] = (merged_plot, merged_poster) def _extract_detail_metadata(self, soup: BeautifulSoupT) -> tuple[str, str]: if not soup: return "", "" plot = "" poster = "" for selector in ("meta[property='og:description']", "meta[name='description']"): node = soup.select_one(selector) if node is None: continue content = (node.get("content") or "").strip() if content: plot = content break if not plot: candidates: list[str] = [] for paragraph in soup.select("article p, .TPost p, .Description p, .entry-content p"): text = (paragraph.get_text(" ", strip=True) or "").strip() if len(text) >= 60: candidates.append(text) if candidates: plot = max(candidates, key=len) for selector in ("meta[property='og:image']", "meta[name='twitter:image']"): node = soup.select_one(selector) if node is None: continue content = (node.get("content") or "").strip() if content: poster = 
self._absolute_external_url(content, base=self._get_base_url()) break if not poster: for selector in ("article", ".TPost", ".entry-content"): poster = self._pick_image_from_node(soup.select_one(selector)) if poster: break return plot, poster def _clear_stream_index_for_title(self, title: str) -> None: for key in list(self._season_to_episode_numbers.keys()): if key[0] == title: self._season_to_episode_numbers.pop(key, None) for key in list(self._episode_to_hosters.keys()): if key[0] == title: self._episode_to_hosters.pop(key, None) for key in list(self._episode_title_by_number.keys()): if key[0] == title: self._episode_title_by_number.pop(key, None) def _parse_stream_accordion(self, soup: BeautifulSoupT, *, title: str) -> None: """Parst Staffel/Episode/Hoster-Links aus der Detailseite (Accordion).""" if not soup or not title: return accordion = soup.select_one("#se-accordion") or soup.select_one(".su-accordion#se-accordion") if accordion is None: return self._clear_stream_index_for_title(title) for spoiler in accordion.select(".su-spoiler"): season_title = spoiler.select_one(".su-spoiler-title") if not season_title: continue season_text = (season_title.get_text(" ", strip=True) or "").strip() season_number = self._extract_first_int(season_text) if season_number is None: continue season_label = f"Staffel {season_number}" data_target = (season_title.get("data-target") or "").strip() content = spoiler.select_one(data_target) if data_target.startswith("#") else None if content is None: content = spoiler.select_one(".su-spoiler-content") if content is None: continue episode_numbers: set[int] = set() for row in content.select(".cu-ss"): raw_text = self._strip_links_text(row) raw_text = (raw_text or "").strip() if not raw_text: continue match = re.search( r"(?P\d+)\s*x\s*(?P\d+)\s*(?P.*)$", raw_text, flags=re.IGNORECASE, ) if not match: continue row_season = int(match.group("s")) episode_number = int(match.group("e")) if row_season != season_number: continue rest = 
(match.group("rest") or "").strip().replace("–", "-") # Links stehen als im HTML, d.h. hier bleibt normalerweise nur "Episode X –" übrig. if "-" in rest: rest = rest.split("-", 1)[0].strip() rest = re.sub(r"\bepisode\s*\d+\b", "", rest, flags=re.IGNORECASE).strip() rest = re.sub(r"^\W+|\W+$", "", rest).strip() if rest: self._episode_title_by_number[(title, season_number, episode_number)] = rest hosters: Dict[str, str] = {} for anchor in row.select("a[href]"): name = (anchor.get_text(" ", strip=True) or "").strip() href = (anchor.get("href") or "").strip() if not name or not href: continue hosters[name] = href if not hosters: continue episode_label = f"Episode {episode_number}" ep_title = self._episode_title_by_number.get((title, season_number, episode_number), "") if ep_title: episode_label = f"Episode {episode_number}: {ep_title}" self._episode_to_hosters[(title, season_label, episode_label)] = hosters episode_numbers.add(episode_number) self._season_to_episode_numbers[(title, season_label)] = sorted(episode_numbers) def _ensure_stream_index(self, title: str) -> None: """Stellt sicher, dass Staffel/Episoden/Hoster aus der Detailseite geparst sind.""" title = (title or "").strip() if not title: return # Wenn bereits Staffeln im Index sind, nichts tun. 
if any(key[0] == title for key in self._season_to_episode_numbers.keys()): return soup = self._get_detail_soup(title) if soup is None: return self._parse_stream_accordion(soup, title=title) def _get_soup(self, url: str) -> BeautifulSoupT: if BeautifulSoup is None or not REQUESTS_AVAILABLE: raise RuntimeError("requests/bs4 sind nicht verfuegbar.") session = self._get_session() self._log_url(url, kind="VISIT") self._notify_url(url) response = None try: response = session.get(url, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: self._log_error(f"GET {url} failed: {exc}") raise try: final_url = (response.url or url) if response is not None else url body = (response.text or "") if response is not None else "" self._log_url(final_url, kind="OK") self._log_response_html(final_url, body) return BeautifulSoup(body, "html.parser") finally: if response is not None: try: response.close() except Exception: pass def _get_detail_soup(self, title: str) -> Optional[BeautifulSoupT]: title = (title or "").strip() if not title: return None url = self._title_to_url.get(title) if not url: return None if BeautifulSoup is None or not REQUESTS_AVAILABLE: return None cached_html = self._detail_html_cache.get(title) if cached_html: return BeautifulSoup(cached_html, "html.parser") soup = self._get_soup(url) try: self._detail_html_cache[title] = str(soup) except Exception: pass return soup def _detect_movie_iframe_url(self, soup: BeautifulSoupT) -> str: """Erkennt Film-Detailseiten über eingebettetes MeineCloud-iframe.""" if not soup: return "" for frame in soup.select("iframe[src]"): src = (frame.get("src") or "").strip() if not src: continue if MEINECLOUD_HOST in src: return src return "" def _parse_meinecloud_hosters(self, soup: BeautifulSoupT, *, page_url: str) -> Dict[str, str]: """Parst Hoster-Mirrors aus MeineCloud (Film-Seite). Beispiel:
  • supervideo
  • dropload
  • 4K Server
""" hosters: Dict[str, str] = {} if not soup: return hosters for entry in soup.select("ul._player-mirrors li[data-link]"): raw_link = (entry.get("data-link") or "").strip() if not raw_link: continue name = (entry.get_text(" ", strip=True) or "").strip() name = name or "Hoster" url = self._absolute_external_url(raw_link, base=page_url) if not url: continue hosters[name] = url # Falls "4K Server" wieder auf eine MeineCloud-Seite zeigt, versuchen wir einmal zu expandieren. expanded: Dict[str, str] = {} for name, url in list(hosters.items()): if MEINECLOUD_HOST in url and "/fullhd/" in url: try: nested = self._get_soup(url) except Exception: continue nested_hosters = self._parse_meinecloud_hosters(nested, page_url=url) for nested_name, nested_url in nested_hosters.items(): expanded.setdefault(nested_name, nested_url) if expanded: hosters.update(expanded) return hosters def _extract_last_page(self, soup: BeautifulSoupT) -> int: """Liest aus `div.wp-pagenavi` die höchste Seitenzahl.""" if not soup: return 1 numbers: List[int] = [] for anchor in soup.select("div.wp-pagenavi a"): text = (anchor.get_text(" ", strip=True) or "").strip() if text.isdigit(): try: numbers.append(int(text)) except Exception: continue return max(numbers) if numbers else 1 def _parse_listing_titles(self, soup: BeautifulSoupT) -> List[SearchHit]: hits: List[SearchHit] = [] if not soup: return hits for item in soup.select("li.TPostMv"): anchor = item.select_one("a[href]") if not anchor: continue href = (anchor.get("href") or "").strip() if not href: continue title_tag = anchor.select_one("h3.Title") raw_title = title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True) raw_title = (raw_title or "").strip() is_movie_hint = bool(re.search(r"\bder\s+film\b", raw_title, flags=re.IGNORECASE)) title = _strip_der_film_suffix(raw_title) if not title: continue if is_movie_hint: self._movie_title_hint.add(title) description_tag = item.select_one(".TPMvCn .Description, .Description, 
.entry-summary") description = (description_tag.get_text(" ", strip=True) or "").strip() if description_tag else "" poster = self._pick_image_from_node(item) hits.append( SearchHit( title=title, url=self._absolute_url(href), description=description, poster=poster, ) ) return hits def is_movie(self, title: str) -> bool: """Schneller Hint (ohne Detail-Request), ob ein Titel ein Film ist.""" title = (title or "").strip() if not title: return False if title in self._movie_iframe_url or title in self._movie_title_hint: return True # Robust: Detailseite prüfen. # Laut TopStream-Layout sind Serien-Seiten durch `div.serie-menu` (Staffel-Navigation) # gekennzeichnet. Fehlt das Element, behandeln wir den Titel als Film. soup = self._get_detail_soup(title) if soup is None: return False has_seasons = bool(soup.select_one("div.serie-menu") or soup.select_one(".serie-menu")) return not has_seasons def genre_page_count(self, genre: str) -> int: """Optional: Liefert die letzte Seite eines Genres (Pagination).""" if not REQUESTS_AVAILABLE or BeautifulSoup is None: return 1 genre = (genre or "").strip() if not genre: return 1 if genre in self._genre_last_page: return max(1, int(self._genre_last_page[genre] or 1)) if not self._genre_to_url: self.genres() url = self._genre_to_url.get(genre) if not url: return 1 try: soup = self._get_soup(url) except Exception: return 1 last_page = self._extract_last_page(soup) self._genre_last_page[genre] = max(1, int(last_page or 1)) return self._genre_last_page[genre] def titles_for_genre_page(self, genre: str, page: int) -> List[str]: """Optional: Liefert Titel für ein Genre und eine konkrete Seite.""" if not REQUESTS_AVAILABLE or BeautifulSoup is None: return [] genre = (genre or "").strip() if not genre: return [] if not self._genre_to_url: self.genres() base_url = self._genre_to_url.get(genre) if not base_url: return [] page = max(1, int(page or 1)) if page == 1: url = base_url else: url = urljoin(base_url.rstrip("/") + "/", f"page/{page}/") 
try: soup = self._get_soup(url) except Exception: return [] hits = self._parse_listing_titles(soup) titles: List[str] = [] seen: set[str] = set() for hit in hits: if hit.title in seen: continue seen.add(hit.title) self._title_to_url[hit.title] = hit.url self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster) titles.append(hit.title) if titles: self._save_title_url_cache() return titles def _ensure_title_index(self, title: str) -> None: """Stellt sicher, dass Film/Serie-Infos für den Titel geparst sind.""" title = (title or "").strip() if not title: return # Bereits bekannt? if title in self._movie_iframe_url: return if any(key[0] == title for key in self._season_to_episode_numbers.keys()): return soup = self._get_detail_soup(title) if soup is None: return movie_url = self._detect_movie_iframe_url(soup) if movie_url: self._movie_iframe_url[title] = movie_url # Film als Single-Season/Single-Episode abbilden, damit ViewIt navigieren kann. season_label = "Film" episode_label = "Stream" self._season_cache[title] = [season_label] self._episode_cache[(title, season_label)] = [episode_label] try: meinecloud_soup = self._get_soup(movie_url) hosters = self._parse_meinecloud_hosters(meinecloud_soup, page_url=movie_url) except Exception: hosters = {} self._episode_to_hosters[(title, season_label, episode_label)] = hosters or {"MeineCloud": movie_url} return # Sonst: Serie via Streams-Accordion parsen (falls vorhanden). self._parse_stream_accordion(soup, title=title) async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]: """Sucht Titel ueber eine HTML-Suche. 
Erwartetes HTML (Snippet): - Treffer: `li.TPostMv a[href]` - Titel: `h3.Title` """ if not REQUESTS_AVAILABLE: return [] query = (query or "").strip() if not query: return [] _emit_progress(progress_callback, "Topstreamfilm Suche", 15) session = self._get_session() url = self._get_base_url() + "/" params = {"story": query, "do": "search", "subaction": "search"} request_url = f"{url}?{urlencode(params)}" self._log_url(request_url, kind="GET") self._notify_url(request_url) response = None try: response = session.get( url, params=params, timeout=DEFAULT_TIMEOUT, ) response.raise_for_status() except Exception as exc: self._log_error(f"GET {request_url} failed: {exc}") raise try: final_url = (response.url or request_url) if response is not None else request_url body = (response.text or "") if response is not None else "" self._log_url(final_url, kind="OK") self._log_response_html(final_url, body) if BeautifulSoup is None: return [] soup = BeautifulSoup(body, "html.parser") finally: if response is not None: try: response.close() except Exception: pass hits: List[SearchHit] = [] items = soup.select("li.TPostMv") total_items = max(1, len(items)) for idx, item in enumerate(items, start=1): if idx == 1 or idx % 20 == 0: _emit_progress(progress_callback, f"Treffer pruefen {idx}/{total_items}", 55) anchor = item.select_one("a[href]") if not anchor: continue href = (anchor.get("href") or "").strip() if not href: continue title_tag = anchor.select_one("h3.Title") raw_title = title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True) raw_title = (raw_title or "").strip() is_movie_hint = bool(re.search(r"\bder\s+film\b", raw_title, flags=re.IGNORECASE)) title = _strip_der_film_suffix(raw_title) if not title: continue if is_movie_hint: self._movie_title_hint.add(title) description_tag = item.select_one(".TPMvCn .Description") description = description_tag.get_text(" ", strip=True) if description_tag else "" poster = self._pick_image_from_node(item) hit = 
SearchHit(title=title, url=self._absolute_url(href), description=description, poster=poster) if _matches_query(query, title=hit.title, description=hit.description): hits.append(hit) # Dedup + mapping fuer Navigation self._title_to_url.clear() titles: List[str] = [] seen: set[str] = set() for hit in hits: if hit.title in seen: continue seen.add(hit.title) self._title_to_url[hit.title] = hit.url self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster) titles.append(hit.title) self._save_title_url_cache() _emit_progress(progress_callback, f"Fertig: {len(titles)} Treffer", 95) return titles def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]: title = (title or "").strip() if not title: return {}, {}, None info: dict[str, str] = {"title": title} art: dict[str, str] = {} cached_plot, cached_poster = self._title_meta.get(title, ("", "")) if cached_plot: info["plot"] = cached_plot if cached_poster: art = {"thumb": cached_poster, "poster": cached_poster} if "plot" in info and art: return info, art, None soup = self._get_detail_soup(title) if soup is None: return info, art, None plot, poster = self._extract_detail_metadata(soup) if plot: info["plot"] = plot if poster: art = {"thumb": poster, "poster": poster} self._store_title_meta(title, plot=plot, poster=poster) return info, art, None def genres(self) -> List[str]: if not REQUESTS_AVAILABLE or BeautifulSoup is None: return [] if self._genre_to_url: return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold()) try: soup = self._get_soup(self._get_base_url() + "/") except Exception: return [] parsed = self._parse_genres_from_home(soup) self._genre_to_url.clear() self._genre_to_url.update(parsed) self._save_genre_cache() return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold()) def titles_for_genre(self, genre: str) -> List[str]: if not REQUESTS_AVAILABLE or BeautifulSoup is None: return [] genre = (genre or "").strip() if not 
genre: return [] if not self._genre_to_url: self.genres() url = self._genre_to_url.get(genre) if not url: return [] # Backwards-compatible: liefert nur Seite 1 (Paging läuft über titles_for_genre_page()). titles = self.titles_for_genre_page(genre, 1) titles.sort(key=lambda value: value.casefold()) return titles def seasons_for(self, title: str) -> List[str]: title = (title or "").strip() if not title or not REQUESTS_AVAILABLE or BeautifulSoup is None: return [] self._ensure_title_index(title) if title in self._movie_iframe_url: return ["Film"] # Primär: Streams-Accordion (enthält echte Staffel-/Episodenlistings). self._ensure_stream_index(title) seasons = sorted( {season_label for (t, season_label) in self._season_to_episode_numbers.keys() if t == title}, key=lambda value: (self._extract_first_int(value) or 0), ) if seasons: self._season_cache[title] = list(seasons) return list(seasons) # Fallback: Staffel-Tabs im Seitenmenü (ohne Links). cached = self._season_cache.get(title) if cached is not None: return list(cached) soup = self._get_detail_soup(title) if soup is None: self._season_cache[title] = [] return [] numbers: List[int] = [] seen: set[int] = set() for anchor in soup.select( "div.serie-menu div.tt_season ul.nav a[href^='#season-']," " .serie-menu .tt_season a[href^='#season-']," " a[data-toggle='tab'][href^='#season-']" ): text = (anchor.get_text(" ", strip=True) or "").strip() num = self._extract_first_int(text) if num is None: href = (anchor.get("href") or "").strip() num = self._extract_first_int(href.replace("#season-", "")) if num is None or num in seen: continue seen.add(num) numbers.append(num) seasons = [f"Staffel {n}" for n in sorted(numbers)] self._season_cache[title] = list(seasons) return list(seasons) def episodes_for(self, title: str, season: str) -> List[str]: title = (title or "").strip() season = (season or "").strip() if not title or not season or not REQUESTS_AVAILABLE or BeautifulSoup is None: return [] self._ensure_title_index(title) 
        # Movies are modelled as pseudo-season "Film" with one entry "Stream".
        if title in self._movie_iframe_url and season == "Film":
            return ["Stream"]
        cache_key = (title, season)
        cached = self._episode_cache.get(cache_key)
        if cached is not None:
            return list(cached)
        self._ensure_stream_index(title)
        episode_numbers = self._season_to_episode_numbers.get((title, season), [])
        episodes: List[str] = []
        season_number = self._extract_first_int(season) or 0
        for ep_no in episode_numbers:
            label = f"Episode {ep_no}"
            # Append the episode title when the index knows one.
            ep_title = self._episode_title_by_number.get((title, season_number, ep_no), "")
            if ep_title:
                label = f"Episode {ep_no}: {ep_title}"
            episodes.append(label)
        self._episode_cache[cache_key] = list(episodes)
        return list(episodes)

    def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
        """Return the hoster names available for the given episode, sorted
        case-insensitively. Empty when arguments are blank or bs4 is missing."""
        title = (title or "").strip()
        season = (season or "").strip()
        episode = (episode or "").strip()
        if not title or not season or not episode:
            return []
        if not REQUESTS_AVAILABLE or BeautifulSoup is None:
            return []
        self._ensure_title_index(title)
        self._ensure_stream_index(title)
        hosters = self._episode_to_hosters.get((title, season, episode), {})
        return sorted(hosters.keys(), key=lambda value: value.casefold())

    def set_preferred_hosters(self, hosters: List[str]) -> None:
        """Set the preferred-hoster priority list (names are lower-cased).

        Blank entries are dropped; an entirely empty list leaves the current
        preference unchanged.
        """
        normalized = [hoster.strip().lower() for hoster in hosters if hoster and hoster.strip()]
        if normalized:
            self._preferred_hosters = normalized

    def reset_preferred_hosters(self) -> None:
        """Restore the default preferred-hoster priority list."""
        self._preferred_hosters = list(self._default_preferred_hosters)

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return the hoster URL for the episode, honoring hoster preference
        (continued below)."""
        title = (title or "").strip()
        season = (season or "").strip()
        episode = (episode or "").strip()
        if not title or not season or not episode:
            return None
        if not REQUESTS_AVAILABLE or BeautifulSoup is None:
            return None
        self._ensure_title_index(title)
        self._ensure_stream_index(title)
        hosters = self._episode_to_hosters.get((title, season, episode), {})
        if not hosters:
            return None
        preferred = [h.casefold() for h in (self._preferred_hosters or [])]
        if
        preferred:
            # Walk the preference list in priority order; first match wins.
            for preferred_name in preferred:
                for actual_name, url in hosters.items():
                    if actual_name.casefold() == preferred_name:
                        return url
        # No preference matched: deterministically pick the first hoster
        # (case-insensitive alphabetical order).
        first_name = sorted(hosters.keys(), key=lambda value: value.casefold())[0]
        return hosters.get(first_name)

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Resolve a hoster page URL into a playable stream URL.

        Falls back to returning *link* unchanged when resolving fails.
        """
        # Deferred import — presumably to keep module import light; confirm.
        from plugin_helpers import resolve_via_resolveurl
        return resolve_via_resolveurl(link, fallback_to_link=True)

    def capabilities(self) -> set[str]:
        """Return the optional feature flags this plugin supports."""
        return {"genres", "popular_series", "year_filter"}

    def years_available(self) -> List[str]:
        """Return available release years (current year down to 1980)."""
        # Local import: the module-level `datetime` name is the class, not
        # the module, so `datetime.date` needs the module import here.
        import datetime
        current_year = datetime.date.today().year
        return [str(y) for y in range(current_year, 1979, -1)]

    def titles_for_year(self, year: str, page: int = 1) -> List[str]:
        """Return titles for a given release year.

        URL pattern: /xfsearch/{year}/ or /xfsearch/{year}/page/{n}/
        """
        year = (year or "").strip()
        if not year or not REQUESTS_AVAILABLE or BeautifulSoup is None:
            return []
        page = max(1, int(page or 1))
        base = self._get_base_url()
        if page == 1:
            url = f"{base}/xfsearch/{year}/"
        else:
            url = f"{base}/xfsearch/{year}/page/{page}/"
        try:
            soup = self._get_soup(url)
        except Exception:
            # Best effort: network/parse failures yield an empty page.
            return []
        hits = self._parse_listing_titles(soup)
        titles: List[str] = []
        seen: set[str] = set()
        for hit in hits:
            if hit.title in seen:
                continue
            seen.add(hit.title)
            self._title_to_url[hit.title] = hit.url
            self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster)
            titles.append(hit.title)
        if titles:
            self._save_title_url_cache()
        return titles

    def new_titles_page(self, page: int = 1) -> List[str]:
        """Liefert neu hinzugefügte Filme.
        URL-Muster: /neueste-filme/ oder /neueste-filme/page/{n}/
        """
        if not REQUESTS_AVAILABLE or BeautifulSoup is None:
            return []
        page = max(1, int(page or 1))
        base = self._get_base_url()
        if page == 1:
            url = f"{base}/neueste-filme/"
        else:
            url = f"{base}/neueste-filme/page/{page}/"
        try:
            soup = self._get_soup(url)
        except Exception:
            # Best effort: network/parse failures yield an empty page.
            return []
        hits = self._parse_listing_titles(soup)
        titles: List[str] = []
        seen: set[str] = set()
        # Dedup by title while preserving listing order; remember URL + meta.
        for hit in hits:
            if hit.title in seen:
                continue
            seen.add(hit.title)
            self._title_to_url[hit.title] = hit.url
            self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster)
            titles.append(hit.title)
        if titles:
            self._save_title_url_cache()
        return titles

    def new_titles(self) -> List[str]:
        """Return page 1 of the newly added movies listing."""
        return self.new_titles_page(1)


# Alias for automatic plugin discovery.
Plugin = TopstreamfilmPlugin