"""Serienstream (s.to) Integration als Downloader-Plugin. Hinweise: - Diese Integration nutzt optional `requests` + `beautifulsoup4` (bs4). - In Kodi koennen zusaetzliche Debug-Funktionen ueber Addon-Settings aktiviert werden (URL-Logging, HTML-Dumps, Benachrichtigungen). """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from html import unescape import json import hashlib import os import re import time import unicodedata from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None try: # pragma: no cover - optional Kodi helpers import xbmcaddon # type: ignore[import-not-found] import xbmcvfs # type: ignore[import-not-found] import xbmcgui # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow running outside Kodi xbmcaddon = None xbmcvfs = None xbmcgui = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url from http_session_pool import get_requests_session from regex_patterns import SEASON_EPISODE_TAG, SEASON_EPISODE_URL if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession: TypeAlias = Any BeautifulSoupT: TypeAlias = Any SETTING_BASE_URL = "serienstream_base_url" DEFAULT_BASE_URL = "https://s.to" DEFAULT_PREFERRED_HOSTERS = ["voe"] DEFAULT_TIMEOUT = 20 ADDON_ID = "plugin.video.viewit" GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = 
"debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors" SETTING_LOG_URLS = "log_urls_serienstream" SETTING_DUMP_HTML = "dump_html_serienstream" SETTING_SHOW_URL_INFO = "show_url_info_serienstream" SETTING_LOG_ERRORS = "log_errors_serienstream" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } SESSION_CACHE_TTL_SECONDS = 300 SESSION_CACHE_PREFIX = "viewit.serienstream" SESSION_CACHE_MAX_TITLE_URLS = 800 @dataclass class SeriesResult: title: str description: str url: str @dataclass class EpisodeInfo: number: int title: str original_title: str url: str season_label: str = "" languages: List[str] = field(default_factory=list) hosters: List[str] = field(default_factory=list) @dataclass class LatestEpisode: series_title: str season: int episode: int url: str airdate: str @dataclass class SeasonInfo: number: int url: str episodes: List[EpisodeInfo] def _get_base_url() -> str: base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip() if not base: base = DEFAULT_BASE_URL return base.rstrip("/") def _series_base_url() -> str: return f"{_get_base_url()}/serie/stream" def _popular_series_url() -> str: return f"{_get_base_url()}/beliebte-serien" def _latest_episodes_url() -> str: return f"{_get_base_url()}" def _absolute_url(href: str) -> str: return f"{_get_base_url()}{href}" if href.startswith("/") else href def _session_window() -> Any: if xbmcgui is None: return None try: return xbmcgui.Window(10000) except Exception: return None def _session_cache_key(name: str) -> str: base_hash = hashlib.sha1(_get_base_url().encode("utf-8")).hexdigest()[:12] return f"{SESSION_CACHE_PREFIX}.{base_hash}.{name}" def _session_cache_get(name: str) -> Any: window = _session_window() if window is None: return 
None raw = "" try: raw = window.getProperty(_session_cache_key(name)) or "" except Exception: return None if not raw: return None try: payload = json.loads(raw) except Exception: return None if not isinstance(payload, dict): return None expires_at = payload.get("expires_at") data = payload.get("data") try: if float(expires_at or 0) <= time.time(): return None except Exception: return None return data def _session_cache_set(name: str, data: Any, *, ttl_seconds: int = SESSION_CACHE_TTL_SECONDS) -> None: window = _session_window() if window is None: return payload = { "expires_at": float(time.time() + max(1, int(ttl_seconds))), "data": data, } try: raw = json.dumps(payload, ensure_ascii=False, separators=(",", ":")) except Exception: return # Kodi-Properties sind kein Dauer-Storage; begrenzen, damit UI stabil bleibt. if len(raw) > 240_000: return try: window.setProperty(_session_cache_key(name), raw) except Exception: return def _normalize_series_url(identifier: str) -> str: if identifier.startswith("http://") or identifier.startswith("https://"): return identifier.rstrip("/") slug = identifier.strip("/") return f"{_series_base_url()}/{slug}" def _series_root_url(url: str) -> str: """Normalisiert eine Serien-URL auf die Root-URL (ohne /staffel-x oder /episode-x).""" normalized = (url or "").strip().rstrip("/") normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized) normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized) return normalized.rstrip("/") def _log_visit(url: str) -> None: _log_url(url, kind="VISIT") _notify_url(url) if xbmcaddon is None: print(f"Visiting: {url}") def _normalize_text(value: str) -> str: """Legacy normalization (kept for backwards compatibility).""" value = value.casefold() value = re.sub(r"[^a-z0-9]+", "", value) return value def _normalize_search_text(value: str) -> str: """Normalisiert Text für die Suche ohne Wortgrenzen zu "verschmelzen". Wichtig: Wir ersetzen Nicht-Alphanumerisches durch Leerzeichen, statt es zu entfernen. 
Dadurch entstehen keine künstlichen Treffer über Wortgrenzen hinweg (z.B. "an" + "na" -> "anna"). """ value = (value or "").casefold() value = re.sub(r"[^a-z0-9]+", " ", value) value = re.sub(r"\s+", " ", value).strip() return value def _matches_query(query: str, *, title: str) -> bool: normalized_query = _normalize_search_text(query) if not normalized_query: return False haystack = f" {_normalize_search_text(title)} " return f" {normalized_query} " in haystack def _is_episode_tba(title: str, original_title: str) -> bool: combined = f"{title} {original_title}".casefold() markers = ("tba", "demnächst", "demnaechst", "coming soon", "to be announced") return any(marker in combined for marker in markers) def _row_is_upcoming(row: BeautifulSoupT) -> bool: classes = row.get("class") or [] if isinstance(classes, str): classes = classes.split() if "upcoming" in classes: return True badge = row.select_one(".badge-upcoming") if badge and (badge.get_text(" ", strip=True) or "").strip(): return True watch_cell = row.select_one(".episode-watch-cell") if watch_cell: text = watch_cell.get_text(" ", strip=True).casefold() if "tba" in text: return True return False def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool: return get_setting_bool(ADDON_ID, setting_id, default=default) def _notify_url(url: str) -> None: notify_url( ADDON_ID, heading="Serienstream", url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO, plugin_setting_id=SETTING_SHOW_URL_INFO, ) def _log_url(url: str, *, kind: str = "VISIT") -> None: log_url( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, plugin_setting_id=SETTING_LOG_URLS, log_filename="serienstream_urls.log", url=url, kind=kind, ) def _log_parsed_url(url: str) -> None: _log_url(url, kind="PARSE") def _log_response_html(url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, plugin_setting_id=SETTING_DUMP_HTML, url=url, body=body, filename_prefix="s_to_response", ) def 
_log_error(message: str) -> None: log_error( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS, plugin_setting_id=SETTING_LOG_ERRORS, log_filename="serienstream_errors.log", message=message, ) def _ensure_requests() -> None: if requests is None or BeautifulSoup is None: raise RuntimeError("requests/bs4 sind nicht verfuegbar.") def _looks_like_cloudflare_challenge(body: str) -> bool: lower = body.lower() markers = ( "cf-browser-verification", "cf-challenge", "cf_chl", "challenge-platform", "attention required! | cloudflare", "just a moment...", "cloudflare ray id", ) return any(marker in lower for marker in markers) def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT: _ensure_requests() _log_visit(url) sess = session or get_requests_session("serienstream", headers=HEADERS) try: response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: _log_error(f"GET {url} failed: {exc}") raise if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") return BeautifulSoup(response.text, "html.parser") def _get_html_simple(url: str) -> str: _ensure_requests() _log_visit(url) sess = get_requests_session("serienstream", headers=HEADERS) try: response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() except Exception as exc: _log_error(f"GET {url} failed: {exc}") raise if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") body = response.text _log_response_html(url, body) if _looks_like_cloudflare_challenge(body): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. 
nicht aus.") return body def _get_soup_simple(url: str) -> BeautifulSoupT: body = _get_html_simple(url) return BeautifulSoup(body, "html.parser") def _extract_genre_names_from_html(body: str) -> List[str]: names: List[str] = [] seen: set[str] = set() pattern = re.compile( r"]*class=[\"'][^\"']*background-1[^\"']*[\"'][^>]*>.*?]*>(.*?)", re.IGNORECASE | re.DOTALL, ) for match in pattern.finditer(body or ""): text = re.sub(r"<[^>]+>", " ", match.group(1) or "") text = unescape(re.sub(r"\s+", " ", text)).strip() if not text: continue key = text.casefold() if key in seen: continue seen.add(key) names.append(text) return names def search_series(query: str) -> List[SeriesResult]: """Sucht Serien im (/serien)-Katalog (Genre-liste) nach Titel/Alt-Titel.""" _ensure_requests() if not _normalize_search_text(query): return [] # Direkter Abruf wie in fetch_serien.py. catalog_url = f"{_get_base_url()}/serien?by=genre" soup = _get_soup_simple(catalog_url) results: List[SeriesResult] = [] for series in parse_series_catalog(soup).values(): for entry in series: if entry.title and _matches_query(query, title=entry.title): results.append(entry) return results def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]: """Parst die Serien-Übersicht (/serien) und liefert Genre -> Serienliste.""" catalog: Dict[str, List[SeriesResult]] = {} # Neues Layout (Stand: 2026-01): Gruppen-Header + Liste. 
# - Header: `div.background-1 ...` mit `h3` # - Einträge: `ul.series-list` -> `li.series-item[data-search]` -> `a[href]` for header in soup.select("div.background-1 h3"): group = (header.get_text(strip=True) or "").strip() if not group: continue list_node = header.parent.find_next_sibling("ul", class_="series-list") if not list_node: continue series: List[SeriesResult] = [] for item in list_node.select("li.series-item"): anchor = item.find("a", href=True) if not anchor: continue href = (anchor.get("href") or "").strip() url = _absolute_url(href) if url: _log_parsed_url(url) if ("/serie/" not in url) or "/staffel-" in url or "/episode-" in url: continue title = (anchor.get_text(" ", strip=True) or "").strip() description = (item.get("data-search") or "").strip() if title: series.append(SeriesResult(title=title, description=description, url=url)) if series: catalog[group] = series return catalog def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]: season_links: List[Tuple[int, str]] = [] seen_numbers: set[int] = set() anchors = soup.select("ul.nav.list-items-nav a[data-season-pill][href]") for anchor in anchors: href = anchor.get("href") or "" if "/episode-" in href: continue data_number = (anchor.get("data-season-pill") or "").strip() match = re.search(r"/staffel-(\d+)", href) if match: number = int(match.group(1)) elif data_number.isdigit(): number = int(data_number) else: label = anchor.get_text(strip=True) if not label.isdigit(): continue number = int(label) if number in seen_numbers: continue seen_numbers.add(number) season_url = _absolute_url(href) if season_url: _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) return season_links def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]: tag = soup.select_one('meta[itemprop="numberOfSeasons"]') if not tag: return None content = (tag.get("content") or "").strip() if not content.isdigit(): return None count = 
int(content) return count if count > 0 else None def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str: canonical = soup.select_one('link[rel="canonical"][href]') href = (canonical.get("href") if canonical else "") or "" href = href.strip() if href.startswith("http://") or href.startswith("https://"): return href.rstrip("/") return fallback.rstrip("/") def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]: episodes: List[EpisodeInfo] = [] season_label = "" season_header = soup.select_one("section.episode-section h2") or soup.select_one("h2.h3") if season_header: season_label = (season_header.get_text(" ", strip=True) or "").strip() language_map = { "german": "DE", "english": "EN", "japanese": "JP", "turkish": "TR", "spanish": "ES", "italian": "IT", "french": "FR", "korean": "KO", "russian": "RU", "polish": "PL", "portuguese": "PT", "chinese": "ZH", "arabic": "AR", "thai": "TH", } # Neues Layout (Stand: 2026-01): Episoden-Tabelle mit Zeilen und onclick-URL. rows = soup.select("table.episode-table tbody tr.episode-row") for index, row in enumerate(rows): if _row_is_upcoming(row): continue onclick = (row.get("onclick") or "").strip() url = "" if onclick: match = re.search(r"location=['\\\"]([^'\\\"]+)['\\\"]", onclick) if match: url = _absolute_url(match.group(1)) if not url: anchor = row.find("a", href=True) url = _absolute_url(anchor.get("href")) if anchor else "" if url: _log_parsed_url(url) number_tag = row.select_one(".episode-number-cell") number_text = (number_tag.get_text(strip=True) if number_tag else "").strip() match = re.search(r"/episode-(\d+)", url) if url else None if match: number = int(match.group(1)) else: digits = "".join(ch for ch in number_text if ch.isdigit()) number = int(digits) if digits else index + 1 title_tag = row.select_one(".episode-title-ger") original_tag = row.select_one(".episode-title-eng") title = (title_tag.get_text(strip=True) if title_tag else "").strip() original_title = 
(original_tag.get_text(strip=True) if original_tag else "").strip() if not title: title = f"Episode {number}" if _is_episode_tba(title, original_title): continue hosters: List[str] = [] for img in row.select(".episode-watch-cell img"): label = (img.get("alt") or img.get("title") or "").strip() if label and label not in hosters: hosters.append(label) languages: List[str] = [] for flag in row.select(".episode-language-cell .watch-language"): classes = flag.get("class") or [] if isinstance(classes, str): classes = classes.split() for cls in classes: if cls.startswith("svg-flag-"): key = cls.replace("svg-flag-", "").strip() if not key: continue value = language_map.get(key, key.upper()) if value and value not in languages: languages.append(value) episodes.append( EpisodeInfo( number=number, title=title, original_title=original_title, url=url, season_label=season_label, languages=languages, hosters=hosters, ) ) if episodes: return episodes return episodes def fetch_episode_stream_link( episode_url: str, *, preferred_hosters: Optional[List[str]] = None, ) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(episode_url) preferred = [hoster.lower() for hoster in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)] session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. 
try: _get_soup(_get_base_url(), session=session) except Exception: pass soup = _get_soup(normalized_url, session=session) candidates: List[Tuple[str, str]] = [] for button in soup.select("button.link-box[data-play-url]"): play_url = (button.get("data-play-url") or "").strip() provider = (button.get("data-provider-name") or "").strip() url = _absolute_url(play_url) if url: _log_parsed_url(url) if provider and url: candidates.append((provider, url)) if not candidates: return None for preferred_name in preferred: for name, url in candidates: if name.lower() == preferred_name: return url return candidates[0][1] def fetch_episode_hoster_names(episode_url: str) -> List[str]: """Liest die verfügbaren Hoster-Namen für eine Episode aus.""" _ensure_requests() normalized_url = _absolute_url(episode_url) session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. try: _get_soup(_get_base_url(), session=session) except Exception: pass soup = _get_soup(normalized_url, session=session) names: List[str] = [] seen: set[str] = set() for button in soup.select("button.link-box[data-provider-name]"): name = (button.get("data-provider-name") or "").strip() play_url = (button.get("data-play-url") or "").strip() url = _absolute_url(play_url) if url: _log_parsed_url(url) key = name.casefold().strip() if not key or key in seen: continue seen.add(key) names.append(name) _log_url(name, kind="HOSTER") if names: _log_url(f"{normalized_url}#hosters={','.join(names)}", kind="HOSTERS") return names _LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE) _LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE) def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]: """Parst die neuesten Episoden von der Startseite.""" episodes: List[LatestEpisode] = [] seen: set[str] = set() for anchor in soup.select("a.latest-episode-row[href]"): href = (anchor.get("href") or 
"").strip() if not href or "/serie/" not in href: continue url = _absolute_url(href) if not url: continue title_node = anchor.select_one(".ep-title") series_title = (title_node.get("title") if title_node else "") or "" series_title = series_title.strip() or (title_node.get_text(strip=True) if title_node else "").strip() if not series_title: continue season_text = (anchor.select_one(".ep-season").get_text(strip=True) if anchor.select_one(".ep-season") else "").strip() episode_text = (anchor.select_one(".ep-episode").get_text(strip=True) if anchor.select_one(".ep-episode") else "").strip() season_number: Optional[int] = None episode_number: Optional[int] = None match = re.search(r"S\s*(\d+)", season_text, re.IGNORECASE) if match: season_number = int(match.group(1)) match = re.search(r"E\s*(\d+)", episode_text, re.IGNORECASE) if match: episode_number = int(match.group(1)) if season_number is None or episode_number is None: match = _LATEST_EPISODE_URL_RE.search(href) if match: season_number = int(match.group(1)) episode_number = int(match.group(2)) if season_number is None or episode_number is None: continue airdate_node = anchor.select_one(".ep-time") airdate = (airdate_node.get_text(" ", strip=True) if airdate_node else "").strip() key = f"{url}\\t{season_number}\\t{episode_number}" if key in seen: continue seen.add(key) _log_parsed_url(url) episodes.append( LatestEpisode( series_title=series_title, season=int(season_number), episode=int(episode_number), url=url, airdate=airdate, ) ) return episodes def resolve_redirect(target_url: str) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(target_url) _log_visit(normalized_url) session = get_requests_session("serienstream", headers=HEADERS) # Preflight optional: Startseite kann 5xx liefern, Zielseite aber funktionieren. 
try: _get_soup(_get_base_url(), session=session) except Exception: pass response = session.get( normalized_url, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True, ) if response.url: _log_url(response.url, kind="RESOLVED") return response.url if response.url else None def scrape_series_detail( series_identifier: str, max_seasons: Optional[int] = None, *, load_episodes: bool = True, ) -> List[SeasonInfo]: _ensure_requests() series_url = _series_root_url(_normalize_series_url(series_identifier)) _log_url(series_url, kind="SERIES") _notify_url(series_url) session = get_requests_session("serienstream", headers=HEADERS) soup = _get_soup(series_url, session=session) base_series_url = _series_root_url(_extract_canonical_url(soup, series_url)) season_links = _extract_season_links(soup) season_count = _extract_number_of_seasons(soup) if season_count and (not season_links or len(season_links) < season_count): existing = {number for number, _ in season_links} for number in range(1, season_count + 1): if number in existing: continue season_url = f"{base_series_url}/staffel-{number}" _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) if max_seasons is not None: season_links = season_links[:max_seasons] seasons: List[SeasonInfo] = [] for number, url in season_links: episodes: List[EpisodeInfo] = [] if load_episodes: season_soup = _get_soup(url, session=session) episodes = _extract_episodes(season_soup) seasons.append(SeasonInfo(number=number, url=url, episodes=episodes)) seasons.sort(key=lambda s: s.number) return seasons class SerienstreamPlugin(BasisPlugin): """Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt.""" name = "Serienstream" POPULAR_GENRE_LABEL = "⭐ Beliebte Serien" def __init__(self) -> None: self._series_results: Dict[str, SeriesResult] = {} self._title_url_cache: Dict[str, str] = self._load_title_url_cache() self._genre_names_cache: Optional[List[str]] = None 
self._season_cache: Dict[str, List[SeasonInfo]] = {} self._season_links_cache: Dict[str, List[SeasonInfo]] = {} self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {} self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None self._genre_group_cache: Dict[str, Dict[str, List[str]]] = {} self._genre_page_titles_cache: Dict[Tuple[str, int], List[str]] = {} self._genre_page_count_cache: Dict[str, int] = {} self._popular_cache: Optional[List[SeriesResult]] = None self._requests_available = REQUESTS_AVAILABLE self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: List[str] = list(self._default_preferred_hosters) self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {} self._latest_cache: Dict[int, List[LatestEpisode]] = {} self._latest_hoster_cache: Dict[str, List[str]] = {} self.is_available = True self.unavailable_reason: Optional[str] = None if not self._requests_available: # pragma: no cover - optional dependency self.is_available = False self.unavailable_reason = ( "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'." ) print( "SerienstreamPlugin deaktiviert: requests/bs4 fehlen. " "Installiere 'requests' und 'beautifulsoup4'." ) if REQUESTS_IMPORT_ERROR: print(f"Importfehler: {REQUESTS_IMPORT_ERROR}") return def _load_title_url_cache(self) -> Dict[str, str]: raw = _session_cache_get("title_urls") if not isinstance(raw, dict): return {} result: Dict[str, str] = {} for key, value in raw.items(): key_text = str(key or "").strip().casefold() url_text = str(value or "").strip() if not key_text or not url_text: continue result[key_text] = url_text return result def _save_title_url_cache(self) -> None: if not self._title_url_cache: return # Begrenzt die Session-Daten auf die jüngsten Einträge. 
while len(self._title_url_cache) > SESSION_CACHE_MAX_TITLE_URLS: self._title_url_cache.pop(next(iter(self._title_url_cache))) _session_cache_set("title_urls", self._title_url_cache) def _remember_series_result(self, title: str, url: str, description: str = "") -> None: title = (title or "").strip() url = (url or "").strip() if not title: return if url: self._series_results[title] = SeriesResult(title=title, description=description, url=url) cache_key = title.casefold() if self._title_url_cache.get(cache_key) != url: self._title_url_cache[cache_key] = url self._save_title_url_cache() return current = self._series_results.get(title) if current is None: self._series_results[title] = SeriesResult(title=title, description=description, url="") @staticmethod def _season_links_cache_name(series_url: str) -> str: digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20] return f"season_links.{digest}" @staticmethod def _season_episodes_cache_name(season_url: str) -> str: digest = hashlib.sha1((season_url or "").encode("utf-8")).hexdigest()[:20] return f"season_episodes.{digest}" def _load_session_season_links(self, series_url: str) -> Optional[List[SeasonInfo]]: raw = _session_cache_get(self._season_links_cache_name(series_url)) if not isinstance(raw, list): return None seasons: List[SeasonInfo] = [] for item in raw: if not isinstance(item, dict): continue try: number = int(item.get("number")) except Exception: continue url = str(item.get("url") or "").strip() if number <= 0 or not url: continue seasons.append(SeasonInfo(number=number, url=url, episodes=[])) if not seasons: return None seasons.sort(key=lambda s: s.number) return seasons def _save_session_season_links(self, series_url: str, seasons: List[SeasonInfo]) -> None: payload = [{"number": int(season.number), "url": season.url} for season in seasons if season.url] if payload: _session_cache_set(self._season_links_cache_name(series_url), payload) def _load_session_season_episodes(self, season_url: 
str) -> Optional[List[EpisodeInfo]]: raw = _session_cache_get(self._season_episodes_cache_name(season_url)) if not isinstance(raw, list): return None episodes: List[EpisodeInfo] = [] for item in raw: if not isinstance(item, dict): continue try: number = int(item.get("number")) except Exception: continue title = str(item.get("title") or "").strip() original_title = str(item.get("original_title") or "").strip() url = str(item.get("url") or "").strip() season_label = str(item.get("season_label") or "").strip() languages = [str(lang).strip() for lang in list(item.get("languages") or []) if str(lang).strip()] hosters = [str(host).strip() for host in list(item.get("hosters") or []) if str(host).strip()] if number <= 0: continue episodes.append( EpisodeInfo( number=number, title=title or f"Episode {number}", original_title=original_title, url=url, season_label=season_label, languages=languages, hosters=hosters, ) ) if not episodes: return None episodes.sort(key=lambda item: item.number) return episodes def _save_session_season_episodes(self, season_url: str, episodes: List[EpisodeInfo]) -> None: payload = [] for item in episodes: payload.append( { "number": int(item.number), "title": item.title, "original_title": item.original_title, "url": item.url, "season_label": item.season_label, "languages": list(item.languages or []), "hosters": list(item.hosters or []), } ) if payload: _session_cache_set(self._season_episodes_cache_name(season_url), payload) def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]: if self._catalog_cache is not None: return self._catalog_cache # Stand: 2026-01 liefert `?by=genre` konsistente Gruppen für `genres()`. 
catalog_url = f"{_get_base_url()}/serien?by=genre" soup = _get_soup_simple(catalog_url) self._catalog_cache = parse_series_catalog(soup) _session_cache_set("genres", sorted(self._catalog_cache.keys(), key=str.casefold)) return self._catalog_cache def _ensure_genre_names(self) -> List[str]: if self._genre_names_cache is not None: return list(self._genre_names_cache) cached = _session_cache_get("genres") if isinstance(cached, list): genres = [str(value).strip() for value in cached if str(value).strip()] if genres: self._genre_names_cache = sorted(set(genres), key=str.casefold) return list(self._genre_names_cache) catalog_url = f"{_get_base_url()}/serien?by=genre" try: body = _get_html_simple(catalog_url) genres = _extract_genre_names_from_html(body) except Exception: genres = [] if not genres: catalog = self._ensure_catalog() genres = sorted(catalog.keys(), key=str.casefold) else: genres = sorted(set(genres), key=str.casefold) self._genre_names_cache = list(genres) _session_cache_set("genres", self._genre_names_cache) return list(self._genre_names_cache) def genres(self) -> List[str]: """Optional: Liefert alle Genres aus dem Serien-Katalog.""" if not self._requests_available: return [] return self._ensure_genre_names() def capabilities(self) -> set[str]: """Meldet unterstützte Features für Router-Menüs.""" return {"popular_series", "genres", "latest_episodes"} def popular_series(self) -> List[str]: """Liefert die Titel der beliebten Serien (Quelle: `/beliebte-serien`).""" if not self._requests_available: return [] entries = self._ensure_popular() for entry in entries: self._remember_series_result(entry.title, entry.url, entry.description) return [entry.title for entry in entries if entry.title] def titles_for_genre(self, genre: str) -> List[str]: """Optional: Liefert Titel für ein Genre.""" if not self._requests_available: return [] genre = (genre or "").strip() if not genre: return [] if genre == self.POPULAR_GENRE_LABEL: return self.popular_series() catalog = 
        # NOTE(review): this chunk starts mid-method; the `def` line (and,
        # presumably, a `catalog = ` binding for the call below) lies above the
        # visible range -- confirm against the full file.
        self._ensure_catalog()
        entries = catalog.get(genre, [])
        # Remember title -> URL mappings so later navigation steps can resolve
        # the series again after Kodi restarts the plugin process.
        for entry in entries:
            self._remember_series_result(entry.title, entry.url, entry.description)
        return [entry.title for entry in entries if entry.title]

    @staticmethod
    def _title_group_key(title: str) -> str:
        """Return the alphabet-group key for *title*.

        Returns:
            "0-9" if the first alphanumeric character is a digit, the
            uppercased (umlaut-folded) first letter otherwise, or "#" when the
            title is empty or contains no alphanumeric character at all.
        """
        raw = (title or "").strip()
        if not raw:
            return "#"
        for char in raw:
            if char.isdigit():
                return "0-9"
            if char.isalpha():
                # Fold German umlauts/sharp-s onto their base letter so e.g.
                # "Ärzte" sorts under "A".
                normalized = char.casefold()
                if normalized == "ä":
                    normalized = "a"
                elif normalized == "ö":
                    normalized = "o"
                elif normalized == "ü":
                    normalized = "u"
                elif normalized == "ß":
                    normalized = "s"
                return normalized.upper()
        return "#"

    @classmethod
    def _group_matches(cls, group_code: str, title: str) -> bool:
        """Return True when *title* belongs to the UI letter group *group_code*.

        Supported group codes: "0-9", "A-E", "F-J", "K-O", "P-T", "U-Z".
        Titles keyed "#" (no alphanumeric start) match no group.
        """
        key = cls._title_group_key(title)
        if group_code == "0-9":
            return key == "0-9"
        # Digit-led and unkeyed titles only ever belong to the "0-9" bucket.
        if key == "0-9" or key == "#":
            return False
        if group_code == "A-E":
            return "A" <= key <= "E"
        if group_code == "F-J":
            return "F" <= key <= "J"
        if group_code == "K-O":
            return "K" <= key <= "O"
        if group_code == "P-T":
            return "P" <= key <= "T"
        if group_code == "U-Z":
            return "U" <= key <= "Z"
        return False

    def _ensure_genre_group_cache(self, genre: str) -> Dict[str, List[str]]:
        """Build (and memoize per genre) the letter-group -> titles mapping.

        Each title lands in exactly the first matching group; every group's
        list is sorted case-insensitively.
        """
        cached = self._genre_group_cache.get(genre)
        if cached is not None:
            return cached
        titles = self.titles_for_genre(genre)
        grouped: Dict[str, List[str]] = {}
        for title in titles:
            for code in ("A-E", "F-J", "K-O", "P-T", "U-Z", "0-9"):
                if self._group_matches(code, title):
                    grouped.setdefault(code, []).append(title)
                    break
        for code in grouped:
            grouped[code].sort(key=str.casefold)
        self._genre_group_cache[genre] = grouped
        return grouped

    @staticmethod
    def _genre_slug(genre: str) -> str:
        """Turn a display genre name into the URL slug used by the site.

        "&" becomes " und ", accents are stripped via NFKD decomposition, and
        any remaining non-alphanumeric runs collapse to single hyphens.
        """
        value = (genre or "").strip().casefold()
        value = value.replace("&", " und ")
        value = unicodedata.normalize("NFKD", value)
        value = "".join(ch for ch in value if not unicodedata.combining(ch))
        value = re.sub(r"[^a-z0-9]+", "-", value).strip("-")
        return value

    def _fetch_genre_page_titles(self, genre: str, page: int) -> Tuple[List[str], int]:
        """Scrape one `/genre/<slug>` page and return (titles, max_page_count).

        Results are cached per (slug, page); the page count is cached per slug.
        Titles are deduplicated case-insensitively and each discovered series
        URL is remembered for later URL resolution.
        """
        slug = self._genre_slug(genre)
        if not slug:
            return [], 1
        cache_key = (slug, page)
        cached = self._genre_page_titles_cache.get(cache_key)
        cached_pages = self._genre_page_count_cache.get(slug)
        if cached is not None and cached_pages is not None:
            return list(cached), int(cached_pages)
        url = f"{_get_base_url()}/genre/{slug}"
        if page > 1:
            url = f"{url}?page={int(page)}"
        soup = _get_soup_simple(url)
        titles: List[str] = []
        seen: set[str] = set()
        for anchor in soup.select("a.show-card[href]"):
            href = (anchor.get("href") or "").strip()
            # Normalize: absolute URL, drop fragment/query, strip trailing "/".
            series_url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
            if "/serie/" not in series_url:
                continue
            # The visible title lives in the card image's alt attribute.
            img = anchor.select_one("img[alt]")
            title = ((img.get("alt") if img else "") or "").strip()
            if not title:
                continue
            key = title.casefold()
            if key in seen:
                continue
            seen.add(key)
            self._remember_series_result(title, series_url)
            titles.append(title)
        # Derive the total page count from the pagination links on the page.
        max_page = 1
        for anchor in soup.select("a[href*='?page=']"):
            href = (anchor.get("href") or "").strip()
            match = re.search(r"[?&]page=(\d+)", href)
            if not match:
                continue
            try:
                max_page = max(max_page, int(match.group(1)))
            except Exception:
                continue
        self._genre_page_titles_cache[cache_key] = list(titles)
        self._genre_page_count_cache[slug] = max_page
        return list(titles), max_page

    def titles_for_genre_group_page(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> List[str]:
        """Return page *page* (of size *page_size*) of titles in a letter group.

        Walks the genre's site pages in order, collecting titles that match
        *group_code* until one more than `page * page_size` matches exist (the
        extra one lets `genre_group_has_more` style pagination work). On any
        scraping failure it falls back to the locally grouped catalog cache.
        """
        genre = (genre or "").strip()
        group_code = (group_code or "").strip()
        page = max(1, int(page or 1))
        page_size = max(1, int(page_size or 10))
        needed = page * page_size + 1
        matched: List[str] = []
        try:
            _, max_pages = self._fetch_genre_page_titles(genre, 1)
            for page_index in range(1, max_pages + 1):
                page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
                for title in page_titles:
                    if self._group_matches(group_code, title):
                        matched.append(title)
                # NOTE(review): indentation reconstructed from a collapsed
                # source -- confirm this early exit sits at page level.
                if len(matched) >= needed:
                    break
            start = (page - 1) * page_size
            end = start + page_size
            return list(matched[start:end])
        except Exception:
            # Fallback: serve the requested slice from the catalog-based cache.
            grouped = self._ensure_genre_group_cache(genre)
            titles = grouped.get(group_code, [])
            start = (page - 1) * page_size
            end = start + page_size
            return list(titles[start:end])

    def genre_group_has_more(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> bool:
        """Return True when at least one title exists beyond the given page.

        Mirrors `titles_for_genre_group_page`: counts matches across the
        genre's site pages and short-circuits once `page * page_size + 1`
        matches are found; falls back to the cached grouping on failure.
        """
        genre = (genre or "").strip()
        group_code = (group_code or "").strip()
        page = max(1, int(page or 1))
        page_size = max(1, int(page_size or 10))
        needed = page * page_size + 1
        count = 0
        try:
            _, max_pages = self._fetch_genre_page_titles(genre, 1)
            for page_index in range(1, max_pages + 1):
                page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
                for title in page_titles:
                    if self._group_matches(group_code, title):
                        count += 1
                        if count >= needed:
                            return True
            return False
        except Exception:
            grouped = self._ensure_genre_group_cache(genre)
            titles = grouped.get(group_code, [])
            return len(titles) > (page * page_size)

    def _ensure_popular(self) -> List[SeriesResult]:
        """Load and cache the list of popular series from `/beliebte-serien`."""
        if self._popular_cache is not None:
            return list(self._popular_cache)
        soup = _get_soup_simple(_popular_series_url())
        results: List[SeriesResult] = []
        seen: set[str] = set()
        # New layout (as of 2026-01): the "Meistgesehen" (most watched) section
        # has cards using `a.show-card` with the title in `img alt=...`.
        anchors = None
        for section in soup.select("div.mb-5"):
            h2 = section.select_one("h2")
            label = (h2.get_text(" ", strip=True) if h2 else "").casefold()
            if "meistgesehen" in label:
                anchors = section.select("a.show-card[href]")
                break
        if anchors is None:
            # Section heading not found -> fall back to all show cards on page.
            anchors = soup.select("a.show-card[href]")
        for anchor in anchors:
            href = (anchor.get("href") or "").strip()
            if not href or "/serie/" not in href:
                continue
            img = anchor.select_one("img[alt]")
            title = ((img.get("alt") if img else "") or "").strip()
            # NOTE(review): dedupe here is case-sensitive, unlike the casefold
            # dedupe used in `_fetch_genre_page_titles` -- verify intent.
            if not title or title in seen:
                continue
            url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
            # Strip a season suffix so the URL points at the series root.
            url = re.sub(r"/staffel-\d+(?:/.*)?$", "", url).rstrip("/")
            if not url:
                continue
            _log_parsed_url(url)
            seen.add(title)
            results.append(SeriesResult(title=title, description="", url=url))
        self._popular_cache = list(results)
        return list(results)

    @staticmethod
    def _season_label(number: int) -> str:
        """Return the UI label for a season number (German: "Staffel N")."""
        return f"Staffel {number}"

    @staticmethod
    def _episode_label(info: EpisodeInfo) -> str:
        """Return the UI label for an episode, e.g. "Episode 3: Title (Orig)"."""
        suffix_parts: List[str] = []
        if info.original_title:
            suffix_parts.append(info.original_title)
        # Do not show the season in the episode label (the UI sets it already).
        suffix = f" ({' | '.join(suffix_parts)})" if suffix_parts else ""
        return f"Episode {info.number}: {info.title}{suffix}"

    @staticmethod
    def _parse_season_number(label: str) -> Optional[int]:
        """Extract the season number from a label like "Staffel 2", else None.

        All digits in the label are concatenated, so labels should contain at
        most one number.
        """
        digits = "".join(ch for ch in label if ch.isdigit())
        if not digits:
            return None
        return int(digits)

    def _clear_episode_cache_for_title(self, title: str) -> None:
        """Drop all episode-label and hoster cache entries for *title*.

        Both caches are keyed by tuples whose first element is the title.
        """
        keys_to_remove = [key for key in self._episode_label_cache if key[0] == title]
        for key in keys_to_remove:
            self._episode_label_cache.pop(key, None)
        keys_to_remove = [key for key in self._hoster_cache if key[0] == title]
        for key in keys_to_remove:
            self._hoster_cache.pop(key, None)

    def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None:
        """Cache label -> EpisodeInfo lookups for one (title, season) pair."""
        cache_key = (title, season_label)
        self._episode_label_cache[cache_key] = {
            self._episode_label(info): info for info in season_info.episodes
        }

    def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
        """Resolve *title* to its season list (URLs only, no episodes).

        Resolution order: RAM cache -> known search results -> title-URL cache
        -> full catalog lookup. Season links themselves come from the Kodi
        session cache when present, otherwise from scraping the series page.

        Raises:
            RuntimeError: when scraping the series detail page fails.
        """
        cached = self._season_links_cache.get(title)
        if cached is not None:
            return list(cached)
        series = self._series_results.get(title)
        if not series:
            # Reconstruct a minimal result from the persisted title-URL cache.
            cached_url = self._title_url_cache.get(title.casefold().strip(), "")
            if cached_url:
                series = SeriesResult(title=title, description="", url=cached_url)
                self._series_results[title] = series
        if not series:
            # Last resort: scan the full catalog for a case-insensitive match.
            catalog = self._ensure_catalog()
            lookup_key = title.casefold().strip()
            for entries in catalog.values():
                for entry in entries:
                    if entry.title.casefold().strip() == lookup_key:
                        series = entry
                        self._remember_series_result(entry.title, entry.url, entry.description)
                        break
                if series:
                    break
        if not series:
            return []
        session_links = self._load_session_season_links(series.url)
        if session_links:
            self._season_links_cache[title] = list(session_links)
            return list(session_links)
        try:
            seasons = scrape_series_detail(series.url, load_episodes=False)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
        self._season_links_cache[title] = list(seasons)
        self._save_session_season_links(series.url, seasons)
        return list(seasons)

    def remember_series_url(self, title: str, series_url: str) -> None:
        """Record a known title -> series-URL mapping (no-op on blank input)."""
        title = (title or "").strip()
        series_url = (series_url or "").strip()
        if not title or not series_url:
            return
        self._remember_series_result(title, series_url)

    def series_url_for_title(self, title: str) -> str:
        """Return the remembered series URL for *title*, or "" if unknown.

        Checks the exact-title result map first, then the persisted title-URL
        cache, then a case-insensitive scan over all known results.
        """
        title = (title or "").strip()
        if not title:
            return ""
        direct = self._series_results.get(title)
        if direct and direct.url:
            return direct.url
        cached_url = self._title_url_cache.get(title.casefold().strip(), "")
        if cached_url:
            return cached_url
        lookup_key = title.casefold().strip()
        for entry in self._series_results.values():
            if entry.title.casefold().strip() == lookup_key and entry.url:
                return entry.url
        return ""

    def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
        """Return the episode-populated SeasonInfo for one season, or None.

        Lookup order: RAM season cache -> Kodi session cache -> scrape of the
        season page. Fresh data is written back to both caches; the season
        list for *title* is kept sorted by season number.

        Raises:
            RuntimeError: when scraping the season page fails.
        """
        seasons = self._season_cache.get(title) or []
        for season in seasons:
            # Only a cached entry that already has episodes counts as a hit.
            if season.number == season_number and season.episodes:
                return season
        links = self._ensure_season_links(title)
        target = next((season for season in links if season.number == season_number), None)
        if not target:
            return None
        cached_episodes = self._load_session_season_episodes(target.url)
        if cached_episodes:
            season_info = SeasonInfo(number=target.number, url=target.url, episodes=list(cached_episodes))
            updated = [season for season in seasons if season.number != season_number]
            updated.append(season_info)
            updated.sort(key=lambda item: item.number)
            self._season_cache[title] = updated
            return season_info
        try:
            season_soup = _get_soup(target.url, session=get_requests_session("serienstream", headers=HEADERS))
            season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Episoden konnten nicht geladen werden: {exc}") from exc
        updated = [season for season in seasons if season.number != season_number]
        updated.append(season_info)
        updated.sort(key=lambda item: item.number)
        self._season_cache[title] = updated
        self._save_session_season_episodes(target.url, season_info.episodes)
        return season_info

    def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
        """Resolve a (title, season label, episode label) triple to EpisodeInfo.

        Serves from the label cache when possible; otherwise loads the season's
        episodes, populates the cache, and retries the lookup.
        """
        cache_key = (title, season_label)
        cached = self._episode_label_cache.get(cache_key)
        if cached:
            return cached.get(episode_label)
        number = self._parse_season_number(season_label)
        if number is None:
            return None
        season_info = self._ensure_season_episodes(title, number)
        if season_info:
            self._cache_episode_labels(title, season_label, season_info)
            return self._episode_label_cache.get(cache_key, {}).get(episode_label)
        return None

    async def search_titles(self, query: str) -> List[str]:
        """Search the catalog for *query* and return the matching titles.

        An empty query resets all per-search caches and returns []. On success
        the result map is rebuilt and season/episode caches are invalidated.

        Raises:
            RuntimeError: when requests/bs4 are unavailable or the search fails.
        """
        query = query.strip()
        if not query:
            self._series_results.clear()
            self._season_cache.clear()
            self._season_links_cache.clear()
            self._episode_label_cache.clear()
            self._catalog_cache = None
            return []
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 nicht suchen.")
        try:
            # Uses the catalog (/serien), which is now grouped by genre.
            # There is an Ajax endpoint as an alternative, but it is not
            # always reliably reachable.
            results = search_series(query)
        except Exception as exc:  # pragma: no cover - defensive logging
            # NOTE(review): this error path does not clear
            # `_season_links_cache`, unlike the success path below -- verify.
            self._series_results.clear()
            self._season_cache.clear()
            self._episode_label_cache.clear()
            self._catalog_cache = None
            raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc
        self._series_results = {}
        for result in results:
            self._remember_series_result(result.title, result.url, result.description)
        self._season_cache.clear()
        self._season_links_cache.clear()
        self._episode_label_cache.clear()
        return [result.title for result in results]

    def _ensure_seasons(self, title: str) -> List[SeasonInfo]:
        """Return the season list for *title*, loading and caching on demand.

        Returns [] when the title cannot be resolved to a series URL.
        """
        if title in self._season_cache:
            seasons = self._season_cache[title]
            # Log URLs even on cache hits so it stays traceable which pages
            # are relevant for season/episode listings.
            if _get_setting_bool(GLOBAL_SETTING_LOG_URLS, default=False):
                series = self._series_results.get(title)
                if series and series.url:
                    _log_url(series.url, kind="CACHE")
                for season in seasons:
                    if season.url:
                        _log_url(season.url, kind="CACHE")
            return seasons
        series = self._series_results.get(title)
        if not series:
            # Kodi restarts the plugin on every navigation -> the in-RAM
            # search cache is lost. Re-resolve the title via the catalog to
            # recover the series URL.
            catalog = self._ensure_catalog()
            lookup_key = title.casefold().strip()
            for entries in catalog.values():
                for entry in entries:
                    if entry.title.casefold().strip() == lookup_key:
                        series = entry
                        self._remember_series_result(entry.title, entry.url, entry.description)
                        break
                if series:
                    break
        if not series:
            return []
        seasons = self._ensure_season_links(title)
        self._clear_episode_cache_for_title(title)
        self._season_cache[title] = list(seasons)
        return list(seasons)

    def seasons_for(self, title: str) -> List[str]:
        """Return the UI season labels ("Staffel N") for *title*."""
        seasons = self._ensure_seasons(title)
        return [self._season_label(season.number) for season in seasons]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return the UI episode labels for one season of *title* (or [])."""
        number = self._parse_season_number(season)
        if number is None:
            return []
        season_info = self._ensure_season_episodes(title, number)
        if season_info:
            labels = [self._episode_label(info) for info in season_info.episodes]
            self._cache_episode_labels(title, season, season_info)
            return labels
        return []

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return a hoster stream link for the given episode, or None.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        episode_info = self._lookup_episode(title, season, episode)
        if not episode_info:
            return None
        try:
            link = fetch_episode_stream_link(
                episode_info.url,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc

    def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
        """Return (and cache) the hoster names offered for the given episode.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
        cache_key = (title, season, episode)
        cached = self._hoster_cache.get(cache_key)
        if cached is not None:
            return list(cached)
        episode_info = self._lookup_episode(title, season, episode)
        if not episode_info:
            return []
        try:
            names = fetch_episode_hoster_names(episode_info.url)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
        self._hoster_cache[cache_key] = list(names)
        return list(names)

    def latest_episodes(self, page: int = 1) -> List[LatestEpisode]:
        """Return the latest episodes, one site page at a time (cached).

        NOTE(review): the docstring originally claimed `/neue-episoden`, but
        `_latest_episodes_url()` returns the plain base URL -- confirm which
        endpoint is intended.
        """
        if not self._requests_available:
            return []
        try:
            page = int(page or 1)
        except Exception:
            page = 1
        page = max(1, page)
        cached = self._latest_cache.get(page)
        if cached is not None:
            return list(cached)
        url = _latest_episodes_url()
        if page > 1:
            url = f"{url}?page={page}"
        soup = _get_soup_simple(url)
        episodes = _extract_latest_episodes(soup)
        self._latest_cache[page] = list(episodes)
        return list(episodes)

    def available_hosters_for_url(self, episode_url: str) -> List[str]:
        """Return (and cache) hoster names for an episode given by URL.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
        normalized = _absolute_url(episode_url)
        cached = self._latest_hoster_cache.get(normalized)
        if cached is not None:
            return list(cached)
        try:
            names = fetch_episode_hoster_names(normalized)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
        self._latest_hoster_cache[normalized] = list(names)
        return list(names)

    def stream_link_for_url(self, episode_url: str) -> Optional[str]:
        """Return a hoster stream link for an episode given by URL, or None.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        normalized = _absolute_url(episode_url)
        try:
            link = fetch_episode_stream_link(
                normalized,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Follow a hoster redirect and optionally resolve it via ResolveURL.

        Returns the ResolveURL media URL when that backend is importable and
        succeeds, otherwise the redirect target; None when the redirect
        resolves to nothing.

        Raises:
            RuntimeError: when requests/bs4 are missing or resolution fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.")
        try:
            resolved = resolve_redirect(link)
            if not resolved:
                return None
            # ResolveURL is optional; import lazily and tolerate its absence.
            try:
                from resolveurl_backend import resolve as resolve_with_resolveurl
            except Exception:
                resolve_with_resolveurl = None
            if callable(resolve_with_resolveurl):
                resolved_by_resolveurl = resolve_with_resolveurl(resolved)
                if resolved_by_resolveurl:
                    _log_url("ResolveURL", kind="HOSTER_RESOLVER")
                    _log_url(resolved_by_resolveurl, kind="MEDIA")
                    return resolved_by_resolveurl
            _log_url(resolved, kind="FINAL")
            return resolved
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht verfolgt werden: {exc}") from exc

    def set_preferred_hosters(self, hosters: List[str]) -> None:
        """Set the preferred hoster order (lowercased; blanks dropped).

        An entirely blank/empty input leaves the current preference unchanged.
        """
        normalized = [hoster.strip().lower() for hoster in hosters if hoster.strip()]
        if normalized:
            self._preferred_hosters = normalized

    def reset_preferred_hosters(self) -> None:
        """Restore the default preferred hoster order."""
        self._preferred_hosters = list(self._default_preferred_hosters)


# Alias for the automatic plugin discovery.
Plugin = SerienstreamPlugin