1084 lines
42 KiB
Python
1084 lines
42 KiB
Python
"""Filmpalast Integration (movie-style provider).
|
|
|
|
Hinweis:
|
|
- Der Parser ist bewusst defensiv und arbeitet mit mehreren Fallback-Selektoren,
|
|
da Filmpalast-Layouts je Domain variieren koennen.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import re
|
|
from urllib.parse import quote, urlencode
|
|
from urllib.parse import urljoin
|
|
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
try: # pragma: no cover - optional dependency
|
|
import requests
|
|
from bs4 import BeautifulSoup # type: ignore[import-not-found]
|
|
except ImportError as exc: # pragma: no cover - optional dependency
|
|
requests = None
|
|
BeautifulSoup = None
|
|
REQUESTS_AVAILABLE = False
|
|
REQUESTS_IMPORT_ERROR = exc
|
|
else:
|
|
REQUESTS_AVAILABLE = True
|
|
REQUESTS_IMPORT_ERROR = None
|
|
|
|
from plugin_interface import BasisPlugin
|
|
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
|
|
from http_session_pool import get_requests_session
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from requests import Session as RequestsSession
|
|
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
|
|
else: # pragma: no cover
|
|
RequestsSession = Any
|
|
BeautifulSoupT = Any
|
|
|
|
|
|
# Add-on id passed as first argument to the settings/logging helpers.
ADDON_ID = "plugin.video.viewit"
# Settings key for the configurable site root and the fallback used when it is empty.
SETTING_BASE_URL = "filmpalast_base_url"
DEFAULT_BASE_URL = "https://filmpalast.to"
# Timeout value handed to the HTTP GET calls in this module.
DEFAULT_TIMEOUT = 20
# Default hoster order; copied per plugin instance in FilmpalastPlugin.__init__.
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
# Prefix of the pseudo-URL stored in the title index for series titles.
SERIES_HINT_PREFIX = "series://filmpalast/"
# Site-relative path of the series catalog listing.
SERIES_VIEW_PATH = "/serien/view"
# Matches season/episode markers such as "S01E02" or "s 1 e 12" inside a title.
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
# Global debug setting ids forwarded to the logging helpers.
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Plugin-specific counterparts of the debug setting ids above.
SETTING_LOG_URLS = "log_urls_filmpalast"
SETTING_DUMP_HTML = "dump_html_filmpalast"
SETTING_SHOW_URL_INFO = "show_url_info_filmpalast"
SETTING_LOG_ERRORS = "log_errors_filmpalast"
# Optional progress hook, called as callback(message, percent_or_None).
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
|
|
|
|
|
|
def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None:
|
|
if not callable(callback):
|
|
return
|
|
try:
|
|
callback(str(message or ""), None if percent is None else int(percent))
|
|
except Exception:
|
|
return
|
|
# Request headers sent with every GET in this module and used when obtaining the pooled session.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
|
|
|
|
|
|
@dataclass(frozen=True)
class SearchHit:
    """One title/URL pair parsed from a search or listing page."""

    # Anchor ``title`` attribute, or the link text when that attribute is empty.
    title: str
    # Absolute URL with fragment, query string and trailing slash removed.
    url: str
|
|
|
|
|
|
@dataclass(frozen=True)
class EpisodeEntry:
    """One episode derived from a hit whose title carries an ``SxxEyy`` marker."""

    # Season number taken from the marker.
    season: int
    # Episode number taken from the marker.
    episode: int
    # Title text following the marker (may be empty).
    suffix: str
    # URL of the hit the episode was parsed from.
    url: str
|
|
|
|
|
|
def _get_base_url() -> str:
    """Return the configured site root without a trailing slash.

    Reads the ``SETTING_BASE_URL`` add-on setting and falls back to
    ``DEFAULT_BASE_URL`` when the stripped value is empty.
    """
    configured = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
    return (configured or DEFAULT_BASE_URL).rstrip("/")
|
|
|
|
|
|
def _absolute_url(url: str) -> str:
|
|
url = (url or "").strip()
|
|
if not url:
|
|
return ""
|
|
if url.startswith("http://") or url.startswith("https://"):
|
|
return url
|
|
if url.startswith("//"):
|
|
return f"https:{url}"
|
|
if url.startswith("/"):
|
|
return f"{_get_base_url()}{url}"
|
|
return f"{_get_base_url()}/{url.lstrip('/')}"
|
|
|
|
|
|
def _normalize_search_text(value: str) -> str:
|
|
value = (value or "").casefold()
|
|
value = re.sub(r"[^a-z0-9]+", " ", value)
|
|
value = re.sub(r"\s+", " ", value).strip()
|
|
return value
|
|
|
|
|
|
def _matches_query(query: str, *, title: str) -> bool:
    """Return True when the normalised query occurs in the title as whole words.

    Both sides go through ``_normalize_search_text``; an empty normalised
    query never matches. Padding both strings with spaces makes the substring
    test respect word boundaries.
    """
    needle = _normalize_search_text(query)
    if needle:
        padded_title = " " + _normalize_search_text(title) + " "
        return (" " + needle + " ") in padded_title
    return False
|
|
|
|
|
|
def _is_probably_content_url(url: str) -> bool:
|
|
lower = (url or "").casefold()
|
|
if not lower:
|
|
return False
|
|
block_markers = (
|
|
"/genre/",
|
|
"/kategorie/",
|
|
"/category/",
|
|
"/tag/",
|
|
"/login",
|
|
"/register",
|
|
"/kontakt",
|
|
"/impressum",
|
|
"/datenschutz",
|
|
"/dmca",
|
|
"/agb",
|
|
"javascript:",
|
|
"#",
|
|
)
|
|
if any(marker in lower for marker in block_markers):
|
|
return False
|
|
allow_markers = ("/stream/", "/film/", "/movie/", "/serien/", "/serie/", "/title/")
|
|
return any(marker in lower for marker in allow_markers)
|
|
|
|
|
|
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
    """Forward one URL event of the given ``kind`` to the shared ``log_url`` helper."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_URLS,
        "plugin_setting_id": SETTING_LOG_URLS,
        "log_filename": "filmpalast_urls.log",
        "url": url,
        "kind": kind,
    }
    log_url(ADDON_ID, **options)
