"""Filmpalast Integration (movie-style provider). Hinweis: - Der Parser ist bewusst defensiv und arbeitet mit mehreren Fallback-Selektoren, da Filmpalast-Layouts je Domain variieren koennen. """ from __future__ import annotations from dataclasses import dataclass import re from urllib.parse import quote, urlencode from urllib.parse import urljoin from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text from http_session_pool import get_requests_session if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession = Any BeautifulSoupT = Any ADDON_ID = "plugin.video.viewit" SETTING_BASE_URL = "filmpalast_base_url" DEFAULT_BASE_URL = "https://filmpalast.to" DEFAULT_TIMEOUT = 20 DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"] SERIES_HINT_PREFIX = "series://filmpalast/" SERIES_VIEW_PATH = "/serien/view" SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE) GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_filmpalast" SETTING_DUMP_HTML = "dump_html_filmpalast" 
SETTING_SHOW_URL_INFO = "show_url_info_filmpalast"
SETTING_LOG_ERRORS = "log_errors_filmpalast"

# Progress callbacks receive (message, percent-or-None).
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]


def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None:
    """Invoke *callback* defensively; callback errors are swallowed on purpose."""
    if not callable(callback):
        return
    try:
        callback(str(message or ""), None if percent is None else int(percent))
    except Exception:
        return


# Browser-like default headers sent with every request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}


@dataclass(frozen=True)
class SearchHit:
    """One anchor parsed from a search/listing page (display title + detail URL)."""

    title: str
    url: str


@dataclass(frozen=True)
class EpisodeEntry:
    """One episode of a series: season/episode numbers, title suffix, detail URL."""

    season: int
    episode: int
    suffix: str
    url: str


def _get_base_url() -> str:
    """Return the configured base URL (trailing slash stripped), falling back to the default."""
    base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
    if not base:
        base = DEFAULT_BASE_URL
    return base.rstrip("/")


def _absolute_url(url: str) -> str:
    """Resolve *url* against the configured base URL; empty input yields ''."""
    url = (url or "").strip()
    if not url:
        return ""
    if url.startswith("http://") or url.startswith("https://"):
        return url
    # Protocol-relative URLs are forced to https.
    if url.startswith("//"):
        return f"https:{url}"
    if url.startswith("/"):
        return f"{_get_base_url()}{url}"
    return f"{_get_base_url()}/{url.lstrip('/')}"


def _normalize_search_text(value: str) -> str:
    """Thin wrapper around the shared search-text normalizer."""
    return _shared_normalize_search_text(value)


def _matches_query(query: str, *, title: str) -> bool:
    """Thin wrapper around the shared query matcher."""
    return _shared_matches_query(query, title=title)


def _is_probably_content_url(url: str) -> bool:
    """Heuristic: True when *url* looks like a movie/series detail page.

    Navigation, legal and category links are rejected first; then the URL must
    contain one of the known content path markers.
    """
    lower = (url or "").casefold()
    if not lower:
        return False
    block_markers = (
        "/genre/",
        "/kategorie/",
        "/category/",
        "/tag/",
        "/login",
        "/register",
        "/kontakt",
        "/impressum",
        "/datenschutz",
        "/dmca",
        "/agb",
        "javascript:",
        "#",
    )
    if any(marker in lower for marker in block_markers):
        return False
    allow_markers = ("/stream/", "/film/", "/movie/", "/serien/", "/serie/", "/title/")
    return any(marker in lower for marker in allow_markers)


def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
    """Record *url* in the URL debug log, gated by global/plugin settings."""
    log_url(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
        plugin_setting_id=SETTING_LOG_URLS,
        log_filename="filmpalast_urls.log",
        url=url,
        kind=kind,
    )


def _log_visit(url: str) -> None:
    """Log a page visit and optionally surface it as a user notification."""
    _log_url_event(url, kind="VISIT")
    notify_url(
        ADDON_ID,
        heading="Filmpalast",
        url=url,
        enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
        plugin_setting_id=SETTING_SHOW_URL_INFO,
    )


def _log_response_html(url: str, body: str) -> None:
    """Dump the raw HTML response for debugging, gated by settings."""
    dump_response_html(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
        plugin_setting_id=SETTING_DUMP_HTML,
        url=url,
        body=body,
        filename_prefix="filmpalast_response",
    )


def _log_error_message(message: str) -> None:
    """Append *message* to the error log, gated by settings."""
    log_error(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
        plugin_setting_id=SETTING_LOG_ERRORS,
        log_filename="filmpalast_errors.log",
        message=message,
    )


def _is_series_hint_url(value: str) -> bool:
    """True when *value* is a synthetic series marker, not a real detail URL."""
    return (value or "").startswith(SERIES_HINT_PREFIX)


def _series_hint_value(title: str) -> str:
    """Build the synthetic series marker URL for *title* (percent-encoded)."""
    safe_title = quote((title or "").strip(), safe="")
    return f"{SERIES_HINT_PREFIX}{safe_title}" if safe_title else SERIES_HINT_PREFIX


