"""Serienstream (s.to) Integration als Downloader-Plugin. Hinweise: - Diese Integration nutzt optional `requests` + `beautifulsoup4` (bs4). - In Kodi koennen zusaetzliche Debug-Funktionen ueber Addon-Settings aktiviert werden (URL-Logging, HTML-Dumps, Benachrichtigungen). """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime import hashlib import os import re from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None try: # pragma: no cover - optional Kodi helpers import xbmcaddon # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow running outside Kodi xbmcaddon = None xbmcvfs = None xbmcgui = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url from http_session_pool import get_requests_session from regex_patterns import SEASON_EPISODE_TAG, SEASON_EPISODE_URL if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession: TypeAlias = Any BeautifulSoupT: TypeAlias = Any SETTING_BASE_URL = "serienstream_base_url" DEFAULT_BASE_URL = "https://s.to" DEFAULT_PREFERRED_HOSTERS = ["voe"] DEFAULT_TIMEOUT = 20 ADDON_ID = "plugin.video.viewit" GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_serienstream" SETTING_DUMP_HTML = "dump_html_serienstream" SETTING_SHOW_URL_INFO = "show_url_info_serienstream" SETTING_LOG_ERRORS = "log_errors_serienstream" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } @dataclass class SeriesResult: title: str description: str url: str @dataclass class EpisodeInfo: number: int title: str original_title: str url: str season_label: str = "" languages: List[str] = field(default_factory=list) hosters: List[str] = field(default_factory=list) @dataclass class LatestEpisode: series_title: str season: int episode: int url: str airdate: str @dataclass class SeasonInfo: number: int url: str episodes: List[EpisodeInfo] def _get_base_url() -> str: base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip() if not base: base = DEFAULT_BASE_URL return base.rstrip("/") def _series_base_url() -> str: return f"{_get_base_url()}/serie/stream" def _popular_series_url() -> str: return f"{_get_base_url()}/beliebte-serien" def _latest_episodes_url() -> str: return f"{_get_base_url()}" def _absolute_url(href: str) -> str: return f"{_get_base_url()}{href}" if href.startswith("/") else href def _normalize_series_url(identifier: str) -> str: if identifier.startswith("http://") or identifier.startswith("https://"): return identifier.rstrip("/") slug = identifier.strip("/") return f"{_series_base_url()}/{slug}" def _series_root_url(url: str) -> str: """Normalisiert eine Serien-URL auf die Root-URL (ohne /staffel-x oder /episode-x).""" normalized = (url or "").strip().rstrip("/") normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized) normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized) return normalized.rstrip("/") def _log_visit(url: str) -> None: _log_url(url, kind="VISIT") _notify_url(url) if xbmcaddon is None: print(f"Visiting: {url}") def _normalize_text(value: str) -> str: """Legacy normalization (kept for backwards compatibility).""" value = value.casefold() value = re.sub(r"[^a-z0-9]+", "", value) return value def _normalize_search_text(value: str) -> str: """Normalisiert Text für die Suche ohne Wortgrenzen zu "verschmelzen". Wichtig: Wir ersetzen Nicht-Alphanumerisches durch Leerzeichen, statt es zu entfernen. Dadurch entstehen keine künstlichen Treffer über Wortgrenzen hinweg (z.B. "an" + "na" -> "anna"). """ value = (value or "").casefold() value = re.sub(r"[^a-z0-9]+", " ", value) value = re.sub(r"\s+", " ", value).strip() return value def _is_episode_tba(title: str, original_title: str) -> bool: combined = f"{title} {original_title}".casefold() markers = ("tba", "demnächst", "demnaechst", "coming soon", "to be announced") return any(marker in combined for marker in markers) def _row_is_upcoming(row: BeautifulSoupT) -> bool: classes = row.get("class") or [] if isinstance(classes, str): classes = classes.split() if "upcoming" in classes: return True badge = row.select_one(".badge-upcoming") if badge and (badge.get_text(" ", strip=True) or "").strip(): return True watch_cell = row.select_one(".episode-watch-cell") if watch_cell: text = watch_cell.get_text(" ", strip=True).casefold() if "tba" in text: return True return False def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool: return get_setting_bool(ADDON_ID, setting_id, default=default) def _notify_url(url: str) -> None: notify_url( ADDON_ID, heading="Serienstream", url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO, plugin_setting_id=SETTING_SHOW_URL_INFO, ) def _log_url(url: str, *, kind: str = "VISIT") -> None: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="serienstream_urls.log", url=url, kind=kind, ) def _log_parsed_url(url: str) -> None: _log_url(url, kind="PARSE") def _log_response_html(url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, plugin_setting_id=SETTING_DUMP_HTML, url=url, body=body, filename_prefix="s_to_response", ) def _log_error(message: str) -> None: log_error( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS, plugin_setting_id=SETTING_LOG_ERRORS, log_filename="serienstream_errors.log", message=message, ) def _ensure_requests() -> None: if requests is None or BeautifulSoup is None: raise RuntimeError("requests/bs4 sind nicht verfuegbar.") def _looks_like_cloudflare_challenge(body: str) -> bool: lower = body.lower() markers = ( "cf-browser-verification", "cf-challenge", "cf_chl", "challenge-platform", "attention required! | cloudflare", "just a moment...", "cloudflare ray id", ) return any(marker in lower for marker in markers) def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT: _ensure_requests() _log_visit(url) sess = session or get_requests_session("serienstream", headers=HEADERS) try: response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: _log_error(f"GET {url} failed: {exc}") raise if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") return BeautifulSoup(response.text, "html.parser") def _get_soup_simple(url: str) -> BeautifulSoupT: _ensure_requests() _log_visit(url) sess = get_requests_session("serienstream", headers=HEADERS) try: response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: _log_error(f"GET {url} failed: {exc}") raise if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") return BeautifulSoup(response.text, "html.parser") def search_series(query: str) -> List[SeriesResult]: """Sucht Serien im (/serien)-Katalog (Genre-liste) nach Titel/Alt-Titel.""" _ensure_requests() normalized_query = _normalize_search_text(query) if not normalized_query: return [] # Direkter Abruf wie in fetch_serien.py. catalog_url = f"{_get_base_url()}/serien?by=genre" soup = _get_soup_simple(catalog_url) results: List[SeriesResult] = [] for series in parse_series_catalog(soup).values(): for entry in series: haystack = _normalize_search_text(entry.title) if entry.title and normalized_query in haystack: results.append(entry) return results def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]: """Parst die Serien-Übersicht (/serien) und liefert Genre -> Serienliste.""" catalog: Dict[str, List[SeriesResult]] = {} # Neues Layout (Stand: 2026-01): Gruppen-Header + Liste. # - Header: `div.background-1 ...` mit `h3` # - Einträge: `ul.series-list` -> `li.series-item[data-search]` -> `a[href]` for header in soup.select("div.background-1 h3"): group = (header.get_text(strip=True) or "").strip() if not group: continue list_node = header.parent.find_next_sibling("ul", class_="series-list") if not list_node: continue series: List[SeriesResult] = [] for item in list_node.select("li.series-item"): anchor = item.find("a", href=True) if not anchor: continue href = (anchor.get("href") or "").strip() url = _absolute_url(href) if url: _log_parsed_url(url) if ("/serie/" not in url) or "/staffel-" in url or "/episode-" in url: continue title = (anchor.get_text(" ", strip=True) or "").strip() description = (item.get("data-search") or "").strip() if title: series.append(SeriesResult(title=title, description=description, url=url)) if series: catalog[group] = series return catalog def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]: season_links: List[Tuple[int, str]] = [] seen_numbers: set[int] = set() anchors = soup.select("ul.nav.list-items-nav a[data-season-pill][href]") for anchor in anchors: href = anchor.get("href") or "" if "/episode-" in href: continue data_number = (anchor.get("data-season-pill") or "").strip() match = re.search(r"/staffel-(\d+)", href) if match: number = int(match.group(1)) elif data_number.isdigit(): number = int(data_number) else: label = anchor.get_text(strip=True) if not label.isdigit(): continue number = int(label) if number in seen_numbers: continue seen_numbers.add(number) season_url = _absolute_url(href) if season_url: _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) return season_links def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]: tag = soup.select_one('meta[itemprop="numberOfSeasons"]') if not tag: return None content = (tag.get("content") or "").strip() if not content.isdigit(): return None count = int(content) return count if count > 0 else None def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str: canonical = soup.select_one('link[rel="canonical"][href]') href = (canonical.get("href") if canonical else "") or "" href = href.strip() if href.startswith("http://") or href.startswith("https://"): return href.rstrip("/") return fallback.rstrip("/") def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]: episodes: List[EpisodeInfo] = [] season_label = "" season_header = soup.select_one("section.episode-section h2") or soup.select_one("h2.h3") if season_header: season_label = (season_header.get_text(" ", strip=True) or "").strip() language_map = { "german": "DE", "english": "EN", "japanese": "JP", "turkish": "TR", "spanish": "ES", "italian": "IT", "french": "FR", "korean": "KO", "russian": "RU", "polish": "PL", "portuguese": "PT", "chinese": "ZH", "arabic": "AR", "thai": "TH", } # Neues Layout (Stand: 2026-01): Episoden-Tabelle mit Zeilen und onclick-URL. rows = soup.select("table.episode-table tbody tr.episode-row") for index, row in enumerate(rows): if _row_is_upcoming(row): continue onclick = (row.get("onclick") or "").strip() url = "" if onclick: match = re.search(r"location=['\\\"]([^'\\\"]+)['\\\"]", onclick) if match: url = _absolute_url(match.group(1)) if not url: anchor = row.find("a", href=True) url = _absolute_url(anchor.get("href")) if anchor else "" if url: _log_parsed_url(url) number_tag = row.select_one(".episode-number-cell") number_text = (number_tag.get_text(strip=True) if number_tag else "").strip() match = re.search(r"/episode-(\d+)", url) if url else None if match: number = int(match.group(1)) else: digits = "".join(ch for ch in number_text if ch.isdigit()) number = int(digits) if digits else index + 1 title_tag = row.select_one(".episode-title-ger") original_tag = row.select_one(".episode-title-eng") title = (title_tag.get_text(strip=True) if title_tag else "").strip() original_title = (original_tag.get_text(strip=True) if original_tag else "").strip() if not title: title = f"Episode {number}" if _is_episode_tba(title, original_title): continue hosters: List[str] = [] for img in row.select(".episode-watch-cell img"): label = (img.get("alt") or img.get("title") or "").strip() if label and label not in hosters: hosters.append(label) languages: List[str] = [] for flag in row.select(".episode-language-cell .watch-language"): classes = flag.get("class") or [] if isinstance(classes, str): classes = classes.split() for cls in classes: if cls.startswith("svg-flag-"): key = cls.replace("svg-flag-", "").strip() if not key: continue value = language_map.get(key, key.upper()) if value and value not in languages: languages.append(value) episodes.append( EpisodeInfo( number=number, title=title, original_title=original_title, url=url, season_label=season_label, languages=languages, hosters=hosters, ) ) if episodes: return episodes return episodes def fetch_episode_stream_link( episode_url: str, *, preferred_hosters: Optional[List[str]] = None, ) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(episode_url) preferred = [hoster.lower() for hoster in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)] session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. try: _get_soup(_get_base_url(), session=session) except Exception: pass soup = _get_soup(normalized_url, session=session) candidates: List[Tuple[str, str]] = [] for button in soup.select("button.link-box[data-play-url]"): play_url = (button.get("data-play-url") or "").strip() provider = (button.get("data-provider-name") or "").strip() url = _absolute_url(play_url) if url: _log_parsed_url(url) if provider and url: candidates.append((provider, url)) if not candidates: return None for preferred_name in preferred: for name, url in candidates: if name.lower() == preferred_name: return url return candidates[0][1] def fetch_episode_hoster_names(episode_url: str) -> List[str]: """Liest die verfügbaren Hoster-Namen für eine Episode aus.""" _ensure_requests() normalized_url = _absolute_url(episode_url) session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. try: _get_soup(_get_base_url(), session=session) except Exception: pass soup = _get_soup(normalized_url, session=session) names: List[str] = [] seen: set[str] = set() for button in soup.select("button.link-box[data-provider-name]"): name = (button.get("data-provider-name") or "").strip() play_url = (button.get("data-play-url") or "").strip() url = _absolute_url(play_url) if url: _log_parsed_url(url) key = name.casefold().strip() if not key or key in seen: continue seen.add(key) names.append(name) _log_url(name, kind="HOSTER") if names: _log_url(f"{normalized_url}#hosters={','.join(names)}", kind="HOSTERS") return names _LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE) _LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE) def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]: """Parst die neuesten Episoden von der Startseite.""" episodes: List[LatestEpisode] = [] seen: set[str] = set() for anchor in soup.select("a.latest-episode-row[href]"): href = (anchor.get("href") or "").strip() if not href or "/serie/" not in href: continue url = _absolute_url(href) if not url: continue title_node = anchor.select_one(".ep-title") series_title = (title_node.get("title") if title_node else "") or "" series_title = series_title.strip() or (title_node.get_text(strip=True) if title_node else "").strip() if not series_title: continue season_text = (anchor.select_one(".ep-season").get_text(strip=True) if anchor.select_one(".ep-season") else "").strip() episode_text = (anchor.select_one(".ep-episode").get_text(strip=True) if anchor.select_one(".ep-episode") else "").strip() season_number: Optional[int] = None episode_number: Optional[int] = None match = re.search(r"S\\s*(\\d+)", season_text, re.IGNORECASE) if match: season_number = int(match.group(1)) match = re.search(r"E\\s*(\\d+)", episode_text, re.IGNORECASE) if match: episode_number = int(match.group(1)) if season_number is None or episode_number is None: match = _LATEST_EPISODE_URL_RE.search(href) if match: season_number = int(match.group(1)) episode_number = int(match.group(2)) if season_number is None or episode_number is None: continue airdate_node = anchor.select_one(".ep-time") airdate = (airdate_node.get_text(" ", strip=True) if airdate_node else "").strip() key = f"{url}\\t{season_number}\\t{episode_number}" if key in seen: continue seen.add(key) _log_parsed_url(url) episodes.append( LatestEpisode( series_title=series_title, season=int(season_number), episode=int(episode_number), url=url, airdate=airdate, ) ) return episodes def resolve_redirect(target_url: str) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(target_url) _log_visit(normalized_url) session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. try: _get_soup(_get_base_url(), session=session) except Exception: pass response = session.get( normalized_url, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True, ) if response.url: _log_url(response.url, kind="RESOLVED") return response.url if response.url else None def scrape_series_detail( series_identifier: str, max_seasons: Optional[int] = None, *, load_episodes: bool = True, ) -> List[SeasonInfo]: _ensure_requests() series_url = _series_root_url(_normalize_series_url(series_identifier)) _log_url(series_url, kind="SERIES") _notify_url(series_url) session = get_requests_session("serienstream", headers=HEADERS) soup = _get_soup(series_url, session=session) base_series_url = _series_root_url(_extract_canonical_url(soup, series_url)) season_links = _extract_season_links(soup) season_count = _extract_number_of_seasons(soup) if season_count and (not season_links or len(season_links) < season_count): existing = {number for number, _ in season_links} for number in range(1, season_count + 1): if number in existing: continue season_url = f"{base_series_url}/staffel-{number}" _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) if max_seasons is not None: season_links = season_links[:max_seasons] seasons: List[SeasonInfo] = [] for number, url in season_links: episodes: List[EpisodeInfo] = [] if load_episodes: season_soup = _get_soup(url, session=session) episodes = _extract_episodes(season_soup) seasons.append(SeasonInfo(number=number, url=url, episodes=episodes)) seasons.sort(key=lambda s: s.number) return seasons class SerienstreamPlugin(BasisPlugin): """Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt.""" name = "Serienstream" POPULAR_GENRE_LABEL = "⭐ Beliebte Serien" def __init__(self) -> None: self._series_results: Dict[str, SeriesResult] = {} self._season_cache: Dict[str, List[SeasonInfo]] = {} self._season_links_cache: Dict[str, List[SeasonInfo]] = {} self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {} self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None self._popular_cache: Optional[List[SeriesResult]] = None self._requests_available = REQUESTS_AVAILABLE self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: List[str] = list(self._default_preferred_hosters) self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {} self._latest_cache: Dict[int, List[LatestEpisode]] = {} self._latest_hoster_cache: Dict[str, List[str]] = {} self.is_available = True self.unavailable_reason: Optional[str] = None if not self._requests_available: # pragma: no cover - optional dependency self.is_available = False self.unavailable_reason = ( "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'." ) print( "SerienstreamPlugin deaktiviert: requests/bs4 fehlen. " "Installiere 'requests' und 'beautifulsoup4'." ) if REQUESTS_IMPORT_ERROR: print(f"Importfehler: {REQUESTS_IMPORT_ERROR}") return def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]: if self._catalog_cache is not None: return self._catalog_cache # Stand: 2026-01 liefert `?by=genre` konsistente Gruppen für `genres()`. catalog_url = f"{_get_base_url()}/serien?by=genre" soup = _get_soup_simple(catalog_url) self._catalog_cache = parse_series_catalog(soup) return self._catalog_cache def genres(self) -> List[str]: """Optional: Liefert alle Genres aus dem Serien-Katalog.""" if not self._requests_available: return [] catalog = self._ensure_catalog() return sorted(catalog.keys(), key=str.casefold) def capabilities(self) -> set[str]: """Meldet unterstützte Features für Router-Menüs.""" return {"popular_series", "genres", "latest_episodes"} def popular_series(self) -> List[str]: """Liefert die Titel der beliebten Serien (Quelle: `/beliebte-serien`).""" if not self._requests_available: return [] entries = self._ensure_popular() self._series_results.update({entry.title: entry for entry in entries if entry.title}) return [entry.title for entry in entries if entry.title] def titles_for_genre(self, genre: str) -> List[str]: """Optional: Liefert Titel für ein Genre.""" if not self._requests_available: return [] genre = (genre or "").strip() if not genre: return [] if genre == self.POPULAR_GENRE_LABEL: return self.popular_series() catalog = self._ensure_catalog() entries = catalog.get(genre, []) self._series_results.update({entry.title: entry for entry in entries if entry.title}) return [entry.title for entry in entries if entry.title] def _ensure_popular(self) -> List[SeriesResult]: """Laedt und cached die Liste der beliebten Serien aus `/beliebte-serien`.""" if self._popular_cache is not None: return list(self._popular_cache) soup = _get_soup_simple(_popular_series_url()) results: List[SeriesResult] = [] seen: set[str] = set() # Neues Layout (Stand: 2026-01): Abschnitt "Meistgesehen" hat Karten mit # `a.show-card` und Titel im `img alt=...`. anchors = None for section in soup.select("div.mb-5"): h2 = section.select_one("h2") label = (h2.get_text(" ", strip=True) if h2 else "").casefold() if "meistgesehen" in label: anchors = section.select("a.show-card[href]") break if anchors is None: anchors = soup.select("a.show-card[href]") for anchor in anchors: href = (anchor.get("href") or "").strip() if not href or "/serie/" not in href: continue img = anchor.select_one("img[alt]") title = ((img.get("alt") if img else "") or "").strip() if not title or title in seen: continue url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/") url = re.sub(r"/staffel-\\d+(?:/.*)?$", "", url).rstrip("/") if not url: continue _log_parsed_url(url) seen.add(title) results.append(SeriesResult(title=title, description="", url=url)) self._popular_cache = list(results) return list(results) @staticmethod def _season_label(number: int) -> str: return f"Staffel {number}" @staticmethod def _episode_label(info: EpisodeInfo) -> str: suffix_parts: List[str] = [] if info.original_title: suffix_parts.append(info.original_title) # Staffel nicht im Episoden-Label anzeigen (wird im UI bereits gesetzt). suffix = f" ({' | '.join(suffix_parts)})" if suffix_parts else "" return f"Episode {info.number}: {info.title}{suffix}" @staticmethod def _parse_season_number(label: str) -> Optional[int]: digits = "".join(ch for ch in label if ch.isdigit()) if not digits: return None return int(digits) def _clear_episode_cache_for_title(self, title: str) -> None: keys_to_remove = [key for key in self._episode_label_cache if key[0] == title] for key in keys_to_remove: self._episode_label_cache.pop(key, None) keys_to_remove = [key for key in self._hoster_cache if key[0] == title] for key in keys_to_remove: self._hoster_cache.pop(key, None) def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None: cache_key = (title, season_label) self._episode_label_cache[cache_key] = { self._episode_label(info): info for info in season_info.episodes } def _ensure_season_links(self, title: str) -> List[SeasonInfo]: cached = self._season_links_cache.get(title) if cached is not None: return list(cached) series = self._series_results.get(title) if not series: return [] try: seasons = scrape_series_detail(series.url, load_episodes=False) except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc self._season_links_cache[title] = list(seasons) return list(seasons) def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]: seasons = self._season_cache.get(title) or [] for season in seasons: if season.number == season_number and season.episodes: return season links = self._ensure_season_links(title) target = next((season for season in links if season.number == season_number), None) if not target: return None try: season_soup = _get_soup(target.url, session=get_requests_session("serienstream", headers=HEADERS)) season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup)) except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Serienstream-Episoden konnten nicht geladen werden: {exc}") from exc updated = [season for season in seasons if season.number != season_number] updated.append(season_info) updated.sort(key=lambda item: item.number) self._season_cache[title] = updated return season_info def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]: cache_key = (title, season_label) cached = self._episode_label_cache.get(cache_key) if cached: return cached.get(episode_label) number = self._parse_season_number(season_label) if number is None: return None season_info = self._ensure_season_episodes(title, number) if season_info: self._cache_episode_labels(title, season_label, season_info) return self._episode_label_cache.get(cache_key, {}).get(episode_label) return None async def search_titles(self, query: str) -> List[str]: query = query.strip() if not query: self._series_results.clear() self._season_cache.clear() self._season_links_cache.clear() self._episode_label_cache.clear() self._catalog_cache = None return [] if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 nicht suchen.") try: # Nutzt den Katalog (/serien), der jetzt nach Genres gruppiert ist. # Alternativ gäbe es ein Ajax-Endpoint, aber der ist nicht immer zuverlässig erreichbar. results = search_series(query) except Exception as exc: # pragma: no cover - defensive logging self._series_results.clear() self._season_cache.clear() self._episode_label_cache.clear() self._catalog_cache = None raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc self._series_results = {result.title: result for result in results} self._season_cache.clear() self._season_links_cache.clear() self._episode_label_cache.clear() return [result.title for result in results] def _ensure_seasons(self, title: str) -> List[SeasonInfo]: if title in self._season_cache: seasons = self._season_cache[title] # Auch bei Cache-Treffern die URLs loggen, damit nachvollziehbar bleibt, # welche Seiten für Staffel-/Episodenlisten relevant sind. if _get_setting_bool(GLOBAL_SETTING_LOG_URLS, default=False): series = self._series_results.get(title) if series and series.url: _log_url(series.url, kind="CACHE") for season in seasons: if season.url: _log_url(season.url, kind="CACHE") return seasons series = self._series_results.get(title) if not series: # Kodi startet das Plugin pro Navigation neu -> Such-Cache im RAM geht verloren. # Daher den Titel erneut im Katalog auflösen, um die Serien-URL zu bekommen. catalog = self._ensure_catalog() lookup_key = title.casefold().strip() for entries in catalog.values(): for entry in entries: if entry.title.casefold().strip() == lookup_key: series = entry self._series_results[entry.title] = entry break if series: break if not series: return [] seasons = self._ensure_season_links(title) self._clear_episode_cache_for_title(title) self._season_cache[title] = list(seasons) return list(seasons) def seasons_for(self, title: str) -> List[str]: seasons = self._ensure_seasons(title) return [self._season_label(season.number) for season in seasons] def episodes_for(self, title: str, season: str) -> List[str]: number = self._parse_season_number(season) if number is None: return [] season_info = self._ensure_season_episodes(title, number) if season_info: labels = [self._episode_label(info) for info in season_info.episodes] self._cache_episode_labels(title, season, season_info) return labels return [] def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.") episode_info = self._lookup_episode(title, season, episode) if not episode_info: return None try: link = fetch_episode_stream_link( episode_info.url, preferred_hosters=self._preferred_hosters, ) if link: _log_url(link, kind="FOUND") return link except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]: if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.") cache_key = (title, season, episode) cached = self._hoster_cache.get(cache_key) if cached is not None: return list(cached) episode_info = self._lookup_episode(title, season, episode) if not episode_info: return [] try: names = fetch_episode_hoster_names(episode_info.url) except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc self._hoster_cache[cache_key] = list(names) return list(names) def latest_episodes(self, page: int = 1) -> List[LatestEpisode]: """Liefert die neuesten Episoden aus `/neue-episoden`.""" if not self._requests_available: return [] try: page = int(page or 1) except Exception: page = 1 page = max(1, page) cached = self._latest_cache.get(page) if cached is not None: return list(cached) url = _latest_episodes_url() if page > 1: url = f"{url}?page={page}" soup = _get_soup_simple(url) episodes = _extract_latest_episodes(soup) self._latest_cache[page] = list(episodes) return list(episodes) def available_hosters_for_url(self, episode_url: str) -> List[str]: if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.") normalized = _absolute_url(episode_url) cached = self._latest_hoster_cache.get(normalized) if cached is not None: return list(cached) try: names = fetch_episode_hoster_names(normalized) except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc self._latest_hoster_cache[normalized] = list(names) return list(names) def stream_link_for_url(self, episode_url: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.") normalized = _absolute_url(episode_url) try: link = fetch_episode_stream_link( normalized, preferred_hosters=self._preferred_hosters, ) if link: _log_url(link, kind="FOUND") return link except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc def resolve_stream_link(self, link: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.") try: resolved = resolve_redirect(link) if not resolved: return None try: from resolveurl_backend import resolve as resolve_with_resolveurl except Exception: resolve_with_resolveurl = None if callable(resolve_with_resolveurl): resolved_by_resolveurl = resolve_with_resolveurl(resolved) if resolved_by_resolveurl: _log_url("ResolveURL", kind="HOSTER_RESOLVER") _log_url(resolved_by_resolveurl, kind="MEDIA") return resolved_by_resolveurl _log_url(resolved, kind="FINAL") return resolved except Exception as exc: # pragma: no cover - defensive logging raise RuntimeError(f"Stream-Link konnte nicht verfolgt werden: {exc}") from exc def set_preferred_hosters(self, hosters: List[str]) -> None: normalized = [hoster.strip().lower() for hoster in hosters if hoster.strip()] if normalized: self._preferred_hosters = normalized def reset_preferred_hosters(self) -> None: self._preferred_hosters = list(self._default_preferred_hosters) # Alias für die automatische Plugin-Erkennung. Plugin = SerienstreamPlugin