|
|
|
|
|
|
def _log_visit(url: str) -> None:
    """Log ``url`` as a VISIT event and pass it to the ``notify_url`` helper."""
    _log_url_event(url, kind="VISIT")
    notification = {
        "heading": "Filmpalast",
        "url": url,
        "enabled_setting_id": GLOBAL_SETTING_SHOW_URL_INFO,
        "plugin_setting_id": SETTING_SHOW_URL_INFO,
    }
    notify_url(ADDON_ID, **notification)
|
|
|
|
|
|
def _log_response_html(url: str, body: str) -> None:
    """Hand a response body to the shared ``dump_response_html`` helper."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_DUMP_HTML,
        "plugin_setting_id": SETTING_DUMP_HTML,
        "url": url,
        "body": body,
        "filename_prefix": "filmpalast_response",
    }
    dump_response_html(ADDON_ID, **options)
|
|
|
|
|
|
def _log_error_message(message: str) -> None:
    """Forward an error message to the shared ``log_error`` helper."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_ERRORS,
        "plugin_setting_id": SETTING_LOG_ERRORS,
        "log_filename": "filmpalast_errors.log",
        "message": message,
    }
    log_error(ADDON_ID, **options)
|
|
|
|
|
|
def _is_series_hint_url(value: str) -> bool:
    """Return True when ``value`` is a series pseudo-URL (starts with ``SERIES_HINT_PREFIX``)."""
    text = value if value else ""
    return text.startswith(SERIES_HINT_PREFIX)
|
|
|
|
|
|
def _series_hint_value(title: str) -> str:
    """Build the series pseudo-URL for ``title``.

    The stripped title is percent-encoded with no safe characters and appended
    to ``SERIES_HINT_PREFIX``; an empty title yields the bare prefix.
    """
    encoded = quote((title or "").strip(), safe="")
    if not encoded:
        return SERIES_HINT_PREFIX
    return f"{SERIES_HINT_PREFIX}{encoded}"
|
|
|
|
|
|
def _extract_number(value: str) -> Optional[int]:
|
|
match = re.search(r"(\d+)", value or "")
|
|
if not match:
|
|
return None
|
|
try:
|
|
return int(match.group(1))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _strip_series_alias(title: str) -> str:
|
|
return re.sub(r"\s*\(serie\)\s*$", "", title or "", flags=re.IGNORECASE).strip()
|
|
|
|
|
|
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
    """Fetch ``url`` and return the parsed HTML document.

    Args:
        url: Absolute URL to request.
        session: Session to use; when omitted the pooled "filmpalast" session
            is obtained via ``get_requests_session``.

    Returns:
        A ``BeautifulSoup`` document built with the ``html.parser`` backend.

    Raises:
        RuntimeError: if ``requests`` or ``bs4`` could not be imported.
        Exception: whatever the GET or ``raise_for_status()`` raised; the
            failure is written to the error log before being re-raised.

    The response is closed on every path. Previously a response whose
    ``raise_for_status()`` failed was re-raised without ever being closed.
    """
    if requests is None or BeautifulSoup is None:
        raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
    _log_visit(url)
    sess = session or get_requests_session("filmpalast", headers=HEADERS)
    response = None
    try:
        try:
            response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
            response.raise_for_status()
        except Exception as exc:
            _log_error_message(f"GET {url} failed: {exc}")
            raise
        final_url = response.url or url
        body = response.text or ""
        if final_url != url:
            _log_url_event(final_url, kind="REDIRECT")
        # The dump is keyed by the requested URL, not the redirect target.
        _log_response_html(url, body)
        return BeautifulSoup(body, "html.parser")
    finally:
        if response is not None:
            try:
                response.close()
            except Exception:
                # Closing is best-effort; a failure here must not mask the real outcome.
                pass