def _extract_number(value: str) -> Optional[int]:
    """Return the first integer found in *value*, or None."""
    match = re.search(r"(\d+)", value or "")
    if not match:
        return None
    try:
        return int(match.group(1))
    except Exception:
        return None


def _strip_series_alias(title: str) -> str:
    """Remove a trailing '(Serie)' alias suffix that this plugin appends itself."""
    return re.sub(r"\s*\(serie\)\s*$", "", title or "", flags=re.IGNORECASE).strip()


def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
    """Fetch *url* and return a parsed BeautifulSoup document.

    Logs the visit, any redirect, and (optionally) the response body.
    Raises RuntimeError when requests/bs4 are unavailable; re-raises HTTP
    errors after logging them. The response is always closed.
    """
    if requests is None or BeautifulSoup is None:
        raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
    _log_visit(url)
    sess = session or get_requests_session("filmpalast", headers=HEADERS)
    response = None
    try:
        response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except Exception as exc:
        _log_error_message(f"GET {url} failed: {exc}")
        raise
    try:
        final_url = (response.url or url) if response is not None else url
        body = (response.text or "") if response is not None else ""
        if final_url != url:
            _log_url_event(final_url, kind="REDIRECT")
        _log_response_html(url, body)
        return BeautifulSoup(body, "html.parser")
    finally:
        if response is not None:
            try:
                response.close()
            except Exception:
                pass


