"""HTML-basierte Integration fuer eine Streaming-/Mediathek-Seite (Template).
Dieses Plugin ist als Startpunkt gedacht, um eine eigene/autorisiert betriebene
Seite mit einer HTML-Suche in ViewIt einzubinden.
Hinweise:
- Nutzt optional `requests` + `beautifulsoup4` (bs4).
- `search_titles` liefert eine Trefferliste (Titel-Strings).
- `seasons_for` / `episodes_for` können für Filme als Single-Season/Single-Episode
modelliert werden (z.B. Staffel 1, Episode 1) oder komplett leer bleiben,
solange nur Serien unterstützt werden.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import hashlib
import os
import re
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
from urllib.parse import urlencode, urljoin
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
xbmcvfs = None
xbmcgui = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, log_url, notify_url
from regex_patterns import DIGITS
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
# Addon identity and per-plugin setting keys read from Kodi's configuration.
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "topstream_base_url"
DEFAULT_BASE_URL = "https://www.meineseite"
# Shared debug switches (URL logging, HTML dumps, on-screen URL notifications).
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
SETTING_GENRE_MAX_PAGES = "topstream_genre_max_pages"
# HTTP request timeout in seconds for all page fetches.
DEFAULT_TIMEOUT = 20
# Preference order used when picking a stream link from available hosters.
DEFAULT_PREFERRED_HOSTERS = ["supervideo", "dropload", "voe"]
# Host serving movie mirrors through an embedded iframe on detail pages.
MEINECLOUD_HOST = "meinecloud.click"
# Pagination limits for genre listings.
DEFAULT_GENRE_MAX_PAGES = 20
HARD_MAX_GENRE_PAGES = 200
# Browser-like request headers to avoid trivial bot blocking.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
    """Immutable internal search result (title plus detail-page URL)."""
    # Display title; a trailing "Der Film" suffix is stripped by callers.
    title: str
    # Absolute URL of the detail page.
    url: str
    # Optional teaser text from the listing; may stay empty.
    description: str = ""
def _normalize_search_text(value: str) -> str:
"""Normalisiert Text für robuste, wortbasierte Suche/Filter.
Wir ersetzen Nicht-Alphanumerisches durch Leerzeichen und kollabieren Whitespace.
Dadurch kann z.B. "Star Trek: Lower Decks – Der Film" sauber auf Tokens gematcht werden.
"""
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str, description: str) -> bool:
    """Return True when the normalized query is a substring of the normalized title.

    NOTE(review): `description` is accepted for interface symmetry but is
    currently not consulted for matching — confirm whether that is intended.
    """
    needle = _normalize_search_text(query)
    if not needle:
        return False
    haystack = _normalize_search_text(title)
    return bool(haystack) and needle in haystack
def _strip_der_film_suffix(title: str) -> str:
"""Entfernt den Suffix 'Der Film' am Ende, z.B. 'Star Trek – Der Film'."""
title = (title or "").strip()
if not title:
return ""
title = re.sub(r"\s*[-–]\s*der\s+film\s*$", "", title, flags=re.IGNORECASE).strip()
return title
class TopstreamfilmPlugin(BasisPlugin):
"""Integration fuer eine HTML-basierte Suchseite."""
name = "TopStreamFilm"
def __init__(self) -> None:
    """Initialize in-memory caches and load persisted title/genre caches."""
    # Lazily created requests.Session (see _get_session()).
    self._session: RequestsSession | None = None
    # title -> absolute detail-page URL (persisted via _save_title_url_cache()).
    self._title_to_url: Dict[str, str] = {}
    # genre name -> absolute listing URL (persisted via _save_genre_cache()).
    self._genre_to_url: Dict[str, str] = {}
    # title -> MeineCloud iframe URL; presence marks the title as a movie.
    self._movie_iframe_url: Dict[str, str] = {}
    # Titles whose listing entry carried a "Der Film" hint.
    self._movie_title_hint: set[str] = set()
    # genre -> highest pagination page number seen.
    self._genre_last_page: Dict[str, int] = {}
    # title -> season labels; (title, season) -> episode labels.
    self._season_cache: Dict[str, List[str]] = {}
    self._episode_cache: Dict[tuple[str, str], List[str]] = {}
    # NOTE(review): not populated anywhere in this file — possibly legacy.
    self._episode_to_url: Dict[tuple[str, str, str], str] = {}
    # (title, season label, episode label) -> {hoster name: URL}.
    self._episode_to_hosters: Dict[tuple[str, str, str], Dict[str, str]] = {}
    # (title, season label) -> sorted episode numbers.
    self._season_to_episode_numbers: Dict[tuple[str, str], List[int]] = {}
    # (title, season number, episode number) -> episode title text.
    self._episode_title_by_number: Dict[tuple[str, int, int], str] = {}
    # title -> serialized detail-page HTML (avoids re-fetching).
    self._detail_html_cache: Dict[str, str] = {}
    # Memoized "popular movies" listing; None until first fetch.
    self._popular_cache: List[str] | None = None
    self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
    self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
    # Availability depends on the optional requests/bs4 imports.
    self.is_available = REQUESTS_AVAILABLE
    self.unavailable_reason = None if REQUESTS_AVAILABLE else f"requests/bs4 fehlen: {REQUESTS_IMPORT_ERROR}"
    self._load_title_url_cache()
    self._load_genre_cache()
def _cache_dir(self) -> str:
    """Return a writable cache directory (Kodi profile dir when available)."""
    if xbmcaddon and xbmcvfs:
        try:
            addon_profile = xbmcvfs.translatePath(
                xbmcaddon.Addon(ADDON_ID).getAddonInfo("profile")
            )
            if not xbmcvfs.exists(addon_profile):
                xbmcvfs.mkdirs(addon_profile)
            return addon_profile
        except Exception:
            pass
    # Outside Kodi (or on any failure) fall back to the plugin directory.
    return os.path.dirname(__file__)
def _title_url_cache_path(self) -> str:
    """Path of the persisted title->URL cache file."""
    filename = "topstream_title_url_cache.json"
    return os.path.join(self._cache_dir(), filename)
def _load_title_url_cache(self) -> None:
    """Load the persisted title->URL mapping for the current base URL.

    Supports the new `{base_url: {title: url}}` layout and the legacy flat
    `{title: url}` layout. All failures are silently ignored (best effort).
    """
    path = self._title_url_cache_path()
    try:
        # Prefer Kodi's VFS when available, otherwise plain file I/O.
        if xbmcvfs and xbmcvfs.exists(path):
            handle = xbmcvfs.File(path)
            raw = handle.read()
            handle.close()
        elif os.path.exists(path):
            with open(path, "r", encoding="utf-8") as handle:
                raw = handle.read()
        else:
            return
        loaded = json.loads(raw or "{}")
        if isinstance(loaded, dict):
            # New format: {base_url: {title: url}}
            base_url = self._get_base_url()
            if base_url in loaded and isinstance(loaded.get(base_url), dict):
                loaded = loaded.get(base_url) or {}
            # Backwards compatible: {title: url}
            for title, url in (loaded or {}).items():
                if isinstance(title, str) and isinstance(url, str) and title.strip() and url.strip():
                    # setdefault: entries already in memory take precedence.
                    self._title_to_url.setdefault(title.strip(), url.strip())
    except Exception:
        return
def _save_title_url_cache(self) -> None:
    """Persist the in-memory title->URL mapping, keyed by base URL.

    Reads the existing file first so entries for other base URLs are kept
    (read-merge-write). All I/O errors are swallowed — caching is best effort.
    """
    path = self._title_url_cache_path()
    try:
        base_url = self._get_base_url()
        store: Dict[str, Dict[str, str]] = {}
        # merge with existing
        try:
            if xbmcvfs and xbmcvfs.exists(path):
                handle = xbmcvfs.File(path)
                existing_raw = handle.read()
                handle.close()
            elif os.path.exists(path):
                with open(path, "r", encoding="utf-8") as handle:
                    existing_raw = handle.read()
            else:
                existing_raw = ""
            existing = json.loads(existing_raw or "{}")
            if isinstance(existing, dict):
                # Only adopt the file when it already uses the nested layout.
                if all(isinstance(k, str) and isinstance(v, dict) for k, v in existing.items()):
                    store = {k: dict(v) for k, v in existing.items()}  # type: ignore[arg-type]
        except Exception:
            store = {}
        store[base_url] = dict(self._title_to_url)
        payload = json.dumps(store, ensure_ascii=False, sort_keys=True)
    except Exception:
        return
    try:
        if xbmcaddon and xbmcvfs:
            directory = os.path.dirname(path)
            if directory and not xbmcvfs.exists(directory):
                xbmcvfs.mkdirs(directory)
            handle = xbmcvfs.File(path, "w")
            handle.write(payload)
            handle.close()
        else:
            with open(path, "w", encoding="utf-8") as handle:
                handle.write(payload)
    except Exception:
        return
def _genre_cache_path(self) -> str:
    """Path of the persisted genre->URL cache file."""
    filename = "topstream_genres_cache.json"
    return os.path.join(self._cache_dir(), filename)
def _load_genre_cache(self) -> None:
    """Load persisted genre->URL mappings for the current base URL (best effort)."""
    path = self._genre_cache_path()
    try:
        if xbmcvfs and xbmcvfs.exists(path):
            handle = xbmcvfs.File(path)
            raw = handle.read()
            handle.close()
        elif os.path.exists(path):
            with open(path, "r", encoding="utf-8") as handle:
                raw = handle.read()
        else:
            return
        loaded = json.loads(raw or "{}")
        if isinstance(loaded, dict):
            # Layout: {base_url: {genre: url}}
            base_url = self._get_base_url()
            mapping = loaded.get(base_url)
            if isinstance(mapping, dict):
                for genre, url in mapping.items():
                    if isinstance(genre, str) and isinstance(url, str) and genre.strip() and url.strip():
                        # setdefault: entries already in memory take precedence.
                        self._genre_to_url.setdefault(genre.strip(), url.strip())
    except Exception:
        return
def _save_genre_cache(self) -> None:
    """Persist the in-memory genre->URL mapping, keyed by base URL.

    Mirrors _save_title_url_cache(): read-merge-write so other base URLs
    survive; every failure is swallowed (best effort).
    """
    path = self._genre_cache_path()
    try:
        base_url = self._get_base_url()
        store: Dict[str, Dict[str, str]] = {}
        # Merge with the file's existing content.
        try:
            if xbmcvfs and xbmcvfs.exists(path):
                handle = xbmcvfs.File(path)
                existing_raw = handle.read()
                handle.close()
            elif os.path.exists(path):
                with open(path, "r", encoding="utf-8") as handle:
                    existing_raw = handle.read()
            else:
                existing_raw = ""
            existing = json.loads(existing_raw or "{}")
            if isinstance(existing, dict):
                # Only adopt the file when it already uses the nested layout.
                if all(isinstance(k, str) and isinstance(v, dict) for k, v in existing.items()):
                    store = {k: dict(v) for k, v in existing.items()}  # type: ignore[arg-type]
        except Exception:
            store = {}
        store[base_url] = dict(self._genre_to_url)
        payload = json.dumps(store, ensure_ascii=False, sort_keys=True)
    except Exception:
        return
    try:
        if xbmcaddon and xbmcvfs:
            directory = os.path.dirname(path)
            if directory and not xbmcvfs.exists(directory):
                xbmcvfs.mkdirs(directory)
            handle = xbmcvfs.File(path, "w")
            handle.write(payload)
            handle.close()
        else:
            with open(path, "w", encoding="utf-8") as handle:
                handle.write(payload)
    except Exception:
        return
def _get_session(self) -> RequestsSession:
    """Return the shared HTTP session, creating it on first use.

    Raises RuntimeError when the optional `requests` dependency is missing.
    """
    if requests is None:
        raise RuntimeError(self.unavailable_reason or "requests nicht verfügbar.")
    if self._session is None:
        fresh = requests.Session()
        fresh.headers.update(HEADERS)
        self._session = fresh
    return self._session
def _get_base_url(self) -> str:
    """Return the configured base URL, normalized.

    Falls back to DEFAULT_BASE_URL outside Kodi or on errors; ensures an
    http(s) scheme and strips any trailing slash.
    """
    base = DEFAULT_BASE_URL
    if xbmcaddon is not None:
        try:
            configured = (xbmcaddon.Addon(ADDON_ID).getSetting(SETTING_BASE_URL) or "").strip()
        except Exception:
            configured = ""
        if configured:
            base = configured
    base = (base or "").strip()
    if not base:
        return DEFAULT_BASE_URL
    if not base.startswith(("http://", "https://")):
        base = "https://" + base
    return base.rstrip("/")
def _absolute_url(self, href: str) -> str:
    """Resolve `href` against the configured base URL."""
    root = self._get_base_url() + "/"
    return urljoin(root, href or "")
@staticmethod
def _absolute_external_url(href: str, *, base: str = "") -> str:
    """Make an external link absolute.

    Protocol-relative links get an https scheme; already-absolute links pass
    through; relative links are resolved against `base` when one is given.
    """
    link = (href or "").strip()
    if not link:
        return ""
    if link.startswith("//"):
        return "https:" + link
    if link.startswith(("http://", "https://")):
        return link
    if not base:
        return link
    root = base if base.endswith("/") else base + "/"
    return urljoin(root, link)
def _get_setting_bool(self, setting_id: str, *, default: bool = False) -> bool:
    # Thin wrapper around the shared helper so callers don't repeat ADDON_ID.
    return get_setting_bool(ADDON_ID, setting_id, default=default)
def _get_setting_int(self, setting_id: str, *, default: int) -> int:
    """Read an integer addon setting; return `default` outside Kodi or on errors."""
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(ADDON_ID)
        int_getter = getattr(addon, "getSettingInt", None)
        if callable(int_getter):
            return int(int_getter(setting_id))
        # Older Kodi APIs: parse the plain string setting.
        text = str(addon.getSetting(setting_id) or "").strip()
        if not text:
            return default
        return int(text)
    except Exception:
        return default
def _notify_url(self, url: str) -> None:
    # Show the visited URL as a Kodi notification when the debug setting is on.
    notify_url(ADDON_ID, heading=self.name, url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO)
def _log_url(self, url: str, *, kind: str = "VISIT") -> None:
    # Append the URL to the debug log file when URL logging is enabled.
    log_url(ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, log_filename="topstream_urls.log", url=url, kind=kind)
def _log_response_html(self, url: str, body: str) -> None:
    # Dump the raw response HTML to disk when the dump-HTML debug setting is on.
    dump_response_html(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
        url=url,
        body=body,
        filename_prefix="topstream_response",
    )
def capabilities(self) -> set[str]:
    """Advertise the optional features this plugin supports."""
    supported = {"genres", "popular_series"}
    return supported
def _popular_url(self) -> str:
    """URL of the 'popular movies' listing page."""
    listing_path = "/beliebte-filme-online.html"
    return self._absolute_url(listing_path)
def popular_series(self) -> List[str]:
    """Return the "most watched"/"popular movies" list.

    Source: `/beliebte-filme-online.html` (TopStreamFilm template). The
    result is memoized per instance; discovered title URLs are persisted.
    """
    if self._popular_cache is not None:
        return list(self._popular_cache)
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        self._popular_cache = []
        return []
    try:
        soup = self._get_soup(self._popular_url())
    except Exception:
        self._popular_cache = []
        return []
    hits = self._parse_listing_titles(soup)
    titles: List[str] = []
    seen: set[str] = set()
    for hit in hits:
        # Dedup while keeping the page order.
        if not hit.title or hit.title in seen:
            continue
        seen.add(hit.title)
        self._title_to_url[hit.title] = hit.url
        titles.append(hit.title)
    if titles:
        self._save_title_url_cache()
    self._popular_cache = list(titles)
    return list(titles)
def _parse_genres_from_home(self, soup: BeautifulSoupT) -> Dict[str, str]:
    """Extract genre-name -> absolute-URL pairs from the home page.

    Primary source is the header menu entry labelled "KATEGORIEN"; a generic
    `li.cat-item` scan serves as fallback for differently themed pages.
    """
    genres: Dict[str, str] = {}
    if soup is None:
        return genres
    # Primary: header menu under "KATEGORIEN".
    categories_anchor = None
    for anchor in soup.select("li.menu-item-has-children a"):
        text = (anchor.get_text(" ", strip=True) or "").strip().casefold()
        if text == "kategorien":
            categories_anchor = anchor
            break
    if categories_anchor is not None:
        try:
            parent = categories_anchor.find_parent("li")
        except Exception:
            parent = None
        if parent is not None:
            for anchor in parent.select("ul.sub-menu li.cat-item a[href]"):
                name = (anchor.get_text(" ", strip=True) or "").strip()
                href = (anchor.get("href") or "").strip()
                if not name or not href:
                    continue
                genres[name] = self._absolute_url(href)
    # Fallback: generic cat-item links (in case the theme differs).
    if not genres:
        for anchor in soup.select("li.cat-item a[href]"):
            name = (anchor.get_text(" ", strip=True) or "").strip()
            href = (anchor.get("href") or "").strip()
            if not name or not href:
                continue
            genres[name] = self._absolute_url(href)
    return genres
def _extract_first_int(self, value: str) -> Optional[int]:
    """Return the first integer found in `value`, or None when absent."""
    found = re.search(DIGITS, value or "")
    if not found:
        return None
    return int(found.group(1))
def _strip_links_text(self, node: Any) -> str:
    """Return a node's text content with all `<a>` link texts removed.

    Used to isolate the "SxE – title" prefix of an episode row from its
    trailing hoster links. Returns "" when bs4 is missing or on any error.
    """
    if BeautifulSoup is None:
        return ""
    try:
        # Re-parse the node so extracting anchors cannot mutate the original tree.
        fragment = BeautifulSoup(str(node), "html.parser")
        for anchor in fragment.select("a"):
            anchor.extract()
        return (fragment.get_text(" ", strip=True) or "").strip()
    except Exception:
        return ""
def _clear_stream_index_for_title(self, title: str) -> None:
    """Drop every cached season/episode/hoster entry belonging to `title`."""
    indexes = (
        self._season_to_episode_numbers,
        self._episode_to_hosters,
        self._episode_title_by_number,
    )
    for index in indexes:
        # All index keys are tuples whose first element is the title.
        stale_keys = [key for key in index if key[0] == title]
        for key in stale_keys:
            del index[key]
def _parse_stream_accordion(self, soup: BeautifulSoupT, *, title: str) -> None:
    """Parse season/episode/hoster links from the detail page accordion.

    Populates `_season_to_episode_numbers`, `_episode_to_hosters` and
    `_episode_title_by_number` for `title`; any previously indexed data for
    the title is cleared first.
    """
    if not soup or not title:
        return
    accordion = soup.select_one("#se-accordion") or soup.select_one(".su-accordion#se-accordion")
    if accordion is None:
        return
    self._clear_stream_index_for_title(title)
    # Matches e.g. "1x3 Episode 3 – Titel" -> season, episode, trailing text.
    # BUGFIX: the named groups had lost their names ("(?P\d+)"), which is
    # invalid regex syntax — re.search() raised re.error before any row
    # could be parsed. Restored as (?P<s>...), (?P<e>...), (?P<rest>...).
    row_pattern = re.compile(
        r"(?P<s>\d+)\s*x\s*(?P<e>\d+)\s*(?P<rest>.*)$",
        flags=re.IGNORECASE,
    )
    for spoiler in accordion.select(".su-spoiler"):
        season_title = spoiler.select_one(".su-spoiler-title")
        if not season_title:
            continue
        season_text = (season_title.get_text(" ", strip=True) or "").strip()
        season_number = self._extract_first_int(season_text)
        if season_number is None:
            continue
        season_label = f"Staffel {season_number}"
        # The spoiler title may point at its content via data-target="#id".
        data_target = (season_title.get("data-target") or "").strip()
        content = spoiler.select_one(data_target) if data_target.startswith("#") else None
        if content is None:
            content = spoiler.select_one(".su-spoiler-content")
        if content is None:
            continue
        episode_numbers: set[int] = set()
        for row in content.select(".cu-ss"):
            raw_text = (self._strip_links_text(row) or "").strip()
            if not raw_text:
                continue
            match = row_pattern.search(raw_text)
            if not match:
                continue
            row_season = int(match.group("s"))
            episode_number = int(match.group("e"))
            if row_season != season_number:
                continue
            rest = (match.group("rest") or "").strip().replace("–", "-")
            # Hoster links (<a> tags) were stripped above, so typically only
            # "Episode X -" remains here; keep only the part before the dash.
            if "-" in rest:
                rest = rest.split("-", 1)[0].strip()
            rest = re.sub(r"\bepisode\s*\d+\b", "", rest, flags=re.IGNORECASE).strip()
            rest = re.sub(r"^\W+|\W+$", "", rest).strip()
            if rest:
                self._episode_title_by_number[(title, season_number, episode_number)] = rest
            hosters: Dict[str, str] = {}
            for anchor in row.select("a[href]"):
                name = (anchor.get_text(" ", strip=True) or "").strip()
                href = (anchor.get("href") or "").strip()
                if not name or not href:
                    continue
                hosters[name] = href
            if not hosters:
                continue
            episode_label = f"Episode {episode_number}"
            ep_title = self._episode_title_by_number.get((title, season_number, episode_number), "")
            if ep_title:
                episode_label = f"Episode {episode_number}: {ep_title}"
            self._episode_to_hosters[(title, season_label, episode_label)] = hosters
            episode_numbers.add(episode_number)
        self._season_to_episode_numbers[(title, season_label)] = sorted(episode_numbers)
def _ensure_stream_index(self, title: str) -> None:
    """Parse the detail page accordion for `title` unless already indexed."""
    cleaned = (title or "").strip()
    if not cleaned:
        return
    # Nothing to do when seasons for this title are already in the index.
    already_indexed = any(key[0] == cleaned for key in self._season_to_episode_numbers)
    if already_indexed:
        return
    soup = self._get_detail_soup(cleaned)
    if soup is not None:
        self._parse_stream_accordion(soup, title=cleaned)
def _get_soup(self, url: str) -> BeautifulSoupT:
    """Fetch `url` and return the parsed document.

    Raises RuntimeError when requests/bs4 are missing and propagates
    requests' HTTPError for non-2xx responses. Logging/notification happens
    according to the debug settings.
    """
    if BeautifulSoup is None or not REQUESTS_AVAILABLE:
        raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
    session = self._get_session()
    self._log_url(url, kind="VISIT")
    self._notify_url(url)
    response = session.get(url, timeout=DEFAULT_TIMEOUT)
    response.raise_for_status()
    # Log the final URL (after redirects) and optionally dump the HTML.
    self._log_url(response.url, kind="OK")
    self._log_response_html(response.url, response.text)
    return BeautifulSoup(response.text, "html.parser")
def _get_detail_soup(self, title: str) -> Optional[BeautifulSoupT]:
    """Return the parsed detail page for `title`, using the HTML cache.

    The title's URL must already be known from a previous search/listing;
    returns None when unknown or when the dependencies are unavailable.
    """
    title = (title or "").strip()
    if not title:
        return None
    url = self._title_to_url.get(title)
    if not url:
        return None
    if BeautifulSoup is None or not REQUESTS_AVAILABLE:
        return None
    cached_html = self._detail_html_cache.get(title)
    if cached_html:
        return BeautifulSoup(cached_html, "html.parser")
    soup = self._get_soup(url)
    try:
        # Cache the serialized document so later calls avoid a re-fetch.
        self._detail_html_cache[title] = str(soup)
    except Exception:
        pass
    return soup
def _detect_movie_iframe_url(self, soup: BeautifulSoupT) -> str:
    """Return the MeineCloud iframe URL when the detail page embeds one.

    An embedded MeineCloud iframe identifies the page as a movie detail page;
    returns "" otherwise.
    """
    if not soup:
        return ""
    for frame in soup.select("iframe[src]"):
        src = (frame.get("src") or "").strip()
        if src and MEINECLOUD_HOST in src:
            return src
    return ""
def _parse_meinecloud_hosters(self, soup: BeautifulSoupT, *, page_url: str) -> Dict[str, str]:
    """Parse hoster mirrors from a MeineCloud movie page.

    Expected markup: `ul._player-mirrors li[data-link]` entries whose text is
    the hoster name (e.g. "supervideo", "dropload", "4K Server"). Returns a
    {hoster name: absolute URL} mapping.
    """
    hosters: Dict[str, str] = {}
    if not soup:
        return hosters
    for entry in soup.select("ul._player-mirrors li[data-link]"):
        raw_link = (entry.get("data-link") or "").strip()
        if not raw_link:
            continue
        name = (entry.get_text(" ", strip=True) or "").strip()
        name = name or "Hoster"
        url = self._absolute_external_url(raw_link, base=page_url)
        if not url:
            continue
        hosters[name] = url
    # If an entry (e.g. "4K Server") points back to a MeineCloud page, try to
    # expand it once; existing names win over nested ones.
    expanded: Dict[str, str] = {}
    for name, url in list(hosters.items()):
        if MEINECLOUD_HOST in url and "/fullhd/" in url:
            try:
                nested = self._get_soup(url)
            except Exception:
                continue
            nested_hosters = self._parse_meinecloud_hosters(nested, page_url=url)
            for nested_name, nested_url in nested_hosters.items():
                expanded.setdefault(nested_name, nested_url)
    if expanded:
        hosters.update(expanded)
    return hosters
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
    """Return the highest page number listed in `div.wp-pagenavi` (default 1)."""
    if not soup:
        return 1
    page_numbers: List[int] = []
    for link in soup.select("div.wp-pagenavi a"):
        label = (link.get_text(" ", strip=True) or "").strip()
        if not label.isdigit():
            continue
        try:
            page_numbers.append(int(label))
        except Exception:
            continue
    if not page_numbers:
        return 1
    return max(page_numbers)
def _parse_listing_titles(self, soup: BeautifulSoupT) -> List[SearchHit]:
    """Extract SearchHits from a listing page (`li.TPostMv` entries).

    Titles ending in "Der Film" have the suffix stripped and are remembered
    as movie hints. Descriptions are left empty for listings.
    """
    hits: List[SearchHit] = []
    if not soup:
        return hits
    for item in soup.select("li.TPostMv"):
        anchor = item.select_one("a[href]")
        if not anchor:
            continue
        href = (anchor.get("href") or "").strip()
        if not href:
            continue
        title_tag = anchor.select_one("h3.Title")
        raw_title = title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True)
        raw_title = (raw_title or "").strip()
        # "… Der Film" in the raw title is a strong movie indicator.
        is_movie_hint = bool(re.search(r"\bder\s+film\b", raw_title, flags=re.IGNORECASE))
        title = _strip_der_film_suffix(raw_title)
        if not title:
            continue
        if is_movie_hint:
            self._movie_title_hint.add(title)
        hits.append(SearchHit(title=title, url=self._absolute_url(href), description=""))
    return hits
def is_movie(self, title: str) -> bool:
    """Return True when `title` is (likely) a movie.

    Fast path: a cached iframe detection or a "Der Film" listing hint.
    Otherwise the detail page is fetched: per the TopStream layout, series
    pages carry a `div.serie-menu` season navigation — when that element is
    missing the title is treated as a movie.
    """
    title = (title or "").strip()
    if not title:
        return False
    if title in self._movie_iframe_url or title in self._movie_title_hint:
        return True
    # Robust fallback: inspect the detail page.
    soup = self._get_detail_soup(title)
    if soup is None:
        return False
    has_seasons = bool(soup.select_one("div.serie-menu") or soup.select_one(".serie-menu"))
    return not has_seasons
def genre_page_count(self, genre: str) -> int:
    """Return the last pagination page number for `genre` (always >= 1).

    The result is cached per genre; determining it requires fetching the
    genre's first page and reading its `wp-pagenavi` block.
    """
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return 1
    genre = (genre or "").strip()
    if not genre:
        return 1
    if genre in self._genre_last_page:
        return max(1, int(self._genre_last_page[genre] or 1))
    if not self._genre_to_url:
        # Lazily populate the genre->URL map.
        self.genres()
    url = self._genre_to_url.get(genre)
    if not url:
        return 1
    try:
        soup = self._get_soup(url)
    except Exception:
        return 1
    last_page = self._extract_last_page(soup)
    self._genre_last_page[genre] = max(1, int(last_page or 1))
    return self._genre_last_page[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
    """Return the titles of one genre listing page (1-based `page`).

    Page 1 is the genre's base URL; later pages use the `page/<n>/` scheme.
    Discovered title URLs are recorded and persisted for navigation.
    """
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    genre = (genre or "").strip()
    if not genre:
        return []
    if not self._genre_to_url:
        self.genres()
    base_url = self._genre_to_url.get(genre)
    if not base_url:
        return []
    page = max(1, int(page or 1))
    if page == 1:
        url = base_url
    else:
        url = urljoin(base_url.rstrip("/") + "/", f"page/{page}/")
    try:
        soup = self._get_soup(url)
    except Exception:
        return []
    hits = self._parse_listing_titles(soup)
    titles: List[str] = []
    seen: set[str] = set()
    for hit in hits:
        # Dedup while keeping the page order.
        if hit.title in seen:
            continue
        seen.add(hit.title)
        self._title_to_url[hit.title] = hit.url
        titles.append(hit.title)
    if titles:
        self._save_title_url_cache()
    return titles
def _ensure_title_index(self, title: str) -> None:
    """Make sure movie/series information for `title` has been parsed.

    Movies (detected via the MeineCloud iframe) are modeled as a single
    "Film"/"Stream" pseudo season/episode so ViewIt can navigate them;
    series are indexed through the streams-accordion parser.
    """
    title = (title or "").strip()
    if not title:
        return
    # Already known?
    if title in self._movie_iframe_url:
        return
    if any(key[0] == title for key in self._season_to_episode_numbers.keys()):
        return
    soup = self._get_detail_soup(title)
    if soup is None:
        return
    movie_url = self._detect_movie_iframe_url(soup)
    if movie_url:
        self._movie_iframe_url[title] = movie_url
        # Model the movie as single season/episode so ViewIt can navigate.
        season_label = "Film"
        episode_label = "Stream"
        self._season_cache[title] = [season_label]
        self._episode_cache[(title, season_label)] = [episode_label]
        try:
            meinecloud_soup = self._get_soup(movie_url)
            hosters = self._parse_meinecloud_hosters(meinecloud_soup, page_url=movie_url)
        except Exception:
            hosters = {}
        # Fall back to the raw iframe URL when no mirrors could be parsed.
        self._episode_to_hosters[(title, season_label, episode_label)] = hosters or {"MeineCloud": movie_url}
        return
    # Otherwise: parse the series streams accordion (if present).
    self._parse_stream_accordion(soup, title=title)
async def search_titles(self, query: str) -> List[str]:
    """Search titles via the site's HTML search.

    Expected HTML (snippet):
    - hits: `li.TPostMv a[href]`
    - title: `h3.Title`

    Returns the matching, deduplicated titles and records their detail-page
    URLs in `_title_to_url` (persisted to disk).
    """
    if not REQUESTS_AVAILABLE:
        return []
    query = (query or "").strip()
    if not query:
        return []
    session = self._get_session()
    url = self._get_base_url() + "/"
    params = {"story": query, "do": "search", "subaction": "search"}
    request_url = f"{url}?{urlencode(params)}"
    self._log_url(request_url, kind="GET")
    self._notify_url(request_url)
    response = session.get(
        url,
        params=params,
        timeout=DEFAULT_TIMEOUT,
    )
    response.raise_for_status()
    self._log_url(response.url, kind="OK")
    self._log_response_html(response.url, response.text)
    if BeautifulSoup is None:
        return []
    soup = BeautifulSoup(response.text, "html.parser")
    hits: List[SearchHit] = []
    for item in soup.select("li.TPostMv"):
        anchor = item.select_one("a[href]")
        if not anchor:
            continue
        href = (anchor.get("href") or "").strip()
        if not href:
            continue
        title_tag = anchor.select_one("h3.Title")
        raw_title = title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True)
        raw_title = (raw_title or "").strip()
        # "… Der Film" in the raw title is a strong movie indicator.
        is_movie_hint = bool(re.search(r"\bder\s+film\b", raw_title, flags=re.IGNORECASE))
        title = _strip_der_film_suffix(raw_title)
        if not title:
            continue
        if is_movie_hint:
            self._movie_title_hint.add(title)
        description_tag = item.select_one(".TPMvCn .Description")
        description = description_tag.get_text(" ", strip=True) if description_tag else ""
        hit = SearchHit(title=title, url=self._absolute_url(href), description=description)
        if _matches_query(query, title=hit.title, description=hit.description):
            hits.append(hit)
    # Dedup + record the URL mapping for navigation.
    # BUGFIX: previously `_title_to_url` was cleared here, which broke
    # navigation for titles discovered via genre/popular listings and wiped
    # the persisted cache on the following save. Update-only instead.
    titles: List[str] = []
    seen: set[str] = set()
    for hit in hits:
        if hit.title in seen:
            continue
        seen.add(hit.title)
        self._title_to_url[hit.title] = hit.url
        titles.append(hit.title)
    self._save_title_url_cache()
    return titles
def genres(self) -> List[str]:
    """Return all genre names, sorted case-insensitively.

    The genre->URL map is fetched from the home page on first use and then
    persisted; an empty list is returned when the fetch fails or the
    dependencies are unavailable.
    """
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    if not self._genre_to_url:
        try:
            home = self._get_soup(self._get_base_url() + "/")
        except Exception:
            return []
        parsed = self._parse_genres_from_home(home)
        self._genre_to_url.clear()
        self._genre_to_url.update(parsed)
        self._save_genre_cache()
    return sorted(self._genre_to_url, key=str.casefold)
def titles_for_genre(self, genre: str) -> List[str]:
    """Return the titles of a genre's first page, sorted case-insensitively.

    Backwards compatible entry point: only page 1 is fetched; pagination
    runs through titles_for_genre_page().
    """
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    genre = (genre or "").strip()
    if not genre:
        return []
    if not self._genre_to_url:
        self.genres()
    url = self._genre_to_url.get(genre)
    if not url:
        return []
    # Backwards-compatible: page 1 only (paging via titles_for_genre_page()).
    titles = self.titles_for_genre_page(genre, 1)
    titles.sort(key=lambda value: value.casefold())
    return titles
def seasons_for(self, title: str) -> List[str]:
    """Return season labels for `title` (e.g. ["Staffel 1", ...]).

    Movies are modeled as the single pseudo season ["Film"]. For series the
    streams accordion is preferred; the page's season tab menu serves as a
    fallback that yields labels without episode links.
    """
    title = (title or "").strip()
    if not title or not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    self._ensure_title_index(title)
    if title in self._movie_iframe_url:
        return ["Film"]
    # Primary: streams accordion (contains the real season/episode listings).
    self._ensure_stream_index(title)
    seasons = sorted(
        {season_label for (t, season_label) in self._season_to_episode_numbers.keys() if t == title},
        key=lambda value: (self._extract_first_int(value) or 0),
    )
    if seasons:
        self._season_cache[title] = list(seasons)
        return list(seasons)
    # Fallback: season tabs in the page menu (no links).
    cached = self._season_cache.get(title)
    if cached is not None:
        return list(cached)
    soup = self._get_detail_soup(title)
    if soup is None:
        self._season_cache[title] = []
        return []
    numbers: List[int] = []
    seen: set[int] = set()
    for anchor in soup.select(
        "div.serie-menu div.tt_season ul.nav a[href^='#season-'],"
        " .serie-menu .tt_season a[href^='#season-'],"
        " a[data-toggle='tab'][href^='#season-']"
    ):
        text = (anchor.get_text(" ", strip=True) or "").strip()
        num = self._extract_first_int(text)
        if num is None:
            # No number in the link text; derive it from the anchor target.
            href = (anchor.get("href") or "").strip()
            num = self._extract_first_int(href.replace("#season-", ""))
        if num is None or num in seen:
            continue
        seen.add(num)
        numbers.append(num)
    seasons = [f"Staffel {n}" for n in sorted(numbers)]
    self._season_cache[title] = list(seasons)
    return list(seasons)
def episodes_for(self, title: str, season: str) -> List[str]:
    """Return episode labels for `title`/`season`.

    Movies yield ["Stream"] for the pseudo season "Film". Labels include the
    episode title when known, e.g. "Episode 3: Pilot". Results are cached.
    """
    title = (title or "").strip()
    season = (season or "").strip()
    if not title or not season or not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    self._ensure_title_index(title)
    if title in self._movie_iframe_url and season == "Film":
        return ["Stream"]
    cache_key = (title, season)
    cached = self._episode_cache.get(cache_key)
    if cached is not None:
        return list(cached)
    self._ensure_stream_index(title)
    episode_numbers = self._season_to_episode_numbers.get((title, season), [])
    episodes: List[str] = []
    # Episode titles are keyed by the numeric season, not the label.
    season_number = self._extract_first_int(season) or 0
    for ep_no in episode_numbers:
        label = f"Episode {ep_no}"
        ep_title = self._episode_title_by_number.get((title, season_number, ep_no), "")
        if ep_title:
            label = f"Episode {ep_no}: {ep_title}"
        episodes.append(label)
    self._episode_cache[cache_key] = list(episodes)
    return list(episodes)
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
    """List the hoster names available for one episode, sorted case-insensitively."""
    title = (title or "").strip()
    season = (season or "").strip()
    episode = (episode or "").strip()
    if not (title and season and episode):
        return []
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return []
    self._ensure_title_index(title)
    self._ensure_stream_index(title)
    available = self._episode_to_hosters.get((title, season, episode), {})
    return sorted(available, key=str.casefold)
def set_preferred_hosters(self, hosters: List[str]) -> None:
    """Override the hoster preference order (entries lower-cased, blanks ignored)."""
    cleaned: List[str] = []
    for entry in hosters:
        if not entry:
            continue
        normalized = entry.strip().lower()
        if normalized:
            cleaned.append(normalized)
    # An all-blank list leaves the current preference untouched.
    if cleaned:
        self._preferred_hosters = cleaned
def reset_preferred_hosters(self) -> None:
    """Restore the default hoster preference order."""
    self._preferred_hosters = self._default_preferred_hosters.copy()
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
    """Pick a stream URL for the episode, honoring the preferred-hoster order.

    Falls back to the case-insensitively first hoster name when no preference
    matches; returns None when nothing is indexed for the episode.
    """
    title = (title or "").strip()
    season = (season or "").strip()
    episode = (episode or "").strip()
    if not (title and season and episode):
        return None
    if not REQUESTS_AVAILABLE or BeautifulSoup is None:
        return None
    self._ensure_title_index(title)
    self._ensure_stream_index(title)
    hosters = self._episode_to_hosters.get((title, season, episode), {})
    if not hosters:
        return None
    for wanted in [entry.casefold() for entry in (self._preferred_hosters or [])]:
        for actual_name, url in hosters.items():
            if actual_name.casefold() == wanted:
                return url
    # No preference matched: deterministically take the first by name.
    fallback_name = min(hosters, key=str.casefold)
    return hosters.get(fallback_name)
def resolve_stream_link(self, link: str) -> Optional[str]:
    """Resolve `link` via the optional resolveurl backend; pass through otherwise."""
    try:
        from resolveurl_backend import resolve as resolve_with_resolveurl
    except Exception:
        # Backend not installed: hand the raw link back unchanged.
        return link
    if not callable(resolve_with_resolveurl):
        return link
    resolved = resolve_with_resolveurl(link)
    return resolved or link
# Alias for automatic plugin discovery.
Plugin = TopstreamfilmPlugin