|
|
|
|
|
|
class FilmpalastPlugin(BasisPlugin):
|
|
name = "Filmpalast"
|
|
version = "1.0.0"
|
|
|
|
def __init__(self) -> None:
    """Create empty lookup tables/caches and record dependency availability.

    When ``requests``/``bs4`` failed to import, the plugin marks itself as
    unavailable, stores a reason text and prints the import error if one was
    captured.
    """
    # Title index: display title -> detail URL or series pseudo-URL.
    self._title_to_url: Dict[str, str] = {}
    # Cached (plot, poster) per title.
    self._title_meta: Dict[str, tuple[str, str]] = {}
    # series title -> season -> episode -> entry.
    self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
    # detail URL -> {hoster name: play URL}.
    self._hoster_cache: Dict[str, Dict[str, str]] = {}

    # Browse caches for genres, alphabetical index and series catalog.
    self._genre_to_url: Dict[str, str] = {}
    self._genre_page_count_cache: Dict[str, int] = {}
    self._alpha_to_url: Dict[str, str] = {}
    self._alpha_page_count_cache: Dict[str, int] = {}
    self._series_page_count_cache: Dict[int, int] = {}

    self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
    self._preferred_hosters: List[str] = list(self._default_preferred_hosters)

    self._requests_available = REQUESTS_AVAILABLE
    if self._requests_available:
        self.is_available = True
        self.unavailable_reason: Optional[str] = None
    else:  # pragma: no cover - optional dependency
        self.is_available = False
        self.unavailable_reason = (
            "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
        )
        if REQUESTS_IMPORT_ERROR:
            print(f"FilmpalastPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
|
|
|
|
def _lookup_title_url(self, title: str) -> str:
|
|
title = (title or "").strip()
|
|
if not title:
|
|
return ""
|
|
direct = self._title_to_url.get(title)
|
|
if direct:
|
|
return direct
|
|
wanted = title.casefold()
|
|
for key, value in self._title_to_url.items():
|
|
if key.casefold() == wanted and value:
|
|
return value
|
|
return ""
|
|
|
|
def _series_key_for_title(self, title: str) -> str:
|
|
title = (title or "").strip()
|
|
if not title:
|
|
return ""
|
|
if title in self._series_entries:
|
|
return title
|
|
wanted = title.casefold()
|
|
for key in self._series_entries.keys():
|
|
if key.casefold() == wanted:
|
|
return key
|
|
return ""
|
|
|
|
def _has_series_entries(self, title: str) -> bool:
|
|
return bool(self._series_key_for_title(title))
|
|
|
|
def _episode_entry_from_hit(self, hit: SearchHit) -> Optional[Tuple[str, EpisodeEntry]]:
|
|
title = (hit.title or "").strip()
|
|
if not title:
|
|
return None
|
|
marker = SEASON_EPISODE_RE.search(title)
|
|
if not marker:
|
|
return None
|
|
try:
|
|
season_number = int(marker.group(1))
|
|
episode_number = int(marker.group(2))
|
|
except Exception:
|
|
return None
|
|
series_title = re.sub(r"\s+", " ", title[: marker.start()] or "").strip(" -|:;,_")
|
|
if not series_title:
|
|
return None
|
|
suffix = re.sub(r"\s+", " ", title[marker.end() :] or "").strip(" -|:;,_")
|
|
entry = EpisodeEntry(season=season_number, episode=episode_number, suffix=suffix, url=hit.url)
|
|
return (series_title, entry)
|
|
|
|
def _add_series_entry(self, series_title: str, entry: EpisodeEntry) -> None:
|
|
if not series_title or not entry.url:
|
|
return
|
|
seasons = self._series_entries.setdefault(series_title, {})
|
|
episodes = seasons.setdefault(entry.season, {})
|
|
if entry.episode not in episodes:
|
|
episodes[entry.episode] = entry
|
|
|
|
def _ensure_series_entries_for_title(self, title: str) -> str:
|
|
series_key = self._series_key_for_title(title)
|
|
if series_key:
|
|
return series_key
|
|
original_title = (title or "").strip()
|
|
lookup_title = _strip_series_alias(original_title)
|
|
if not lookup_title:
|
|
return ""
|
|
if not self._requests_available:
|
|
return ""
|
|
wanted = _normalize_search_text(lookup_title)
|
|
hits = self._search_hits(lookup_title)
|
|
for hit in hits:
|
|
parsed = self._episode_entry_from_hit(hit)
|
|
if not parsed:
|
|
continue
|
|
series_title, entry = parsed
|
|
if wanted and _normalize_search_text(series_title) != wanted:
|
|
continue
|
|
self._add_series_entry(series_title, entry)
|
|
self._title_to_url.setdefault(series_title, _series_hint_value(series_title))
|
|
resolved = self._series_key_for_title(original_title) or self._series_key_for_title(lookup_title)
|
|
if resolved and original_title and original_title != resolved:
|
|
self._series_entries[original_title] = self._series_entries[resolved]
|
|
self._title_to_url.setdefault(original_title, _series_hint_value(resolved))
|
|
return original_title
|
|
return resolved
|
|
|
|
def _detail_url_for_selection(self, title: str, season: str, episode: str) -> str:
|
|
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
|
|
if series_key:
|
|
season_number = _extract_number(season)
|
|
episode_number = _extract_number(episode)
|
|
if season_number is None or episode_number is None:
|
|
return ""
|
|
entry = self._series_entries.get(series_key, {}).get(season_number, {}).get(episode_number)
|
|
return entry.url if entry else ""
|
|
return self._ensure_title_url(title)
|
|
|
|
def _search_hits(self, query: str) -> List[SearchHit]:
|
|
query = (query or "").strip()
|
|
if not query:
|
|
return []
|
|
if not self._requests_available or requests is None:
|
|
return []
|
|
|
|
session = get_requests_session("filmpalast", headers=HEADERS)
|
|
search_requests = [(_absolute_url(f"/search/title/{quote(query)}"), None)]
|
|
|
|
hits: List[SearchHit] = []
|
|
seen_titles: set[str] = set()
|
|
seen_urls: set[str] = set()
|
|
for base_url, params in search_requests:
|
|
response = None
|
|
try:
|
|
request_url = base_url if not params else f"{base_url}?{urlencode(params)}"
|
|
_log_url_event(request_url, kind="GET")
|
|
_log_visit(request_url)
|
|
response = session.get(base_url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
|
|
response.raise_for_status()
|
|
if response.url and response.url != request_url:
|
|
_log_url_event(response.url, kind="REDIRECT")
|
|
_log_response_html(request_url, response.text)
|
|
soup = BeautifulSoup(response.text, "html.parser")
|
|
except Exception as exc:
|
|
_log_error_message(f"search request failed ({base_url}): {exc}")
|
|
continue
|
|
finally:
|
|
if response is not None:
|
|
try:
|
|
response.close()
|
|
except Exception:
|
|
pass
|
|
|
|
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
|
|
if not anchors:
|
|
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
|
|
for anchor in anchors:
|
|
href = (anchor.get("href") or "").strip()
|
|
if not href:
|
|
continue
|
|
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
|
|
if not _is_probably_content_url(url):
|
|
continue
|
|
|
|
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
|
|
title = (title or "").strip()
|
|
if not title:
|
|
continue
|
|
if title.casefold() in {"details/play", "play", "details"}:
|
|
continue
|
|
if not _matches_query(query, title=title):
|
|
continue
|
|
title_key = title.casefold()
|
|
url_key = url.casefold()
|
|
if title_key in seen_titles or url_key in seen_urls:
|
|
continue
|
|
seen_titles.add(title_key)
|
|
seen_urls.add(url_key)
|
|
_log_url_event(url, kind="PARSE")
|
|
hits.append(SearchHit(title=title, url=url))
|
|
|
|
if hits:
|
|
break
|
|
|
|
return hits
|
|
|
|
def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
|
|
hits: List[SearchHit] = []
|
|
if not soup:
|
|
return hits
|
|
seen_titles: set[str] = set()
|
|
seen_urls: set[str] = set()
|
|
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
|
|
if not anchors:
|
|
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
|
|
for anchor in anchors:
|
|
href = (anchor.get("href") or "").strip()
|
|
if not href:
|
|
continue
|
|
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
|
|
if not _is_probably_content_url(url):
|
|
continue
|
|
|
|
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
|
|
if not title:
|
|
continue
|
|
if title.casefold() in {"details/play", "play", "details"}:
|
|
continue
|
|
if query and not _matches_query(query, title=title):
|
|
continue
|
|
title_key = title.casefold()
|
|
url_key = url.casefold()
|
|
if title_key in seen_titles or url_key in seen_urls:
|
|
continue
|
|
seen_titles.add(title_key)
|
|
seen_urls.add(url_key)
|
|
_log_url_event(url, kind="PARSE")
|
|
hits.append(SearchHit(title=title, url=url))
|
|
return hits
|
|
|
|
def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
|
|
self._title_to_url = {}
|
|
self._series_entries = {}
|
|
self._hoster_cache.clear()
|
|
movie_titles: List[str] = []
|
|
series_titles_seen: set[str] = set()
|
|
for hit in hits:
|
|
parsed = self._episode_entry_from_hit(hit)
|
|
if parsed:
|
|
series_title, entry = parsed
|
|
self._add_series_entry(series_title, entry)
|
|
if series_title.casefold() not in series_titles_seen:
|
|
self._title_to_url[series_title] = _series_hint_value(series_title)
|
|
series_titles_seen.add(series_title.casefold())
|
|
continue
|
|
title = (hit.title or "").strip()
|
|
if not title:
|
|
continue
|
|
movie_titles.append(title)
|
|
self._title_to_url[title] = hit.url
|
|
titles: List[str] = list(movie_titles)
|
|
movie_keys = {entry.casefold() for entry in movie_titles}
|
|
for series_title in sorted(self._series_entries.keys(), key=lambda value: value.casefold()):
|
|
if series_title.casefold() in movie_keys:
|
|
alias = f"{series_title} (Serie)"
|
|
self._title_to_url[alias] = self._title_to_url.get(series_title, _series_hint_value(series_title))
|
|
self._series_entries[alias] = self._series_entries[series_title]
|
|
titles.append(alias)
|
|
else:
|
|
titles.append(series_title)
|
|
titles.sort(key=lambda value: value.casefold())
|
|
return titles
|
|
|
|
async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]:
    """Search the site for ``query`` and return the resulting display titles.

    Progress is reported through ``progress_callback`` before the search,
    after the hits are fetched and after the title index has been rebuilt.
    """
    def report(message: str, percent: int) -> None:
        _emit_progress(progress_callback, message, percent)

    report("Filmpalast Suche", 15)
    found = self._search_hits(query)
    report(f"Treffer verarbeiten ({len(found)})", 70)
    display_titles = self._apply_hits_to_title_index(found)
    report(f"Fertig: {len(display_titles)} Treffer", 95)
    return display_titles
|
|
|
|
def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
|
|
genres: Dict[str, str] = {}
|
|
if not soup:
|
|
return genres
|
|
for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
|
|
name = (anchor.get_text(" ", strip=True) or "").strip()
|
|
href = (anchor.get("href") or "").strip()
|
|
if not name or not href:
|
|
continue
|
|
if "/search/genre/" not in href:
|
|
continue
|
|
genres[name] = _absolute_url(href)
|
|
return genres
|
|
|
|
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
|
|
max_page = 1
|
|
if not soup:
|
|
return max_page
|
|
for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
|
|
text = (anchor.get_text(" ", strip=True) or "").strip()
|
|
for candidate in (text, (anchor.get("href") or "").strip()):
|
|
for value in re.findall(r"(\d+)", candidate):
|
|
try:
|
|
max_page = max(max_page, int(value))
|
|
except Exception:
|
|
continue
|
|
return max_page
|
|
|
|
def capabilities(self) -> set[str]:
    """Return the names of the optional browse features this plugin provides."""
    supported = ("genres", "alpha", "series_catalog")
    return set(supported)
|
|
|
|
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
|
|
alpha: Dict[str, str] = {}
|
|
if not soup:
|
|
return alpha
|
|
for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
|
|
name = (anchor.get_text(" ", strip=True) or "").strip()
|
|
href = (anchor.get("href") or "").strip()
|
|
if not name or not href:
|
|
continue
|
|
if "/search/alpha/" not in href:
|
|
continue
|
|
if name in alpha:
|
|
continue
|
|
alpha[name] = _absolute_url(href)
|
|
return alpha
|
|
|
|
def alpha_index(self) -> List[str]:
    """Return the labels of the alphabetical index, loading them once from the start page.

    Returns an empty list when the HTTP dependencies are missing or the start
    page cannot be fetched.
    """
    if not self._requests_available:
        return []
    if not self._alpha_to_url:
        try:
            soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        links = self._parse_alpha_links(soup)
        if links:
            self._alpha_to_url = dict(links)
    return list(self._alpha_to_url.keys())
|
|
|
|
def alpha_page_count(self, letter: str) -> int:
    """Return the number of listing pages for ``letter`` (at least 1).

    The count is read from the paging links of the letter's first page and
    cached per letter. Unknown letters and fetch failures yield 1 without
    being cached.
    """
    letter = (letter or "").strip()
    if not letter:
        return 1
    cache = self._alpha_page_count_cache
    if letter in cache:
        return max(1, int(cache.get(letter, 1)))
    if not self._alpha_to_url:
        self.alpha_index()
    first_page_url = self._alpha_to_url.get(letter, "")
    if not first_page_url:
        return 1
    try:
        soup = _get_soup(first_page_url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return 1
    cache[letter] = max(1, self._extract_last_page(soup))
    return cache[letter]
|
|
|
|
def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
    """Return the titles on page ``page`` of the listing for ``letter``.

    The page number is clamped to at least 1. Loading a page rebuilds the
    title index from that page's hits. Returns an empty list for an unknown
    letter, missing HTTP dependencies or a failed request.
    """
    letter = (letter or "").strip()
    if not (letter and self._requests_available):
        return []
    if not self._alpha_to_url:
        self.alpha_index()
    first_page_url = self._alpha_to_url.get(letter, "")
    if not first_page_url:
        return []
    page = max(1, int(page or 1))
    if page == 1:
        url = first_page_url
    else:
        url = urljoin(first_page_url.rstrip("/") + "/", f"page/{page}")
    try:
        soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return []
    return self._apply_hits_to_title_index(self._parse_listing_hits(soup))
|
|
|
|
def titles_for_alpha(self, letter: str) -> List[str]:
    """Return the case-insensitively sorted titles of the first page for ``letter``."""
    first_page = self.titles_for_alpha_page(letter, 1)
    first_page.sort(key=str.casefold)
    return first_page
|
|
|
|
def _series_view_url(self) -> str:
    """Return the absolute URL of the series catalog listing."""
    catalog_url = _absolute_url(SERIES_VIEW_PATH)
    return catalog_url
|
|
|
|
def series_catalog_page_count(self, page: int = 1) -> int:
    """Return the number of pages in the series catalog (at least 1).

    Args:
        page: Accepted for interface compatibility. The count is always read
            from the paging links of the catalog's first page, so it does
            not depend on this value.

    Returns:
        The cached or freshly parsed page count; 1 when the HTTP dependencies
        are missing or the request fails (failures are not cached).

    The cache used to be keyed by ``page`` although the fetched URL never
    depended on it, so the same base page was downloaded again for every
    distinct page number. The result is now cached under a single key.
    """
    if not self._requests_available:
        return 1
    # Single cache slot: the fetched page is the same for every ``page`` value.
    cache_key = 1
    if cache_key in self._series_page_count_cache:
        return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
    base_url = self._series_view_url()
    if not base_url:
        return 1
    try:
        soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return 1
    pages = self._extract_last_page(soup)
    self._series_page_count_cache[cache_key] = max(1, pages)
    return self._series_page_count_cache[cache_key]
|
|
|
|
def series_catalog_page(self, page: int) -> List[str]:
    """Return the titles on page ``page`` of the series catalog.

    The page number is clamped to at least 1. Loading a page rebuilds the
    title index from that page's hits. Returns an empty list when the HTTP
    dependencies are missing or the request fails.
    """
    if not self._requests_available:
        return []
    catalog_url = self._series_view_url()
    if not catalog_url:
        return []
    page = max(1, int(page or 1))
    if page == 1:
        url = catalog_url
    else:
        url = urljoin(catalog_url.rstrip("/") + "/", f"page/{page}")
    try:
        soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return []
    return self._apply_hits_to_title_index(self._parse_listing_hits(soup))
|
|
|
|
def series_catalog_has_more(self, page: int) -> bool:
    """Return True when the series catalog has pages after ``page``.

    ``page`` is normalised the same way as in the other paging methods
    (``None``/0 become 1, numeric strings are converted). Previously the raw
    value was compared against the page count, which raised ``TypeError``
    for ``None`` or string input.
    """
    page = max(1, int(page or 1))
    total = self.series_catalog_page_count(page)
    return page < total
|
|
|
|
def genres(self) -> List[str]:
    """Return the genre names sorted case-insensitively, loading them once from the start page.

    Returns an empty list when the HTTP dependencies are missing or the start
    page cannot be fetched.
    """
    if not self._requests_available:
        return []
    if not self._genre_to_url:
        try:
            soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
        except Exception:
            return []
        links = self._parse_genres(soup)
        if links:
            self._genre_to_url = dict(links)
    return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
|
|
|
|
def genre_page_count(self, genre: str) -> int:
    """Return the number of listing pages for ``genre`` (at least 1).

    The count is read from the paging links of the genre's first page and
    cached per genre. Unknown genres and fetch failures yield 1 without
    being cached.
    """
    genre = (genre or "").strip()
    if not genre:
        return 1
    cache = self._genre_page_count_cache
    if genre in cache:
        return max(1, int(cache.get(genre, 1)))
    if not self._genre_to_url:
        self.genres()
    first_page_url = self._genre_to_url.get(genre, "")
    if not first_page_url:
        return 1
    try:
        soup = _get_soup(first_page_url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return 1
    cache[genre] = max(1, self._extract_last_page(soup))
    return cache[genre]
|
|
|
|
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
    """Return the titles on page ``page`` of the listing for ``genre``.

    The page number is clamped to at least 1. Loading a page rebuilds the
    title index from that page's hits. Returns an empty list for an unknown
    genre, missing HTTP dependencies or a failed request.
    """
    genre = (genre or "").strip()
    if not (genre and self._requests_available):
        return []
    if not self._genre_to_url:
        self.genres()
    first_page_url = self._genre_to_url.get(genre, "")
    if not first_page_url:
        return []
    page = max(1, int(page or 1))
    if page == 1:
        url = first_page_url
    else:
        url = urljoin(first_page_url.rstrip("/") + "/", f"page/{page}")
    try:
        soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        return []
    return self._apply_hits_to_title_index(self._parse_listing_hits(soup))
|
|
|
|
def titles_for_genre(self, genre: str) -> List[str]:
    """Return the case-insensitively sorted titles of the first page for ``genre``."""
    first_page = self.titles_for_genre_page(genre, 1)
    first_page.sort(key=str.casefold)
    return first_page
|
|
|
|
def _ensure_title_url(self, title: str) -> str:
|
|
title = (title or "").strip()
|
|
if not title:
|
|
return ""
|
|
direct = self._lookup_title_url(title)
|
|
if direct and _is_series_hint_url(direct):
|
|
return ""
|
|
if direct:
|
|
self._title_to_url[title] = direct
|
|
return direct
|
|
if self._has_series_entries(title) or self._ensure_series_entries_for_title(title):
|
|
self._title_to_url[title] = _series_hint_value(title)
|
|
return ""
|
|
wanted = title.casefold()
|
|
hits = self._search_hits(title)
|
|
for hit in hits:
|
|
if self._episode_entry_from_hit(hit):
|
|
continue
|
|
if hit.title.casefold() == wanted and hit.url:
|
|
self._title_to_url[title] = hit.url
|
|
return hit.url
|
|
return ""
|
|
|
|
def _store_title_meta(self, title: str, *, plot: str = "", poster: str = "") -> None:
|
|
title = (title or "").strip()
|
|
if not title:
|
|
return
|
|
old_plot, old_poster = self._title_meta.get(title, ("", ""))
|
|
merged_plot = (plot or old_plot or "").strip()
|
|
merged_poster = (poster or old_poster or "").strip()
|
|
self._title_meta[title] = (merged_plot, merged_poster)
|
|
|
|
def _extract_detail_metadata(self, soup: BeautifulSoupT) -> tuple[str, str]:
|
|
if not soup:
|
|
return "", ""
|
|
root = soup.select_one("div#content[role='main']") or soup
|
|
detail = root.select_one("article.detail") or root
|
|
plot = ""
|
|
poster = ""
|
|
|
|
# Filmpalast Detailseite: bevorzugt den dedizierten Filmhandlung-Block.
|
|
plot_node = detail.select_one(
|
|
"li[itemtype='http://schema.org/Movie'] span[itemprop='description']"
|
|
)
|
|
if plot_node is not None:
|
|
plot = (plot_node.get_text(" ", strip=True) or "").strip()
|
|
if not plot:
|
|
hidden_plot = detail.select_one("cite span.hidden")
|
|
if hidden_plot is not None:
|
|
plot = (hidden_plot.get_text(" ", strip=True) or "").strip()
|
|
if not plot:
|
|
for selector in ("meta[property='og:description']", "meta[name='description']"):
|
|
node = root.select_one(selector)
|
|
if node is None:
|
|
continue
|
|
content = (node.get("content") or "").strip()
|
|
if content:
|
|
plot = content
|
|
break
|
|
|
|
# Filmpalast Detailseite: Cover liegt stabil in `img.cover2`.
|
|
cover = detail.select_one("img.cover2")
|
|
if cover is not None:
|
|
value = (cover.get("data-src") or cover.get("src") or "").strip()
|
|
if value:
|
|
candidate = _absolute_url(value)
|
|
lower = candidate.casefold()
|
|
if "/themes/" not in lower and "spacer.gif" not in lower and "/files/movies/" in lower:
|
|
poster = candidate
|
|
if not poster:
|
|
thumb_node = detail.select_one("li[itemtype='http://schema.org/Movie'] img[itemprop='image']")
|
|
if thumb_node is not None:
|
|
value = (thumb_node.get("data-src") or thumb_node.get("src") or "").strip()
|
|
if value:
|
|
candidate = _absolute_url(value)
|
|
lower = candidate.casefold()
|
|
if "/themes/" not in lower and "spacer.gif" not in lower and "/files/movies/" in lower:
|
|
poster = candidate
|
|
|
|
return plot, poster
|
|
|
|
def remember_series_url(self, title: str, series_url: str) -> None:
    """Store ``series_url`` for ``title`` in the title index and clear the hoster cache.

    Nothing happens when either stripped value is empty.
    """
    key = (title or "").strip()
    value = (series_url or "").strip()
    if key and value:
        self._title_to_url[key] = value
        self._hoster_cache.clear()
|
|
|
|
def series_url_for_title(self, title: str) -> str:
    """Return the indexed URL for ``title``, or a series pseudo-URL, or ``""``.

    The title index is consulted first; if it has nothing but the series
    table knows the title, the pseudo-URL for that series key is returned.
    """
    wanted = (title or "").strip()
    if not wanted:
        return ""
    indexed = self._lookup_title_url(wanted)
    if indexed:
        return indexed
    series_key = self._series_key_for_title(wanted)
    if not series_key:
        return ""
    return _series_hint_value(series_key)
|
|
|
|
def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
    """Return ``(info, art, None)`` for ``title``.

    ``info`` always contains the title and, when known, the plot; ``art``
    maps both ``thumb`` and ``poster`` to the poster URL when one is known.
    Cached metadata is used when both plot and poster are cached. Otherwise
    the detail page is fetched (for a series: the page of its first known
    episode) and the result is merged into the cache. Fetch or parse
    failures leave the missing values out instead of raising.
    """
    title = (title or "").strip()
    if not title:
        return {}, {}, None

    info: dict[str, str] = {"title": title}
    art: dict[str, str] = {}
    cached_plot, cached_poster = self._title_meta.get(title, ("", ""))
    if cached_plot:
        info["plot"] = cached_plot
    if cached_poster:
        art = {"thumb": cached_poster, "poster": cached_poster}
    if "plot" in info and art:
        return info, art, None

    detail_url = self._ensure_title_url(title)
    if not detail_url:
        series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
        if series_key:
            seasons = self._series_entries.get(series_key, {})
            # Lowest season, then lowest episode, whose entry is present.
            first_entry: Optional[EpisodeEntry] = next(
                (
                    seasons[season_number][episode_number]
                    for season_number in sorted(seasons.keys())
                    for episode_number in sorted(seasons[season_number].keys())
                    if seasons[season_number][episode_number] is not None
                ),
                None,
            )
            detail_url = first_entry.url if first_entry is not None else ""
    if not detail_url:
        return info, art, None

    try:
        soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
        plot, poster = self._extract_detail_metadata(soup)
    except Exception:
        plot, poster = "", ""

    if plot:
        info["plot"] = plot
    if poster:
        art = {"thumb": poster, "poster": poster}
    self._store_title_meta(title, plot=info.get("plot", ""), poster=poster)
    return info, art, None
|
|
|
|
def is_movie(self, title: str) -> bool:
    """Return True when ``title`` is treated as a movie rather than a series.

    An indexed title is a movie unless it maps to a series pseudo-URL.
    Otherwise a title is a series when it contains a season/episode marker
    or when episode entries exist or can be found for it. An empty title is
    never a movie.
    """
    title = (title or "").strip()
    if not title:
        return False
    indexed = self._lookup_title_url(title)
    if indexed:
        return not _is_series_hint_url(indexed)
    looks_like_series = (
        bool(SEASON_EPISODE_RE.search(title))
        or self._has_series_entries(title)
        or bool(self._ensure_series_entries_for_title(title))
    )
    return not looks_like_series
|
|
|
|
@staticmethod
|
|
def _normalize_hoster_name(name: str) -> str:
|
|
name = (name or "").strip()
|
|
if not name:
|
|
return ""
|
|
name = re.sub(r"\s+", " ", name)
|
|
return name
|
|
|
|
def _extract_hoster_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
|
|
hosters: Dict[str, str] = {}
|
|
if not soup:
|
|
return hosters
|
|
|
|
# Primäres Layout: jeder Hoster in eigener UL mit hostName + Play-Link.
|
|
for block in soup.select("ul.currentStreamLinks"):
|
|
host_name_node = block.select_one("li.hostBg .hostName")
|
|
host_name = self._normalize_hoster_name(host_name_node.get_text(" ", strip=True) if host_name_node else "")
|
|
play_anchor = block.select_one("li.streamPlayBtn a[href], a.button.iconPlay[href]")
|
|
href = (play_anchor.get("href") if play_anchor else "") or ""
|
|
play_url = _absolute_url(href).strip()
|
|
if not play_url:
|
|
continue
|
|
if not host_name:
|
|
host_name = self._normalize_hoster_name(play_anchor.get_text(" ", strip=True) if play_anchor else "")
|
|
if not host_name:
|
|
host_name = "Unbekannt"
|
|
if host_name not in hosters:
|
|
hosters[host_name] = play_url
|
|
|
|
# Fallback: direkte Play-Buttons im Stream-Bereich.
|
|
if not hosters:
|
|
for anchor in soup.select("#grap-stream-list a.button.iconPlay[href], .streamLinksWrapper a.button.iconPlay[href]"):
|
|
href = (anchor.get("href") or "").strip()
|
|
play_url = _absolute_url(href).strip()
|
|
if not play_url:
|
|
continue
|
|
text_name = self._normalize_hoster_name(anchor.get_text(" ", strip=True))
|
|
host_name = text_name if text_name and text_name.casefold() not in {"play", "details play"} else "Unbekannt"
|
|
if host_name in hosters:
|
|
host_name = f"{host_name} #{len(hosters) + 1}"
|
|
hosters[host_name] = play_url
|
|
|
|
return hosters
|
|
|
|
def _hosters_for_detail_url(self, detail_url: str) -> Dict[str, str]:
|
|
detail_url = (detail_url or "").strip()
|
|
if not detail_url:
|
|
return {}
|
|
cached = self._hoster_cache.get(detail_url)
|
|
if cached is not None:
|
|
return dict(cached)
|
|
if not self._requests_available:
|
|
return {}
|
|
try:
|
|
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
|
|
except Exception:
|
|
return {}
|
|
hosters = self._extract_hoster_links(soup)
|
|
for url in hosters.values():
|
|
_log_url_event(url, kind="PARSE")
|
|
self._hoster_cache[detail_url] = dict(hosters)
|
|
return dict(hosters)
|
|
|
|
def seasons_for(self, title: str) -> List[str]:
    """List the season labels available for *title*.

    A title known as a series yields one ``"Staffel <n>"`` label per stored
    season, in ascending order. A title that only resolves to a detail URL
    yields ``["Film"]``. A blank or unresolvable title yields ``[]``.
    """
    name = (title or "").strip()
    if not name:
        return []

    key = self._series_key_for_title(name) or self._ensure_series_entries_for_title(name)
    if not key:
        # Not a series: treat it as a single movie if a detail URL exists.
        return ["Film"] if self._ensure_title_url(name) else []

    season_numbers = sorted(self._series_entries.get(key, {}))
    return [f"Staffel {number}" for number in season_numbers]
|
|
|
|
def episodes_for(self, title: str, season: str) -> List[str]:
    """Return the episode labels of *season* for *title*.

    For a series, each stored episode of the requested season becomes
    ``"Episode <n>"`` (ascending), extended with ``" - <suffix>"`` when the
    entry carries a suffix. If ``_extract_number`` yields no season number
    the result is empty. For a non-series title that resolves to a detail
    URL the single pseudo-episode ``["Stream"]`` is returned.

    A blank title returns ``[]`` immediately, without any lookup.
    """
    title = (title or "").strip()
    if not title:
        # Same guard as seasons_for: never start a lookup for a blank title.
        return []

    series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
    if not series_key:
        return ["Stream"] if self._ensure_title_url(title) else []

    season_number = _extract_number(season)
    if season_number is None:
        return []

    episodes = self._series_entries.get(series_key, {}).get(season_number, {})
    labels: List[str] = []
    for episode_number in sorted(episodes):
        suffix = episodes[episode_number].suffix
        label = f"Episode {episode_number}"
        labels.append(f"{label} - {suffix}" if suffix else label)
    return labels
|
|
|
|
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
    """Resolve the selection to its detail URL and return that page's hoster names."""
    return self.available_hosters_for_url(
        self._detail_url_for_selection(title, season, episode)
    )
|
|
|
|
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
    """Resolve the selection to its detail URL and return the stream link chosen for it."""
    return self.stream_link_for_url(
        self._detail_url_for_selection(title, season, episode)
    )
|
|
|
|
def episode_url_for(self, title: str, season: str, episode: str) -> str:
    """Return the stripped detail URL for the selection, or ``""`` if none was found."""
    resolved = self._detail_url_for_selection(title, season, episode)
    if not resolved:
        return ""
    return resolved.strip()
|
|
|
|
def available_hosters_for_url(self, episode_url: str) -> List[str]:
    """Return the hoster names found for *episode_url*, in mapping order.

    The URL is stripped first; a missing URL is passed on as ``""``.
    """
    cleaned = episode_url.strip() if episode_url else ""
    return list(self._hosters_for_detail_url(cleaned))
|
|
|
|
def stream_link_for_url(self, episode_url: str) -> Optional[str]:
    """Pick the link to hand to the resolver for a detail page.

    Selection order:

    1. The first known hoster whose name or URL contains one of
       ``self._preferred_hosters`` (preferences are tried in order,
       compared case-insensitively).
    2. Otherwise the first known hoster.
    3. If no hosters are known and the page can be fetched: the first
       non-empty ``iframe`` source, otherwise the first link whose href
       contains "watch", "stream" or "player".
    4. Otherwise the detail URL itself.

    Returns ``None`` only for a blank *episode_url*.
    """
    detail_url = (episode_url or "").strip()
    if not detail_url:
        return None

    hosters = self._hosters_for_detail_url(detail_url)
    if hosters:
        chosen = None
        for preferred in self._preferred_hosters:
            needle = (preferred or "").strip().casefold()
            if not needle:
                continue
            chosen = next(
                (
                    host_url
                    for host_name, host_url in hosters.items()
                    if needle in host_name.casefold() or needle in host_url.casefold()
                ),
                None,
            )
            if chosen is not None:
                break
        if chosen is None:
            # No preference matched: fall back to the first listed hoster.
            chosen = next(iter(hosters.values()))
        _log_url_event(chosen, kind="FOUND")
        return chosen

    if not self._requests_available:
        return detail_url

    try:
        soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
    except Exception:
        # Best effort: without the page, the detail URL is all we have.
        return detail_url

    def _raw_candidates():
        # Embedded players take precedence over generic links.
        for iframe in soup.select("iframe[src]"):
            yield iframe.get("src")
        for anchor in soup.select("a[href]"):
            href = anchor.get("href") or ""
            lowered = href.casefold()
            if "watch" in lowered or "stream" in lowered or "player" in lowered:
                yield href

    for raw in _raw_candidates():
        raw = (raw or "").strip()
        if not raw:
            continue
        # Skip candidates that normalise to nothing instead of returning
        # an empty link to the caller.
        candidate = _absolute_url(raw).strip()
        if candidate:
            _log_url_event(candidate, kind="FOUND")
            return candidate

    return detail_url
|
|
|
|
def set_preferred_hosters(self, hosters: List[str]) -> None:
    """Replace the preferred hoster order.

    Entries are converted to ``str``, stripped and lower-cased; ``None``
    and blank entries are dropped. If nothing usable remains (including
    *hosters* itself being ``None`` or empty) the current preference is
    left untouched.
    """
    cleaned = (
        str(hoster).strip().lower()
        for hoster in (hosters or ())
        if hoster is not None
    )
    normalized = [name for name in cleaned if name]
    if normalized:
        self._preferred_hosters = normalized
|
|
|
|
def reset_preferred_hosters(self) -> None:
    """Restore the preferred hosters to a fresh list copy of the defaults."""
    defaults = self._default_preferred_hosters
    self._preferred_hosters = [*defaults]
|
|
|
|
def resolve_stream_link(self, link: str) -> Optional[str]:
    """Resolve a hoster link to the URL that should be played.

    Steps:

    1. Hand the original hoster link to ResolveURL, if the
       ``resolveurl_backend`` module can be imported.
    2. Otherwise follow HTTP redirects (only when
       ``self._requests_available`` is true) and, if the URL changed, give
       the redirect target to ResolveURL as well.
    3. Fall back to the (possibly redirected) URL itself.

    Returns ``None`` only when *link* is empty.
    """
    if not link:
        return None

    try:
        from resolveurl_backend import resolve as resolve_with_resolveurl
    except Exception:
        resolve_with_resolveurl = None

    def _try_resolveurl(url):
        # Ask ResolveURL for a media URL; log and return it on success.
        if not callable(resolve_with_resolveurl):
            return None
        resolved = resolve_with_resolveurl(url)
        if not resolved:
            return None
        _log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
        _log_url_event(resolved, kind="MEDIA")
        return resolved

    # 1) Always try the original hoster link with ResolveURL first.
    resolved = _try_resolveurl(link)
    if resolved:
        return resolved

    redirected = link
    if self._requests_available:
        response = None
        try:
            session = get_requests_session("filmpalast", headers=HEADERS)
            # stream=True: only the final URL is needed, so do not download
            # the response body (it may be a large media file).
            response = session.get(
                link,
                headers=HEADERS,
                timeout=DEFAULT_TIMEOUT,
                allow_redirects=True,
                stream=True,
            )
            response.raise_for_status()
            redirected = (response.url or link).strip() or link
        except Exception:
            # Best effort: keep the original link when the probe fails.
            redirected = link
        finally:
            if response is not None:
                try:
                    response.close()
                except Exception:
                    pass

    # 2) Then optionally resolve the redirect target as well.
    if redirected != link:
        resolved = _try_resolveurl(redirected)
        if resolved:
            return resolved

    # 3) Fallback: return the direct URL. It is never empty here because
    #    it is either the non-empty input link or a non-empty redirect URL.
    _log_url_event(redirected, kind="FINAL")
    return redirected
|
|
|
|
|
|
# Alias used for automatic plugin discovery.
|
|
Plugin = FilmpalastPlugin
|