class FilmpalastPlugin(BasisPlugin):
    """Scraper-based provider plugin for filmpalast.to.

    Maintains in-memory indices per instance:
    - ``_title_to_url``: display title -> detail URL (or series hint marker)
    - ``_series_entries``: series title -> season -> episode -> EpisodeEntry
    - ``_hoster_cache``: detail URL -> {hoster name: play URL}
    plus genre/alpha/series pagination caches.
    """

    name = "Filmpalast"
    version = "1.0.0"

    def __init__(self) -> None:
        self._title_to_url: Dict[str, str] = {}
        # title -> (plot, poster) metadata cache
        self._title_meta: Dict[str, tuple[str, str]] = {}
        self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
        self._hoster_cache: Dict[str, Dict[str, str]] = {}
        self._genre_to_url: Dict[str, str] = {}
        self._genre_page_count_cache: Dict[str, int] = {}
        self._alpha_to_url: Dict[str, str] = {}
        self._alpha_page_count_cache: Dict[str, int] = {}
        self._series_page_count_cache: Dict[int, int] = {}
        self._requests_available = REQUESTS_AVAILABLE
        self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
        self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
        self.is_available = True
        self.unavailable_reason: Optional[str] = None
        if not self._requests_available:  # pragma: no cover - optional dependency
            self.is_available = False
            self.unavailable_reason = (
                "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
            )
            if REQUESTS_IMPORT_ERROR:
                print(f"FilmpalastPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")

    def _lookup_title_url(self, title: str) -> str:
        """Find the stored URL for *title*, exact match first, then case-insensitive."""
        title = (title or "").strip()
        if not title:
            return ""
        direct = self._title_to_url.get(title)
        if direct:
            return direct
        wanted = title.casefold()
        for key, value in self._title_to_url.items():
            if key.casefold() == wanted and value:
                return value
        return ""

    def _series_key_for_title(self, title: str) -> str:
        """Return the canonical series-index key for *title* ('' if unknown)."""
        title = (title or "").strip()
        if not title:
            return ""
        if title in self._series_entries:
            return title
        wanted = title.casefold()
        for key in self._series_entries.keys():
            if key.casefold() == wanted:
                return key
        return ""

    def _has_series_entries(self, title: str) -> bool:
        """True when *title* is already known as a series."""
        return bool(self._series_key_for_title(title))

    def _episode_entry_from_hit(self, hit: SearchHit) -> Optional[Tuple[str, EpisodeEntry]]:
        """Split a hit title like 'Show S01E02 - Name' into (series title, EpisodeEntry).

        Returns None when the title carries no SxxEyy marker (i.e. a movie).
        """
        title = (hit.title or "").strip()
        if not title:
            return None
        marker = SEASON_EPISODE_RE.search(title)
        if not marker:
            return None
        try:
            season_number = int(marker.group(1))
            episode_number = int(marker.group(2))
        except Exception:
            return None
        series_title = re.sub(r"\s+", " ", title[: marker.start()] or "").strip(" -|:;,_")
        if not series_title:
            return None
        suffix = re.sub(r"\s+", " ", title[marker.end() :] or "").strip(" -|:;,_")
        entry = EpisodeEntry(season=season_number, episode=episode_number, suffix=suffix, url=hit.url)
        return (series_title, entry)

    def _add_series_entry(self, series_title: str, entry: EpisodeEntry) -> None:
        """Insert *entry* into the series index; first URL per episode wins."""
        if not series_title or not entry.url:
            return
        seasons = self._series_entries.setdefault(series_title, {})
        episodes = seasons.setdefault(entry.season, {})
        if entry.episode not in episodes:
            episodes[entry.episode] = entry

    def _ensure_series_entries_for_title(self, title: str) -> str:
        """Lazily populate the series index for *title* via a site search.

        Returns the resolved series key ('' when nothing matched). When the
        caller-supplied title differs from the resolved key (e.g. a '(Serie)'
        alias), the entry mapping is shared under both names.
        """
        series_key = self._series_key_for_title(title)
        if series_key:
            return series_key
        original_title = (title or "").strip()
        lookup_title = _strip_series_alias(original_title)
        if not lookup_title:
            return ""
        if not self._requests_available:
            return ""
        wanted = _normalize_search_text(lookup_title)
        hits = self._search_hits(lookup_title)
        for hit in hits:
            parsed = self._episode_entry_from_hit(hit)
            if not parsed:
                continue
            series_title, entry = parsed
            # Only accept episodes whose series title matches the query.
            if wanted and _normalize_search_text(series_title) != wanted:
                continue
            self._add_series_entry(series_title, entry)
            self._title_to_url.setdefault(series_title, _series_hint_value(series_title))
        resolved = self._series_key_for_title(original_title) or self._series_key_for_title(lookup_title)
        if resolved and original_title and original_title != resolved:
            # Alias the original spelling to the resolved series data.
            self._series_entries[original_title] = self._series_entries[resolved]
            self._title_to_url.setdefault(original_title, _series_hint_value(resolved))
            return original_title
        return resolved

    def _detail_url_for_selection(self, title: str, season: str, episode: str) -> str:
        """Resolve the detail URL for a (title, season, episode) selection.

        Series selections are looked up in the episode index; movies fall back
        to the plain title URL.
        """
        series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
        if series_key:
            season_number = _extract_number(season)
            episode_number = _extract_number(episode)
            if season_number is None or episode_number is None:
                return ""
            entry = self._series_entries.get(series_key, {}).get(season_number, {}).get(episode_number)
            return entry.url if entry else ""
        return self._ensure_title_url(title)

    def _search_hits(self, query: str) -> List[SearchHit]:
        """Run the site search for *query* and parse result anchors into hits.

        Results are deduplicated by title and URL (both case-insensitive) and
        filtered through the shared query matcher.
        """
        query = (query or "").strip()
        if not query:
            return []
        if not self._requests_available or requests is None:
            return []
        session = get_requests_session("filmpalast", headers=HEADERS)
        search_requests = [(_absolute_url(f"/search/title/{quote(query)}"), None)]
        hits: List[SearchHit] = []
        seen_titles: set[str] = set()
        seen_urls: set[str] = set()
        for base_url, params in search_requests:
            response = None
            try:
                request_url = base_url if not params else f"{base_url}?{urlencode(params)}"
                _log_url_event(request_url, kind="GET")
                _log_visit(request_url)
                response = session.get(base_url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
                response.raise_for_status()
                if response.url and response.url != request_url:
                    _log_url_event(response.url, kind="REDIRECT")
                _log_response_html(request_url, response.text)
                soup = BeautifulSoup(response.text, "html.parser")
            except Exception as exc:
                _log_error_message(f"search request failed ({base_url}): {exc}")
                continue
            finally:
                if response is not None:
                    try:
                        response.close()
                    except Exception:
                        pass
            # Preferred layout: result articles; fallback: any /stream/ anchor.
            anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
            if not anchors:
                anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
            for anchor in anchors:
                href = (anchor.get("href") or "").strip()
                if not href:
                    continue
                # Canonicalize: drop fragment, query string, trailing slash.
                url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
                if not _is_probably_content_url(url):
                    continue
                title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
                title = (title or "").strip()
                if not title:
                    continue
                # Skip generic button captions that are not real titles.
                if title.casefold() in {"details/play", "play", "details"}:
                    continue
                if not _matches_query(query, title=title):
                    continue
                title_key = title.casefold()
                url_key = url.casefold()
                if title_key in seen_titles or url_key in seen_urls:
                    continue
                seen_titles.add(title_key)
                seen_urls.add(url_key)
                _log_url_event(url, kind="PARSE")
                hits.append(SearchHit(title=title, url=url))
            if hits:
                break
        return hits

    def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
        """Parse a listing page (genre/alpha/series/new/top) into deduplicated hits.

        Same anchor selection and filtering as :meth:`_search_hits`, but
        operating on an already-fetched document; *query* filtering is optional.
        """
        hits: List[SearchHit] = []
        if not soup:
            return hits
        seen_titles: set[str] = set()
        seen_urls: set[str] = set()
        anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
        if not anchors:
            anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
        for anchor in anchors:
            href = (anchor.get("href") or "").strip()
            if not href:
                continue
            url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
            if not _is_probably_content_url(url):
                continue
            title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
            if not title:
                continue
            if title.casefold() in {"details/play", "play", "details"}:
                continue
            if query and not _matches_query(query, title=title):
                continue
            title_key = title.casefold()
            url_key = url.casefold()
            if title_key in seen_titles or url_key in seen_urls:
                continue
            seen_titles.add(title_key)
            seen_urls.add(url_key)
            _log_url_event(url, kind="PARSE")
            hits.append(SearchHit(title=title, url=url))
        return hits

    def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
        """Rebuild the title/series indices from *hits* and return display titles.

        NOTE: resets ``_title_to_url``/``_series_entries`` and clears the hoster
        cache on every call. When a series shares its name with a movie, the
        series is listed under a '<title> (Serie)' alias.
        """
        self._title_to_url = {}
        self._series_entries = {}
        self._hoster_cache.clear()
        movie_titles: List[str] = []
        series_titles_seen: set[str] = set()
        for hit in hits:
            parsed = self._episode_entry_from_hit(hit)
            if parsed:
                series_title, entry = parsed
                self._add_series_entry(series_title, entry)
                if series_title.casefold() not in series_titles_seen:
                    self._title_to_url[series_title] = _series_hint_value(series_title)
                    series_titles_seen.add(series_title.casefold())
                continue
            title = (hit.title or "").strip()
            if not title:
                continue
            movie_titles.append(title)
            self._title_to_url[title] = hit.url
        titles: List[str] = list(movie_titles)
        movie_keys = {entry.casefold() for entry in movie_titles}
        for series_title in sorted(self._series_entries.keys(), key=lambda value: value.casefold()):
            if series_title.casefold() in movie_keys:
                # Name collision with a movie: expose the series via an alias.
                alias = f"{series_title} (Serie)"
                self._title_to_url[alias] = self._title_to_url.get(series_title, _series_hint_value(series_title))
                self._series_entries[alias] = self._series_entries[series_title]
                titles.append(alias)
            else:
                titles.append(series_title)
        titles.sort(key=lambda value: value.casefold())
        return titles

    async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]:
        """Search the site for *query* and return sorted display titles."""
        _emit_progress(progress_callback, "Filmpalast Suche", 15)
        hits = self._search_hits(query)
        _emit_progress(progress_callback, f"Treffer verarbeiten ({len(hits)})", 70)
        titles = self._apply_hits_to_title_index(hits)
        _emit_progress(progress_callback, f"Fertig: {len(titles)} Treffer", 95)
        return titles

    def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
        """Extract genre name -> absolute URL from the genre sidebar."""
        genres: Dict[str, str] = {}
        if not soup:
            return genres
        for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
            name = (anchor.get_text(" ", strip=True) or "").strip()
            href = (anchor.get("href") or "").strip()
            if not name or not href:
                continue
            if "/search/genre/" not in href:
                continue
            genres[name] = _absolute_url(href)
        return genres

    def _extract_last_page(self, soup: BeautifulSoupT) -> int:
        """Return the highest page number found in the pagination links (>= 1)."""
        max_page = 1
        if not soup:
            return max_page
        for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
            text = (anchor.get_text(" ", strip=True) or "").strip()
            # Numbers may appear in the link text or in the href itself.
            for candidate in (text, (anchor.get("href") or "").strip()):
                for value in re.findall(r"(\d+)", candidate):
                    try:
                        max_page = max(max_page, int(value))
                    except Exception:
                        continue
        return max_page

    def capabilities(self) -> set[str]:
        """Advertise the optional browse features this provider supports."""
        return {"genres", "alpha", "series_catalog", "popular_series", "new_titles"}

    def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
        """Extract letter -> absolute URL from the A-Z index block."""
        alpha: Dict[str, str] = {}
        if not soup:
            return alpha
        for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
            name = (anchor.get_text(" ", strip=True) or "").strip()
            href = (anchor.get("href") or "").strip()
            if not name or not href:
                continue
            if "/search/alpha/" not in href:
                continue
            if name in alpha:
                continue
            alpha[name] = _absolute_url(href)
        return alpha

    def alpha_index(self) -> List[str]:
        """Return the available A-Z letters, fetching/caching the home page once."""
        if not self._requests_available:
            return []
        if self._alpha_to_url:
            return list(self._alpha_to_url.keys())
        try:
            soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        parsed = self._parse_alpha_links(soup)
        if parsed:
            self._alpha_to_url = dict(parsed)
        return list(self._alpha_to_url.keys())

    def alpha_page_count(self, letter: str) -> int:
        """Return (and cache) the number of listing pages for *letter*."""
        letter = (letter or "").strip()
        if not letter:
            return 1
        if letter in self._alpha_page_count_cache:
            return max(1, int(self._alpha_page_count_cache.get(letter, 1)))
        if not self._alpha_to_url:
            self.alpha_index()
        base_url = self._alpha_to_url.get(letter, "")
        if not base_url:
            return 1
        try:
            soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return 1
        pages = self._extract_last_page(soup)
        self._alpha_page_count_cache[letter] = max(1, pages)
        return self._alpha_page_count_cache[letter]

    def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
        """Fetch one page of the A-Z listing for *letter* and index its titles."""
        letter = (letter or "").strip()
        if not letter or not self._requests_available:
            return []
        if not self._alpha_to_url:
            self.alpha_index()
        base_url = self._alpha_to_url.get(letter, "")
        if not base_url:
            return []
        page = max(1, int(page or 1))
        url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
        try:
            soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        hits = self._parse_listing_hits(soup)
        return self._apply_hits_to_title_index(hits)

    def titles_for_alpha(self, letter: str) -> List[str]:
        """Convenience: first A-Z page for *letter*, case-insensitively sorted."""
        titles = self.titles_for_alpha_page(letter, 1)
        titles.sort(key=lambda value: value.casefold())
        return titles

    def _series_view_url(self) -> str:
        """Absolute URL of the series catalog listing."""
        return _absolute_url(SERIES_VIEW_PATH)

    def series_catalog_page_count(self, page: int = 1) -> int:
        """Return (and cache) the series catalog page count.

        NOTE(review): the cache is keyed by *page* but the count is always read
        from the catalog's first page — the key only affects caching, not the
        fetched URL.
        """
        if not self._requests_available:
            return 1
        cache_key = int(page or 1)
        if cache_key in self._series_page_count_cache:
            return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
        base_url = self._series_view_url()
        if not base_url:
            return 1
        try:
            soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return 1
        pages = self._extract_last_page(soup)
        self._series_page_count_cache[cache_key] = max(1, pages)
        return self._series_page_count_cache[cache_key]

    def series_catalog_page(self, page: int) -> List[str]:
        """Fetch one series-catalog page and index its titles."""
        if not self._requests_available:
            return []
        base_url = self._series_view_url()
        if not base_url:
            return []
        page = max(1, int(page or 1))
        url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
        try:
            soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        hits = self._parse_listing_hits(soup)
        return self._apply_hits_to_title_index(hits)

    def series_catalog_has_more(self, page: int) -> bool:
        """True when more catalog pages exist after *page*."""
        total = self.series_catalog_page_count(page)
        return page < total

    def genres(self) -> List[str]:
        """Return the known genre names, fetching/caching the home page once."""
        if not self._requests_available:
            return []
        if self._genre_to_url:
            return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
        try:
            soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        parsed = self._parse_genres(soup)
        if parsed:
            self._genre_to_url = dict(parsed)
        return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())

    def genre_page_count(self, genre: str) -> int:
        """Return (and cache) the number of listing pages for *genre*."""
        genre = (genre or "").strip()
        if not genre:
            return 1
        if genre in self._genre_page_count_cache:
            return max(1, int(self._genre_page_count_cache.get(genre, 1)))
        if not self._genre_to_url:
            self.genres()
        base_url = self._genre_to_url.get(genre, "")
        if not base_url:
            return 1
        try:
            soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return 1
        pages = self._extract_last_page(soup)
        self._genre_page_count_cache[genre] = max(1, pages)
        return self._genre_page_count_cache[genre]

    def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
        """Fetch one page of the listing for *genre* and index its titles."""
        genre = (genre or "").strip()
        if not genre or not self._requests_available:
            return []
        if not self._genre_to_url:
            self.genres()
        base_url = self._genre_to_url.get(genre, "")
        if not base_url:
            return []
        page = max(1, int(page or 1))
        url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
        try:
            soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        hits = self._parse_listing_hits(soup)
        return self._apply_hits_to_title_index(hits)

    def titles_for_genre(self, genre: str) -> List[str]:
        """Convenience: first genre page, case-insensitively sorted."""
        titles = self.titles_for_genre_page(genre, 1)
        titles.sort(key=lambda value: value.casefold())
        return titles

    def _ensure_title_url(self, title: str) -> str:
        """Resolve the movie detail URL for *title*, searching the site on a miss.

        Returns '' for series (they have no single detail URL) and for unknown
        titles. Successful lookups are written back to the title index.
        """
        title = (title or "").strip()
        if not title:
            return ""
        direct = self._lookup_title_url(title)
        if direct and _is_series_hint_url(direct):
            return ""
        if direct:
            self._title_to_url[title] = direct
            return direct
        if self._has_series_entries(title) or self._ensure_series_entries_for_title(title):
            self._title_to_url[title] = _series_hint_value(title)
            return ""
        wanted = title.casefold()
        hits = self._search_hits(title)
        for hit in hits:
            # Skip episode hits; only exact movie-title matches count here.
            if self._episode_entry_from_hit(hit):
                continue
            if hit.title.casefold() == wanted and hit.url:
                self._title_to_url[title] = hit.url
                return hit.url
        return ""

    def _store_title_meta(self, title: str, *, plot: str = "", poster: str = "") -> None:
        """Merge *plot*/*poster* into the metadata cache (new values win over old)."""
        title = (title or "").strip()
        if not title:
            return
        old_plot, old_poster = self._title_meta.get(title, ("", ""))
        merged_plot = (plot or old_plot or "").strip()
        merged_poster = (poster or old_poster or "").strip()
        self._title_meta[title] = (merged_plot, merged_poster)

    def _extract_detail_metadata(self, soup: BeautifulSoupT) -> tuple[str, str, str]:
        """Extract (plot, poster URL, rating text) from a detail page.

        Each field is tried against several selectors, most specific first.
        """
        if not soup:
            return "", "", ""
        root = soup.select_one("div#content[role='main']") or soup
        detail = root.select_one("article.detail") or root
        plot = ""
        poster = ""
        # Filmpalast detail page: prefer the dedicated plot-description block.
        plot_node = detail.select_one(
            "li[itemtype='http://schema.org/Movie'] span[itemprop='description']"
        )
        if plot_node is not None:
            plot = (plot_node.get_text(" ", strip=True) or "").strip()
        if not plot:
            hidden_plot = detail.select_one("cite span.hidden")
            if hidden_plot is not None:
                plot = (hidden_plot.get_text(" ", strip=True) or "").strip()
        if not plot:
            # Last resort: the page's meta description tags.
            for selector in ("meta[property='og:description']", "meta[name='description']"):
                node = root.select_one(selector)
                if node is None:
                    continue
                content = (node.get("content") or "").strip()
                if content:
                    plot = content
                    break
        # Filmpalast detail page: the cover reliably lives in `img.cover2`.
        cover = detail.select_one("img.cover2")
        if cover is not None:
            value = (cover.get("data-src") or cover.get("src") or "").strip()
            if value:
                candidate = _absolute_url(value)
                lower = candidate.casefold()
                # Reject theme assets and spacer images; require the movie-art path.
                if "/themes/" not in lower and "spacer.gif" not in lower and "/files/movies/" in lower:
                    poster = candidate
        if not poster:
            thumb_node = detail.select_one("li[itemtype='http://schema.org/Movie'] img[itemprop='image']")
            if thumb_node is not None:
                value = (thumb_node.get("data-src") or thumb_node.get("src") or "").strip()
                if value:
                    candidate = _absolute_url(value)
                    lower = candidate.casefold()
                    if "/themes/" not in lower and "spacer.gif" not in lower and "/files/movies/" in lower:
                        poster = candidate
        # IMDb rating: Schema.org aggregateRating
        rating = ""
        rating_node = detail.select_one("[itemprop='ratingValue']")
        if rating_node is not None:
            rating = (rating_node.get_text(" ", strip=True) or "").strip()
        if not rating:
            # Fallback: data attribute or class-based display
            for sel in ("span.imdb", "span.rating", "[class*='imdb']"):
                node = detail.select_one(sel)
                if node is not None:
                    candidate = (node.get_text(" ", strip=True) or "").strip()
                    if candidate:
                        rating = candidate
                        break
        return plot, poster, rating

    def remember_series_url(self, title: str, series_url: str) -> None:
        """Pin *series_url* for *title* and invalidate the hoster cache."""
        title = (title or "").strip()
        series_url = (series_url or "").strip()
        if not title or not series_url:
            return
        self._title_to_url[title] = series_url
        self._hoster_cache.clear()

    def series_url_for_title(self, title: str) -> str:
        """Return the stored URL (or series hint marker) for *title*, '' if unknown."""
        title = (title or "").strip()
        if not title:
            return ""
        direct = self._lookup_title_url(title)
        if direct:
            return direct
        series_key = self._series_key_for_title(title)
        if series_key:
            return _series_hint_value(series_key)
        return ""

    def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
        """Return (info, art, None) metadata for *title*.

        Uses the in-memory cache first; otherwise fetches the detail page
        (for series: the first known episode's page) and extracts plot,
        poster and rating. The third tuple element is always None here.
        """
        title = (title or "").strip()
        if not title:
            return {}, {}, None
        info: dict[str, str] = {"title": title}
        art: dict[str, str] = {}
        cached_plot, cached_poster = self._title_meta.get(title, ("", ""))
        if cached_plot:
            info["plot"] = cached_plot
        if cached_poster:
            art = {"thumb": cached_poster, "poster": cached_poster}
        if "plot" in info and art:
            return info, art, None
        detail_url = self._ensure_title_url(title)
        if not detail_url:
            # Series: use the first season's first episode page as metadata source.
            series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
            if series_key:
                seasons = self._series_entries.get(series_key, {})
                first_entry: Optional[EpisodeEntry] = None
                for season_number in sorted(seasons.keys()):
                    episodes = seasons.get(season_number, {})
                    for episode_number in sorted(episodes.keys()):
                        first_entry = episodes.get(episode_number)
                        if first_entry is not None:
                            break
                    if first_entry is not None:
                        break
                detail_url = first_entry.url if first_entry is not None else ""
        if not detail_url:
            return info, art, None
        try:
            soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
            plot, poster, rating = self._extract_detail_metadata(soup)
        except Exception:
            plot, poster, rating = "", "", ""
        if plot:
            info["plot"] = plot
        if rating:
            try:
                # Accept German decimal commas ("7,5").
                info["rating"] = str(float(rating.replace(",", ".")))
            except (ValueError, TypeError):
                pass
        if poster:
            art = {"thumb": poster, "poster": poster}
        self._store_title_meta(title, plot=info.get("plot", ""), poster=poster)
        return info, art, None

    def is_movie(self, title: str) -> bool:
        """True when *title* is (best-effort) a movie rather than a series/episode."""
        title = (title or "").strip()
        if not title:
            return False
        direct = self._lookup_title_url(title)
        if direct:
            return not _is_series_hint_url(direct)
        if SEASON_EPISODE_RE.search(title):
            return False
        if self._has_series_entries(title):
            return False
        if self._ensure_series_entries_for_title(title):
            return False
        return True

    @staticmethod
    def _normalize_hoster_name(name: str) -> str:
        """Trim and collapse whitespace in a hoster display name."""
        name = (name or "").strip()
        if not name:
            return ""
        name = re.sub(r"\s+", " ", name)
        return name

    def _extract_hoster_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
        """Extract {hoster name: play URL} from a detail page."""
        hosters: Dict[str, str] = {}
        if not soup:
            return hosters
        # Primary layout: each hoster in its own UL with hostName + play link.
        for block in soup.select("ul.currentStreamLinks"):
            host_name_node = block.select_one("li.hostBg .hostName")
            host_name = self._normalize_hoster_name(host_name_node.get_text(" ", strip=True) if host_name_node else "")
            play_anchor = block.select_one("li.streamPlayBtn a[href], li.streamPlayBtn a[data-player-url], a.button.iconPlay[href]")
            href = (play_anchor.get("href") or play_anchor.get("data-player-url") if play_anchor else "") or ""
            play_url = _absolute_url(href).strip()
            if not play_url:
                continue
            if not host_name:
                host_name = self._normalize_hoster_name(play_anchor.get_text(" ", strip=True) if play_anchor else "")
            if not host_name:
                host_name = "Unbekannt"
            if host_name not in hosters:
                hosters[host_name] = play_url
        # Fallback: direct play buttons inside the stream area.
        if not hosters:
            for anchor in soup.select("#grap-stream-list a.button.iconPlay[href], .streamLinksWrapper a.button.iconPlay[href]"):
                href = (anchor.get("href") or "").strip()
                play_url = _absolute_url(href).strip()
                if not play_url:
                    continue
                text_name = self._normalize_hoster_name(anchor.get_text(" ", strip=True))
                host_name = text_name if text_name and text_name.casefold() not in {"play", "details play"} else "Unbekannt"
                # Disambiguate duplicate names with a running index.
                if host_name in hosters:
                    host_name = f"{host_name} #{len(hosters) + 1}"
                hosters[host_name] = play_url
        return hosters

    def _hosters_for_detail_url(self, detail_url: str) -> Dict[str, str]:
        """Return the hoster map for *detail_url*, fetching and caching on a miss."""
        detail_url = (detail_url or "").strip()
        if not detail_url:
            return {}
        cached = self._hoster_cache.get(detail_url)
        if cached is not None:
            return dict(cached)
        if not self._requests_available:
            return {}
        try:
            soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return {}
        hosters = self._extract_hoster_links(soup)
        for url in hosters.values():
            _log_url_event(url, kind="PARSE")
        self._hoster_cache[detail_url] = dict(hosters)
        return dict(hosters)

    def seasons_for(self, title: str) -> List[str]:
        """Return season labels for a series, or ['Film'] for a known movie."""
        title = (title or "").strip()
        if not title:
            return []
        series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
        if series_key:
            seasons = sorted(self._series_entries.get(series_key, {}).keys())
            return [f"Staffel {number}" for number in seasons]
        detail_url = self._ensure_title_url(title)
        return ["Film"] if detail_url else []

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return episode labels for *season* of a series, or ['Stream'] for a movie."""
        title = (title or "").strip()
        series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
        if series_key:
            season_number = _extract_number(season)
            if season_number is None:
                return []
            episodes = self._series_entries.get(series_key, {}).get(season_number, {})
            labels: List[str] = []
            for episode_number in sorted(episodes.keys()):
                entry = episodes[episode_number]
                label = f"Episode {episode_number}"
                if entry.suffix:
                    label = f"{label} - {entry.suffix}"
                labels.append(label)
            return labels
        return ["Stream"] if self._ensure_title_url(title) else []

    def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
        """Hoster names for a (title, season, episode) selection."""
        detail_url = self._detail_url_for_selection(title, season, episode)
        return self.available_hosters_for_url(detail_url)

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Best stream link for a (title, season, episode) selection."""
        detail_url = self._detail_url_for_selection(title, season, episode)
        return self.stream_link_for_url(detail_url)

    def episode_url_for(self, title: str, season: str, episode: str) -> str:
        """Detail URL for a (title, season, episode) selection ('' if unresolved)."""
        detail_url = self._detail_url_for_selection(title, season, episode)
        return (detail_url or "").strip()

    def available_hosters_for_url(self, episode_url: str) -> List[str]:
        """Hoster names parsed from the given detail URL."""
        detail_url = (episode_url or "").strip()
        hosters = self._hosters_for_detail_url(detail_url)
        return list(hosters.keys())

    def stream_link_for_url(self, episode_url: str) -> Optional[str]:
        """Pick the best stream link for *episode_url*.

        Preference order: first hoster whose name/URL matches the preferred
        list, then the first parsed hoster, then iframe/anchor candidates
        scraped from the page, and finally the detail URL itself.
        """
        detail_url = (episode_url or "").strip()
        if not detail_url:
            return None
        hosters = self._hosters_for_detail_url(detail_url)
        if hosters:
            for preferred in self._preferred_hosters:
                preferred_key = (preferred or "").strip().casefold()
                if not preferred_key:
                    continue
                for host_name, host_url in hosters.items():
                    if preferred_key in host_name.casefold() or preferred_key in host_url.casefold():
                        _log_url_event(host_url, kind="FOUND")
                        return host_url
            first = next(iter(hosters.values()))
            _log_url_event(first, kind="FOUND")
            return first
        if not self._requests_available:
            return detail_url
        try:
            soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return detail_url
        candidates: List[str] = []
        for iframe in soup.select("iframe[src]"):
            src = (iframe.get("src") or "").strip()
            if src:
                candidates.append(_absolute_url(src))
        for anchor in soup.select("a[href]"):
            href = (anchor.get("href") or "").strip()
            if not href:
                continue
            lower = href.casefold()
            if "watch" in lower or "stream" in lower or "player" in lower:
                candidates.append(_absolute_url(href))
        # Deduplicate while preserving order.
        deduped: List[str] = []
        seen: set[str] = set()
        for candidate in candidates:
            key = candidate.casefold()
            if key in seen:
                continue
            seen.add(key)
            deduped.append(candidate)
        if deduped:
            _log_url_event(deduped[0], kind="FOUND")
            return deduped[0]
        return detail_url

    def set_preferred_hosters(self, hosters: List[str]) -> None:
        """Replace the preferred-hoster list (lower-cased, empty entries dropped)."""
        normalized = [str(hoster).strip().lower() for hoster in hosters if str(hoster).strip()]
        if normalized:
            self._preferred_hosters = normalized

    def reset_preferred_hosters(self) -> None:
        """Restore the default preferred-hoster list."""
        self._preferred_hosters = list(self._default_preferred_hosters)

    def popular_series(self) -> List[str]:
        """Return popular titles from /movies/top."""
        if not self._requests_available:
            return []
        try:
            url = _absolute_url("/movies/top")
            soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
            hits = self._parse_listing_hits(soup)
            return self._apply_hits_to_title_index(hits)
        except Exception:
            return []

    def new_titles_page(self, page: int = 1) -> List[str]:
        """Return newly added titles from /movies/new (paginated)."""
        if not self._requests_available:
            return []
        page = max(1, int(page or 1))
        try:
            base = _absolute_url("/movies/new")
            url = base if page == 1 else urljoin(base.rstrip("/") + "/", f"page/{page}")
            soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
            hits = self._parse_listing_hits(soup)
            return self._apply_hits_to_title_index(hits)
        except Exception:
            return []

    def new_titles(self) -> List[str]:
        """Convenience: first page of newly added titles."""
        return self.new_titles_page(1)

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Resolve a hoster *link* into a playable media URL.

        Order: ResolveURL on the original link, then follow redirects and try
        ResolveURL again on the final URL, then fall back to the direct URL.
        """
        if not link:
            return None
        try:
            from resolveurl_backend import resolve as resolve_with_resolveurl
        except Exception:
            resolve_with_resolveurl = None
        # 1) Always hand the original hoster link to ResolveURL first.
        if callable(resolve_with_resolveurl):
            resolved_by_resolveurl = resolve_with_resolveurl(link)
            if resolved_by_resolveurl:
                _log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
                _log_url_event(resolved_by_resolveurl, kind="MEDIA")
                return resolved_by_resolveurl
        redirected = link
        if self._requests_available:
            response = None
            try:
                session = get_requests_session("filmpalast", headers=HEADERS)
                response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
                response.raise_for_status()
                redirected = (response.url or link).strip() or link
            except Exception:
                redirected = link
            finally:
                if response is not None:
                    try:
                        response.close()
                    except Exception:
                        pass
        # 2) Then optionally resolve the redirect target once more.
        if callable(resolve_with_resolveurl) and redirected and redirected != link:
            resolved_by_resolveurl = resolve_with_resolveurl(redirected)
            if resolved_by_resolveurl:
                _log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
                _log_url_event(resolved_by_resolveurl, kind="MEDIA")
                return resolved_by_resolveurl
        # 3) Fallback stays as before: return the direct URL.
        if redirected:
            _log_url_event(redirected, kind="FINAL")
            return redirected
        return None


# Alias for automatic plugin discovery.
Plugin = FilmpalastPlugin