"""Filmpalast Integration (movie-style provider). Hinweis: - Der Parser ist bewusst defensiv und arbeitet mit mehreren Fallback-Selektoren, da Filmpalast-Layouts je Domain variieren koennen. """ from __future__ import annotations from dataclasses import dataclass import re from urllib.parse import quote, urlencode from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url from http_session_pool import get_requests_session if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession: TypeAlias = Any BeautifulSoupT: TypeAlias = Any ADDON_ID = "plugin.video.viewit" SETTING_BASE_URL = "filmpalast_base_url" DEFAULT_BASE_URL = "https://filmpalast.to" DEFAULT_TIMEOUT = 20 DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"] SERIES_HINT_PREFIX = "series://filmpalast/" SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE) GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_filmpalast" SETTING_DUMP_HTML = "dump_html_filmpalast" SETTING_SHOW_URL_INFO = "show_url_info_filmpalast" SETTING_LOG_ERRORS = "log_errors_filmpalast" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } @dataclass(frozen=True) class SearchHit: title: str url: str @dataclass(frozen=True) class EpisodeEntry: season: int episode: int suffix: str url: str def _get_base_url() -> str: base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip() if not base: base = DEFAULT_BASE_URL return base.rstrip("/") def _absolute_url(url: str) -> str: url = (url or "").strip() if not url: return "" if url.startswith("http://") or url.startswith("https://"): return url if url.startswith("//"): return f"https:{url}" if url.startswith("/"): return f"{_get_base_url()}{url}" return f"{_get_base_url()}/{url.lstrip('/')}" def _normalize_search_text(value: str) -> str: value = (value or "").casefold() value = re.sub(r"[^a-z0-9]+", " ", value) value = re.sub(r"\s+", " ", value).strip() return value def _matches_query(query: str, *, title: str) -> bool: normalized_query = _normalize_search_text(query) if not normalized_query: return False haystack = f" {_normalize_search_text(title)} " return f" {normalized_query} " in haystack def _is_probably_content_url(url: str) -> bool: lower = (url or "").casefold() if not lower: return False block_markers = ( "/genre/", "/kategorie/", "/category/", "/tag/", "/login", "/register", "/kontakt", "/impressum", "/datenschutz", "/dmca", "/agb", "javascript:", "#", ) if any(marker in lower for marker in block_markers): return False allow_markers = ("/stream/", "/film/", "/movie/", "/serien/", "/serie/", "/title/") return any(marker in lower for marker in allow_markers) def _log_url_event(url: str, *, kind: str = "VISIT") -> None: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="filmpalast_urls.log", url=url, kind=kind, ) def _log_visit(url: str) -> None: _log_url_event(url, kind="VISIT") notify_url( ADDON_ID, heading="Filmpalast", url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO, plugin_setting_id=SETTING_SHOW_URL_INFO, ) def _log_response_html(url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, plugin_setting_id=SETTING_DUMP_HTML, url=url, body=body, filename_prefix="filmpalast_response", ) def _log_error_message(message: str) -> None: log_error( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS, plugin_setting_id=SETTING_LOG_ERRORS, log_filename="filmpalast_errors.log", message=message, ) def _is_series_hint_url(value: str) -> bool: return (value or "").startswith(SERIES_HINT_PREFIX) def _series_hint_value(title: str) -> str: safe_title = quote((title or "").strip(), safe="") return f"{SERIES_HINT_PREFIX}{safe_title}" if safe_title else SERIES_HINT_PREFIX def _extract_number(value: str) -> Optional[int]: match = re.search(r"(\d+)", value or "") if not match: return None try: return int(match.group(1)) except Exception: return None def _strip_series_alias(title: str) -> str: return re.sub(r"\s*\(serie\)\s*$", "", title or "", flags=re.IGNORECASE).strip() def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT: if requests is None or BeautifulSoup is None: raise RuntimeError("requests/bs4 sind nicht verfuegbar.") _log_visit(url) sess = session or get_requests_session("filmpalast", headers=HEADERS) try: response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: _log_error_message(f"GET {url} failed: {exc}") raise if response.url and response.url != url: _log_url_event(response.url, kind="REDIRECT") _log_response_html(url, response.text) return BeautifulSoup(response.text, "html.parser") class FilmpalastPlugin(BasisPlugin): name = "Filmpalast" def __init__(self) -> None: self._title_to_url: Dict[str, str] = {} self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {} self._hoster_cache: Dict[str, Dict[str, str]] = {} self._requests_available = REQUESTS_AVAILABLE self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: List[str] = list(self._default_preferred_hosters) self.is_available = True self.unavailable_reason: Optional[str] = None if not self._requests_available: # pragma: no cover - optional dependency self.is_available = False self.unavailable_reason = ( "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'." ) if REQUESTS_IMPORT_ERROR: print(f"FilmpalastPlugin Importfehler: {REQUESTS_IMPORT_ERROR}") def _lookup_title_url(self, title: str) -> str: title = (title or "").strip() if not title: return "" direct = self._title_to_url.get(title) if direct: return direct wanted = title.casefold() for key, value in self._title_to_url.items(): if key.casefold() == wanted and value: return value return "" def _series_key_for_title(self, title: str) -> str: title = (title or "").strip() if not title: return "" if title in self._series_entries: return title wanted = title.casefold() for key in self._series_entries.keys(): if key.casefold() == wanted: return key return "" def _has_series_entries(self, title: str) -> bool: return bool(self._series_key_for_title(title)) def _episode_entry_from_hit(self, hit: SearchHit) -> Optional[Tuple[str, EpisodeEntry]]: title = (hit.title or "").strip() if not title: return None marker = SEASON_EPISODE_RE.search(title) if not marker: return None try: season_number = int(marker.group(1)) episode_number = int(marker.group(2)) except Exception: return None series_title = re.sub(r"\s+", " ", title[: marker.start()] or "").strip(" -|:;,_") if not series_title: return None suffix = re.sub(r"\s+", " ", title[marker.end() :] or "").strip(" -|:;,_") entry = EpisodeEntry(season=season_number, episode=episode_number, suffix=suffix, url=hit.url) return (series_title, entry) def _add_series_entry(self, series_title: str, entry: EpisodeEntry) -> None: if not series_title or not entry.url: return seasons = self._series_entries.setdefault(series_title, {}) episodes = seasons.setdefault(entry.season, {}) if entry.episode not in episodes: episodes[entry.episode] = entry def _ensure_series_entries_for_title(self, title: str) -> str: series_key = self._series_key_for_title(title) if series_key: return series_key original_title = (title or "").strip() lookup_title = _strip_series_alias(original_title) if not lookup_title: return "" if not self._requests_available: return "" wanted = _normalize_search_text(lookup_title) hits = self._search_hits(lookup_title) for hit in hits: parsed = self._episode_entry_from_hit(hit) if not parsed: continue series_title, entry = parsed if wanted and _normalize_search_text(series_title) != wanted: continue self._add_series_entry(series_title, entry) self._title_to_url.setdefault(series_title, _series_hint_value(series_title)) resolved = self._series_key_for_title(original_title) or self._series_key_for_title(lookup_title) if resolved and original_title and original_title != resolved: self._series_entries[original_title] = self._series_entries[resolved] self._title_to_url.setdefault(original_title, _series_hint_value(resolved)) return original_title return resolved def _detail_url_for_selection(self, title: str, season: str, episode: str) -> str: series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title) if series_key: season_number = _extract_number(season) episode_number = _extract_number(episode) if season_number is None or episode_number is None: return "" entry = self._series_entries.get(series_key, {}).get(season_number, {}).get(episode_number) return entry.url if entry else "" return self._ensure_title_url(title) def _search_hits(self, query: str) -> List[SearchHit]: query = (query or "").strip() if not query: return [] if not self._requests_available or requests is None: return [] session = get_requests_session("filmpalast", headers=HEADERS) search_requests = [(_absolute_url(f"/search/title/{quote(query)}"), None)] hits: List[SearchHit] = [] seen_titles: set[str] = set() seen_urls: set[str] = set() for base_url, params in search_requests: try: request_url = base_url if not params else f"{base_url}?{urlencode(params)}" _log_url_event(request_url, kind="GET") _log_visit(request_url) response = session.get(base_url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() if response.url and response.url != request_url: _log_url_event(response.url, kind="REDIRECT") _log_response_html(request_url, response.text) soup = BeautifulSoup(response.text, "html.parser") except Exception as exc: _log_error_message(f"search request failed ({base_url}): {exc}") continue anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]") if not anchors: anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']") for anchor in anchors: href = (anchor.get("href") or "").strip() if not href: continue url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/") if not _is_probably_content_url(url): continue title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip() title = (title or "").strip() if not title: continue if title.casefold() in {"details/play", "play", "details"}: continue if not _matches_query(query, title=title): continue title_key = title.casefold() url_key = url.casefold() if title_key in seen_titles or url_key in seen_urls: continue seen_titles.add(title_key) seen_urls.add(url_key) _log_url_event(url, kind="PARSE") hits.append(SearchHit(title=title, url=url)) if hits: break return hits async def search_titles(self, query: str) -> List[str]: hits = self._search_hits(query) self._title_to_url = {} self._series_entries = {} self._hoster_cache.clear() movie_titles: List[str] = [] series_titles_seen: set[str] = set() for hit in hits: parsed = self._episode_entry_from_hit(hit) if parsed: series_title, entry = parsed self._add_series_entry(series_title, entry) if series_title.casefold() not in series_titles_seen: self._title_to_url[series_title] = _series_hint_value(series_title) series_titles_seen.add(series_title.casefold()) continue title = (hit.title or "").strip() if not title: continue movie_titles.append(title) self._title_to_url[title] = hit.url titles: List[str] = list(movie_titles) movie_keys = {entry.casefold() for entry in movie_titles} for series_title in sorted(self._series_entries.keys(), key=lambda value: value.casefold()): if series_title.casefold() in movie_keys: alias = f"{series_title} (Serie)" self._title_to_url[alias] = self._title_to_url.get(series_title, _series_hint_value(series_title)) self._series_entries[alias] = self._series_entries[series_title] titles.append(alias) else: titles.append(series_title) titles.sort(key=lambda value: value.casefold()) return titles def _ensure_title_url(self, title: str) -> str: title = (title or "").strip() if not title: return "" direct = self._lookup_title_url(title) if direct and _is_series_hint_url(direct): return "" if direct: self._title_to_url[title] = direct return direct if self._has_series_entries(title) or self._ensure_series_entries_for_title(title): self._title_to_url[title] = _series_hint_value(title) return "" wanted = title.casefold() hits = self._search_hits(title) for hit in hits: if self._episode_entry_from_hit(hit): continue if hit.title.casefold() == wanted and hit.url: self._title_to_url[title] = hit.url return hit.url return "" def remember_series_url(self, title: str, series_url: str) -> None: title = (title or "").strip() series_url = (series_url or "").strip() if not title or not series_url: return self._title_to_url[title] = series_url self._hoster_cache.clear() def series_url_for_title(self, title: str) -> str: title = (title or "").strip() if not title: return "" direct = self._lookup_title_url(title) if direct: return direct series_key = self._series_key_for_title(title) if series_key: return _series_hint_value(series_key) return "" def is_movie(self, title: str) -> bool: title = (title or "").strip() if not title: return False direct = self._lookup_title_url(title) if direct: return not _is_series_hint_url(direct) if SEASON_EPISODE_RE.search(title): return False if self._has_series_entries(title): return False if self._ensure_series_entries_for_title(title): return False return True @staticmethod def _normalize_hoster_name(name: str) -> str: name = (name or "").strip() if not name: return "" name = re.sub(r"\s+", " ", name) return name def _extract_hoster_links(self, soup: BeautifulSoupT) -> Dict[str, str]: hosters: Dict[str, str] = {} if not soup: return hosters # Primäres Layout: jeder Hoster in eigener UL mit hostName + Play-Link. for block in soup.select("ul.currentStreamLinks"): host_name_node = block.select_one("li.hostBg .hostName") host_name = self._normalize_hoster_name(host_name_node.get_text(" ", strip=True) if host_name_node else "") play_anchor = block.select_one("li.streamPlayBtn a[href], a.button.iconPlay[href]") href = (play_anchor.get("href") if play_anchor else "") or "" play_url = _absolute_url(href).strip() if not play_url: continue if not host_name: host_name = self._normalize_hoster_name(play_anchor.get_text(" ", strip=True) if play_anchor else "") if not host_name: host_name = "Unbekannt" if host_name not in hosters: hosters[host_name] = play_url # Fallback: direkte Play-Buttons im Stream-Bereich. if not hosters: for anchor in soup.select("#grap-stream-list a.button.iconPlay[href], .streamLinksWrapper a.button.iconPlay[href]"): href = (anchor.get("href") or "").strip() play_url = _absolute_url(href).strip() if not play_url: continue text_name = self._normalize_hoster_name(anchor.get_text(" ", strip=True)) host_name = text_name if text_name and text_name.casefold() not in {"play", "details play"} else "Unbekannt" if host_name in hosters: host_name = f"{host_name} #{len(hosters) + 1}" hosters[host_name] = play_url return hosters def _hosters_for_detail_url(self, detail_url: str) -> Dict[str, str]: detail_url = (detail_url or "").strip() if not detail_url: return {} cached = self._hoster_cache.get(detail_url) if cached is not None: return dict(cached) if not self._requests_available: return {} try: soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS)) except Exception: return {} hosters = self._extract_hoster_links(soup) for url in hosters.values(): _log_url_event(url, kind="PARSE") self._hoster_cache[detail_url] = dict(hosters) return dict(hosters) def seasons_for(self, title: str) -> List[str]: title = (title or "").strip() if not title: return [] series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title) if series_key: seasons = sorted(self._series_entries.get(series_key, {}).keys()) return [f"Staffel {number}" for number in seasons] detail_url = self._ensure_title_url(title) return ["Film"] if detail_url else [] def episodes_for(self, title: str, season: str) -> List[str]: title = (title or "").strip() series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title) if series_key: season_number = _extract_number(season) if season_number is None: return [] episodes = self._series_entries.get(series_key, {}).get(season_number, {}) labels: List[str] = [] for episode_number in sorted(episodes.keys()): entry = episodes[episode_number] label = f"Episode {episode_number}" if entry.suffix: label = f"{label} - {entry.suffix}" labels.append(label) return labels return ["Stream"] if self._ensure_title_url(title) else [] def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]: detail_url = self._detail_url_for_selection(title, season, episode) hosters = self._hosters_for_detail_url(detail_url) return list(hosters.keys()) def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: detail_url = self._detail_url_for_selection(title, season, episode) if not detail_url: return None hosters = self._hosters_for_detail_url(detail_url) if hosters: for preferred in self._preferred_hosters: preferred_key = (preferred or "").strip().casefold() if not preferred_key: continue for host_name, host_url in hosters.items(): if preferred_key in host_name.casefold() or preferred_key in host_url.casefold(): _log_url_event(host_url, kind="FOUND") return host_url first = next(iter(hosters.values())) _log_url_event(first, kind="FOUND") return first if not self._requests_available: return detail_url try: soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS)) except Exception: return detail_url candidates: List[str] = [] for iframe in soup.select("iframe[src]"): src = (iframe.get("src") or "").strip() if src: candidates.append(_absolute_url(src)) for anchor in soup.select("a[href]"): href = (anchor.get("href") or "").strip() if not href: continue lower = href.casefold() if "watch" in lower or "stream" in lower or "player" in lower: candidates.append(_absolute_url(href)) deduped: List[str] = [] seen: set[str] = set() for candidate in candidates: key = candidate.casefold() if key in seen: continue seen.add(key) deduped.append(candidate) if deduped: _log_url_event(deduped[0], kind="FOUND") return deduped[0] return detail_url def set_preferred_hosters(self, hosters: List[str]) -> None: normalized = [str(hoster).strip().lower() for hoster in hosters if str(hoster).strip()] if normalized: self._preferred_hosters = normalized def reset_preferred_hosters(self) -> None: self._preferred_hosters = list(self._default_preferred_hosters) def resolve_stream_link(self, link: str) -> Optional[str]: if not link: return None resolved = link if self._requests_available: try: session = get_requests_session("filmpalast", headers=HEADERS) response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True) response.raise_for_status() resolved = (response.url or link).strip() or link except Exception: resolved = link try: from resolveurl_backend import resolve as resolve_with_resolveurl except Exception: resolve_with_resolveurl = None if callable(resolve_with_resolveurl): resolved_by_resolveurl = resolve_with_resolveurl(resolved) if resolved_by_resolveurl: _log_url_event("ResolveURL", kind="HOSTER_RESOLVER") _log_url_event(resolved_by_resolveurl, kind="MEDIA") return resolved_by_resolveurl if resolved: _log_url_event(resolved, kind="FINAL") return resolved return None