Files
ViewIT/addon/plugins/serienstream_plugin.py

1107 lines
41 KiB
Python

"""Serienstream (s.to) Integration als Downloader-Plugin.
Hinweise:
- Diese Integration nutzt optional `requests` + `beautifulsoup4` (bs4).
- In Kodi koennen zusaetzliche Debug-Funktionen ueber Addon-Settings aktiviert werden
(URL-Logging, HTML-Dumps, Benachrichtigungen).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import os
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
xbmcvfs = None
xbmcgui = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
from regex_patterns import SEASON_EPISODE_TAG, SEASON_EPISODE_URL
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
# Addon setting id that may override the site base URL, and the fallback used
# when that setting is empty.
SETTING_BASE_URL = "serienstream_base_url"
DEFAULT_BASE_URL = "https://s.to"
# Hoster names tried first when choosing a stream link (compared lower-cased).
DEFAULT_PREFERRED_HOSTERS = ["voe"]
# Value passed as `timeout=` to every GET issued by this module.
DEFAULT_TIMEOUT = 20
ADDON_ID = "plugin.video.viewit"
# Setting ids handed to the plugin_helpers debug functions as `enabled_setting_id`.
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Setting ids handed to the same helpers as `plugin_setting_id`.
SETTING_LOG_URLS = "log_urls_serienstream"
SETTING_DUMP_HTML = "dump_html_serienstream"
SETTING_SHOW_URL_INFO = "show_url_info_serienstream"
SETTING_LOG_ERRORS = "log_errors_serienstream"
# Request headers used for the shared session and for each individual GET.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
@dataclass
class SeriesResult:
    """One series entry as parsed from a catalog or "popular" listing."""

    # Display title of the series; also used as the cache key in the plugin.
    title: str
    # Free text attached to the entry (the catalog parser stores the item's
    # `data-search` attribute here; the popular list leaves it empty).
    description: str
    # Absolute URL of the series page.
    url: str
@dataclass
class EpisodeInfo:
    """One episode row parsed from a season page."""

    # Episode number (taken from the URL, the number cell, or the row position).
    number: int
    # Title from the `.episode-title-ger` cell, or "Episode N" when missing.
    title: str
    # Title from the `.episode-title-eng` cell (may be empty).
    original_title: str
    # Absolute URL of the episode page (may be empty if the row had no link).
    url: str
    # Text of the season heading found on the page.
    season_label: str = ""
    # Language codes derived from the `svg-flag-*` CSS classes of the row.
    languages: List[str] = field(default_factory=list)
    # Hoster labels read from the images in the row's watch cell.
    hosters: List[str] = field(default_factory=list)
@dataclass
class LatestEpisode:
    """One entry of the "latest episodes" list."""

    # Title of the series the episode belongs to.
    series_title: str
    # Season and episode numbers.
    season: int
    episode: int
    # Absolute URL the list entry links to.
    url: str
    # Text of the entry's `.ep-time` node, kept as an unparsed string.
    airdate: str
@dataclass
class SeasonInfo:
    """A season of a series, optionally with its episodes loaded."""

    # Season number.
    number: int
    # Absolute URL of the season page.
    url: str
    # Parsed episodes; empty when only the season links were collected.
    episodes: List[EpisodeInfo]
def _get_base_url() -> str:
    """Return the configured site base URL without a trailing slash.

    The value comes from the addon setting ``SETTING_BASE_URL``; a blank
    setting falls back to ``DEFAULT_BASE_URL``.
    """
    configured = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
    return (configured or DEFAULT_BASE_URL).rstrip("/")
def _series_base_url() -> str:
    """Return the URL prefix under which series pages are addressed by slug."""
    return _get_base_url() + "/serie/stream"
def _popular_series_url() -> str:
    """Return the URL of the "popular series" page."""
    return _get_base_url() + "/beliebte-serien"
def _latest_episodes_url() -> str:
    """Return the URL used for the latest-episodes list (the site base URL)."""
    base_url = _get_base_url()
    return f"{base_url}"
def _absolute_url(href: str) -> str:
return f"{_get_base_url()}{href}" if href.startswith("/") else href
def _normalize_series_url(identifier: str) -> str:
if identifier.startswith("http://") or identifier.startswith("https://"):
return identifier.rstrip("/")
slug = identifier.strip("/")
return f"{_series_base_url()}/{slug}"
def _series_root_url(url: str) -> str:
"""Normalisiert eine Serien-URL auf die Root-URL (ohne /staffel-x oder /episode-x)."""
normalized = (url or "").strip().rstrip("/")
normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized)
normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized)
return normalized.rstrip("/")
def _log_visit(url: str) -> None:
    """Record that *url* is about to be fetched.

    Logs the URL with kind "VISIT", forwards it to the URL notification
    helper and, when the Kodi addon module could not be imported, also
    prints it to stdout.
    """
    _log_url(url, kind="VISIT")
    _notify_url(url)
    running_outside_kodi = xbmcaddon is None
    if running_outside_kodi:
        print(f"Visiting: {url}")
def _normalize_text(value: str) -> str:
"""Legacy normalization (kept for backwards compatibility)."""
value = value.casefold()
value = re.sub(r"[^a-z0-9]+", "", value)
return value
def _normalize_search_text(value: str) -> str:
"""Normalisiert Text für die Suche ohne Wortgrenzen zu "verschmelzen".
Wichtig: Wir ersetzen Nicht-Alphanumerisches durch Leerzeichen, statt es zu entfernen.
Dadurch entstehen keine künstlichen Treffer über Wortgrenzen hinweg (z.B. "an" + "na" -> "anna").
"""
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _is_episode_tba(title: str, original_title: str) -> bool:
combined = f"{title} {original_title}".casefold()
markers = ("tba", "demnächst", "demnaechst", "coming soon", "to be announced")
return any(marker in combined for marker in markers)
def _row_is_upcoming(row: BeautifulSoupT) -> bool:
    """Return True when an episode table row is marked as not yet released.

    A row counts as upcoming if it carries the CSS class ``upcoming``, holds
    a non-empty ``.badge-upcoming`` element, or its watch cell text contains
    "tba".
    """
    css_classes = row.get("class") or []
    if isinstance(css_classes, str):
        css_classes = css_classes.split()
    if "upcoming" in css_classes:
        return True

    badge = row.select_one(".badge-upcoming")
    if badge and (badge.get_text(" ", strip=True) or "").strip():
        return True

    watch_cell = row.select_one(".episode-watch-cell")
    if not watch_cell:
        return False
    return "tba" in watch_cell.get_text(" ", strip=True).casefold()
def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Read the boolean addon setting *setting_id* for ``ADDON_ID``.

    Delegates to ``plugin_helpers.get_setting_bool`` and passes *default*
    through unchanged.
    """
    return get_setting_bool(ADDON_ID, setting_id, default=default)
def _notify_url(url: str) -> None:
    """Hand *url* to ``plugin_helpers.notify_url`` under the heading "Serienstream".

    Both the global and the plugin-specific "show URL info" setting ids are
    passed along; presumably the helper only notifies when they are enabled
    (TODO confirm in plugin_helpers).
    """
    notify_url(
        ADDON_ID,
        heading="Serienstream",
        url=url,
        enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
        plugin_setting_id=SETTING_SHOW_URL_INFO,
    )
def _log_url(url: str, *, kind: str = "VISIT") -> None:
    """Hand *url* to ``plugin_helpers.log_url`` for "serienstream_urls.log".

    *kind* is a free-form tag for the entry (this module uses e.g. "VISIT",
    "PARSE", "REDIRECT", "CACHE"). Callers also pass non-URL strings such as
    hoster names.
    """
    log_url(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
        plugin_setting_id=SETTING_LOG_URLS,
        log_filename="serienstream_urls.log",
        url=url,
        kind=kind,
    )
def _log_parsed_url(url: str) -> None:
    """Log a URL that was extracted from a parsed page (kind "PARSE")."""
    _log_url(url, kind="PARSE")
def _log_response_html(url: str, body: str) -> None:
    """Hand a response body to ``plugin_helpers.dump_response_html``.

    The dump uses the filename prefix "s_to_response" and is governed by the
    global and plugin-specific "dump HTML" setting ids passed here.
    """
    dump_response_html(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
        plugin_setting_id=SETTING_DUMP_HTML,
        url=url,
        body=body,
        filename_prefix="s_to_response",
    )
def _log_error(message: str) -> None:
    """Hand *message* to ``plugin_helpers.log_error`` for "serienstream_errors.log"."""
    log_error(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
        plugin_setting_id=SETTING_LOG_ERRORS,
        log_filename="serienstream_errors.log",
        message=message,
    )
def _ensure_requests() -> None:
    """Raise ``RuntimeError`` unless both optional dependencies were imported."""
    dependencies_missing = requests is None or BeautifulSoup is None
    if dependencies_missing:
        raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
def _looks_like_cloudflare_challenge(body: str) -> bool:
lower = body.lower()
markers = (
"cf-browser-verification",
"cf-challenge",
"cf_chl",
"challenge-platform",
"attention required! | cloudflare",
"just a moment...",
"cloudflare ray id",
)
return any(marker in lower for marker in markers)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
    """Fetch *url* and return the parsed HTML document.

    Args:
        url: Absolute URL to request.
        session: Session to use; the shared "serienstream" session is used
            when omitted.

    Raises:
        RuntimeError: If requests/bs4 are missing or the response looks like
            a Cloudflare challenge page.
        Exception: Any error raised by the GET or by ``raise_for_status`` is
            logged and re-raised unchanged.
    """
    _ensure_requests()
    _log_visit(url)
    http = session or get_requests_session("serienstream", headers=HEADERS)
    try:
        response = http.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except Exception as exc:
        _log_error(f"GET {url} failed: {exc}")
        raise

    final_url = response.url
    if final_url and final_url != url:
        _log_url(final_url, kind="REDIRECT")

    html = response.text
    _log_response_html(url, html)
    if _looks_like_cloudflare_challenge(html):
        raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
    return BeautifulSoup(html, "html.parser")
def _get_soup_simple(url: str) -> BeautifulSoupT:
    """Fetch *url* with the shared "serienstream" session and return the parsed HTML.

    This is ``_get_soup`` without a caller-supplied session. It delegates
    instead of repeating the request, logging and Cloudflare handling, so
    both entry points cannot drift apart.

    Raises:
        RuntimeError: If requests/bs4 are missing or a Cloudflare challenge
            page is detected.
        Exception: Request errors are logged and re-raised by ``_get_soup``.
    """
    return _get_soup(url)
def search_series(query: str) -> List[SeriesResult]:
    """Search the genre-grouped series catalog (``/serien?by=genre``) by title.

    The normalized query must occur as a substring of the normalized series
    title. Only titles are compared; the ``description`` field is not
    searched. A series listed under several genre groups is returned once
    (deduplicated by URL), in first-seen order.

    Raises:
        RuntimeError: If requests/bs4 are missing (further errors propagate
            from fetching the catalog page).
    """
    _ensure_requests()
    normalized_query = _normalize_search_text(query)
    if not normalized_query:
        return []
    # Fetch the catalog page directly (same approach as fetch_serien.py).
    catalog_url = f"{_get_base_url()}/serien?by=genre"
    soup = _get_soup_simple(catalog_url)
    results: List[SeriesResult] = []
    seen_urls: set[str] = set()
    for entries in parse_series_catalog(soup).values():
        for entry in entries:
            if not entry.title or entry.url in seen_urls:
                continue
            if normalized_query in _normalize_search_text(entry.title):
                seen_urls.add(entry.url)
                results.append(entry)
    return results
def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]:
    """Parse the series overview (``/serien``) into a genre -> series mapping.

    Layout handled (as of 2026-01): a group header (``h3`` inside
    ``div.background-1``) followed by ``ul.series-list`` whose
    ``li.series-item[data-search]`` entries each contain an ``a[href]``.
    Groups without any usable entry are omitted.
    """
    catalog: Dict[str, List[SeriesResult]] = {}
    for heading in soup.select("div.background-1 h3"):
        genre = (heading.get_text(strip=True) or "").strip()
        if not genre:
            continue
        listing = heading.parent.find_next_sibling("ul", class_="series-list")
        if not listing:
            continue

        entries: List[SeriesResult] = []
        for item in listing.select("li.series-item"):
            link = item.find("a", href=True)
            if not link:
                continue
            url = _absolute_url((link.get("href") or "").strip())
            if url:
                _log_parsed_url(url)
            # Keep only series root pages, not season or episode links.
            is_series_page = "/serie/" in url and "/staffel-" not in url and "/episode-" not in url
            if not is_series_page:
                continue
            title = (link.get_text(" ", strip=True) or "").strip()
            if not title:
                continue
            description = (item.get("data-search") or "").strip()
            entries.append(SeriesResult(title=title, description=description, url=url))

        if entries:
            catalog[genre] = entries
    return catalog
def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]:
    """Collect ``(season number, season URL)`` pairs from the season navigation.

    The number is taken from ``/staffel-N`` in the href, else from the
    ``data-season-pill`` attribute, else from a purely numeric link text.
    Links pointing at episodes and repeated season numbers are skipped. The
    result is sorted by season number.
    """
    url_by_number: Dict[int, str] = {}
    for anchor in soup.select("ul.nav.list-items-nav a[data-season-pill][href]"):
        href = anchor.get("href") or ""
        if "/episode-" in href:
            continue

        in_href = re.search(r"/staffel-(\d+)", href)
        if in_href:
            digits = in_href.group(1)
        else:
            pill = (anchor.get("data-season-pill") or "").strip()
            digits = pill if pill.isdigit() else anchor.get_text(strip=True)
            if not digits.isdigit():
                continue
        number = int(digits)

        if number in url_by_number:
            continue
        season_url = _absolute_url(href)
        if season_url:
            _log_parsed_url(season_url)
        url_by_number[number] = season_url

    return sorted(url_by_number.items(), key=lambda pair: pair[0])
def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]:
    """Return the positive season count from the ``numberOfSeasons`` meta tag.

    Yields ``None`` when the tag is missing, not numeric, or zero.
    """
    meta = soup.select_one('meta[itemprop="numberOfSeasons"]')
    raw = (meta.get("content") or "").strip() if meta else ""
    if not raw.isdigit():
        return None
    # A count of 0 carries no information, so report it as "unknown".
    return int(raw) or None
def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str:
    """Return the page's absolute canonical URL, or *fallback* if there is none.

    Trailing slashes are removed from whichever URL is returned.
    """
    link = soup.select_one('link[rel="canonical"][href]')
    href = ((link.get("href") if link else "") or "").strip()
    chosen = href if href.startswith(("http://", "https://")) else fallback
    return chosen.rstrip("/")
# Maps the suffix of a `svg-flag-<name>` CSS class to a short language code.
# Unknown suffixes fall back to their upper-cased form.
_EPISODE_LANGUAGE_CODES = {
    "german": "DE",
    "english": "EN",
    "japanese": "JP",
    "turkish": "TR",
    "spanish": "ES",
    "italian": "IT",
    "french": "FR",
    "korean": "KO",
    "russian": "RU",
    "polish": "PL",
    "portuguese": "PT",
    "chinese": "ZH",
    "arabic": "AR",
    "thai": "TH",
}


def _episode_row_url(row: BeautifulSoupT) -> str:
    """Return the row's episode URL from its onclick handler, else its first link."""
    onclick = (row.get("onclick") or "").strip()
    if onclick:
        match = re.search(r"location=['\\\"]([^'\\\"]+)['\\\"]", onclick)
        if match:
            return _absolute_url(match.group(1))
    anchor = row.find("a", href=True)
    return _absolute_url(anchor.get("href")) if anchor else ""


def _episode_row_hosters(row: BeautifulSoupT) -> List[str]:
    """Return the distinct hoster labels (img alt/title) of the row's watch cell."""
    hosters: List[str] = []
    for img in row.select(".episode-watch-cell img"):
        label = (img.get("alt") or img.get("title") or "").strip()
        if label and label not in hosters:
            hosters.append(label)
    return hosters


def _episode_row_languages(row: BeautifulSoupT) -> List[str]:
    """Return the distinct language codes derived from the row's flag classes."""
    languages: List[str] = []
    for flag in row.select(".episode-language-cell .watch-language"):
        classes = flag.get("class") or []
        if isinstance(classes, str):
            classes = classes.split()
        for cls in classes:
            if not cls.startswith("svg-flag-"):
                continue
            key = cls.replace("svg-flag-", "").strip()
            if not key:
                continue
            code = _EPISODE_LANGUAGE_CODES.get(key, key.upper())
            if code and code not in languages:
                languages.append(code)
    return languages


def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]:
    """Parse the episode table of a season page.

    Layout handled (as of 2026-01): ``table.episode-table`` rows of class
    ``episode-row`` that carry the target URL in an ``onclick`` handler.
    Rows marked as upcoming and episodes whose titles look like "TBA"
    placeholders are skipped.

    The episode number comes from ``/episode-N`` in the URL, else from the
    digits in the number cell, else from the row position (1-based, counting
    skipped rows as well).
    """
    header = soup.select_one("section.episode-section h2") or soup.select_one("h2.h3")
    season_label = (header.get_text(" ", strip=True) or "").strip() if header else ""

    episodes: List[EpisodeInfo] = []
    rows = soup.select("table.episode-table tbody tr.episode-row")
    for index, row in enumerate(rows):
        if _row_is_upcoming(row):
            continue

        url = _episode_row_url(row)
        if url:
            _log_parsed_url(url)

        match = re.search(r"/episode-(\d+)", url) if url else None
        if match:
            number = int(match.group(1))
        else:
            number_tag = row.select_one(".episode-number-cell")
            number_text = (number_tag.get_text(strip=True) if number_tag else "").strip()
            digits = "".join(ch for ch in number_text if ch.isdigit())
            number = int(digits) if digits else index + 1

        title_tag = row.select_one(".episode-title-ger")
        original_tag = row.select_one(".episode-title-eng")
        title = (title_tag.get_text(strip=True) if title_tag else "").strip()
        original_title = (original_tag.get_text(strip=True) if original_tag else "").strip()
        if not title:
            title = f"Episode {number}"
        if _is_episode_tba(title, original_title):
            continue

        episodes.append(
            EpisodeInfo(
                number=number,
                title=title,
                original_title=original_title,
                url=url,
                season_label=season_label,
                languages=_episode_row_languages(row),
                hosters=_episode_row_hosters(row),
            )
        )
    return episodes
def fetch_episode_stream_link(
    episode_url: str,
    *,
    preferred_hosters: Optional[List[str]] = None,
) -> Optional[str]:
    """Return the play URL of one hoster offered on an episode page.

    Args:
        episode_url: Episode page URL (root-relative or absolute).
        preferred_hosters: Hoster names in order of preference, compared
            case-insensitively; defaults to ``DEFAULT_PREFERRED_HOSTERS``.

    Returns:
        The play URL of the first preferred hoster that is offered, else the
        first offered hoster's URL, or ``None`` when the page lists none.
    """
    _ensure_requests()
    page_url = _absolute_url(episode_url)
    wanted = [hoster.lower() for hoster in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)]
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may answer 5xx while the target page
    # still works, so a failure here is deliberately ignored.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass
    soup = _get_soup(page_url, session=session)

    offered: List[Tuple[str, str]] = []
    for button in soup.select("button.link-box[data-play-url]"):
        provider = (button.get("data-provider-name") or "").strip()
        url = _absolute_url((button.get("data-play-url") or "").strip())
        if url:
            _log_parsed_url(url)
        if provider and url:
            offered.append((provider, url))
    if not offered:
        return None

    for preferred_name in wanted:
        hit = next((url for name, url in offered if name.lower() == preferred_name), None)
        if hit is not None:
            return hit
    return offered[0][1]
def fetch_episode_hoster_names(episode_url: str) -> List[str]:
    """Read the hoster names offered for an episode.

    Names are returned in page order, deduplicated case-insensitively with
    the first spelling kept.
    """
    _ensure_requests()
    page_url = _absolute_url(episode_url)
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may answer 5xx while the target page
    # still works, so a failure here is deliberately ignored.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass
    soup = _get_soup(page_url, session=session)

    hoster_names: List[str] = []
    known_keys: set[str] = set()
    for button in soup.select("button.link-box[data-provider-name]"):
        name = (button.get("data-provider-name") or "").strip()
        play_url = _absolute_url((button.get("data-play-url") or "").strip())
        if play_url:
            _log_parsed_url(play_url)
        key = name.casefold().strip()
        if key and key not in known_keys:
            known_keys.add(key)
            hoster_names.append(name)
            _log_url(name, kind="HOSTER")

    if hoster_names:
        _log_url(f"{page_url}#hosters={','.join(hoster_names)}", kind="HOSTERS")
    return hoster_names
# Case-insensitive patterns built from the shared `regex_patterns` module.
# The URL pattern is used below with two capture groups (season, episode).
# NOTE(review): `_LATEST_EPISODE_TAG_RE` is not referenced in the visible part
# of this file - confirm whether it is still needed.
_LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE)
_LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE)
def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]:
    """Parse the latest-episodes list from the start page.

    Each ``a.latest-episode-row`` yields one entry. Season and episode
    numbers are read from the ``.ep-season`` / ``.ep-episode`` texts (e.g.
    "S01", "E05"). If either is missing, both are taken from the href via
    ``_LATEST_EPISODE_URL_RE``. Entries without a series title or without
    both numbers are skipped, as are repeated (url, season, episode) triples.
    """

    def _text_of(node: Any) -> str:
        """Return the stripped text of *node*, or "" when the node is missing."""
        return (node.get_text(strip=True) if node else "").strip()

    episodes: List[LatestEpisode] = []
    seen: set[Tuple[str, int, int]] = set()
    for anchor in soup.select("a.latest-episode-row[href]"):
        href = (anchor.get("href") or "").strip()
        if not href or "/serie/" not in href:
            continue
        url = _absolute_url(href)
        if not url:
            continue

        title_node = anchor.select_one(".ep-title")
        series_title = ((title_node.get("title") if title_node else "") or "").strip()
        series_title = series_title or _text_of(title_node)
        if not series_title:
            continue

        season_number: Optional[int] = None
        episode_number: Optional[int] = None
        # Single backslashes: these raw patterns previously used "\\s"/"\\d",
        # which matched a literal backslash and therefore never matched "S01".
        match = re.search(r"S\s*(\d+)", _text_of(anchor.select_one(".ep-season")), re.IGNORECASE)
        if match:
            season_number = int(match.group(1))
        match = re.search(r"E\s*(\d+)", _text_of(anchor.select_one(".ep-episode")), re.IGNORECASE)
        if match:
            episode_number = int(match.group(1))
        if season_number is None or episode_number is None:
            match = _LATEST_EPISODE_URL_RE.search(href)
            if match:
                season_number = int(match.group(1))
                episode_number = int(match.group(2))
        if season_number is None or episode_number is None:
            continue

        airdate_node = anchor.select_one(".ep-time")
        airdate = (airdate_node.get_text(" ", strip=True) if airdate_node else "").strip()

        key = (url, season_number, episode_number)
        if key in seen:
            continue
        seen.add(key)
        _log_parsed_url(url)
        episodes.append(
            LatestEpisode(
                series_title=series_title,
                season=season_number,
                episode=episode_number,
                url=url,
                airdate=airdate,
            )
        )
    return episodes
def resolve_redirect(target_url: str) -> Optional[str]:
    """Follow redirects for *target_url* and return the final URL.

    Returns ``None`` when the response reports no URL. The response status
    is not checked here.
    """
    _ensure_requests()
    url = _absolute_url(target_url)
    _log_visit(url)
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may answer 5xx while the target page
    # still works, so a failure here is deliberately ignored.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass

    request_options = {
        "headers": HEADERS,
        "timeout": DEFAULT_TIMEOUT,
        "allow_redirects": True,
    }
    response = session.get(url, **request_options)
    final_url = response.url
    if not final_url:
        return None
    _log_url(final_url, kind="RESOLVED")
    return final_url
def scrape_series_detail(
    series_identifier: str,
    max_seasons: Optional[int] = None,
    *,
    load_episodes: bool = True,
) -> List[SeasonInfo]:
    """Load the seasons (and optionally the episodes) of a series.

    Args:
        series_identifier: Series URL or slug.
        max_seasons: If given, only the first *max_seasons* seasons (by
            number) are returned.
        load_episodes: When false, season pages are not fetched and every
            ``SeasonInfo.episodes`` is empty.

    Returns:
        Seasons sorted by number. If the page's ``numberOfSeasons`` meta tag
        reports more seasons than the navigation links, the missing ones are
        added with URLs built as ``<series root>/staffel-N``.
    """
    _ensure_requests()
    series_url = _series_root_url(_normalize_series_url(series_identifier))
    _log_url(series_url, kind="SERIES")
    _notify_url(series_url)
    session = get_requests_session("serienstream", headers=HEADERS)
    soup = _get_soup(series_url, session=session)
    base_series_url = _series_root_url(_extract_canonical_url(soup, series_url))

    season_links = _extract_season_links(soup)
    season_count = _extract_number_of_seasons(soup)
    if season_count and len(season_links) < season_count:
        known_numbers = {number for number, _ in season_links}
        for number in range(1, season_count + 1):
            if number not in known_numbers:
                season_url = f"{base_series_url}/staffel-{number}"
                _log_parsed_url(season_url)
                season_links.append((number, season_url))
        season_links.sort(key=lambda pair: pair[0])
    if max_seasons is not None:
        season_links = season_links[:max_seasons]

    seasons: List[SeasonInfo] = []
    for number, url in season_links:
        if load_episodes:
            episodes = _extract_episodes(_get_soup(url, session=session))
        else:
            episodes = []
        seasons.append(SeasonInfo(number=number, url=url, episodes=episodes))
    return sorted(seasons, key=lambda season: season.number)
class SerienstreamPlugin(BasisPlugin):
    """Plugin that serves series from s.to via requests/bs4.

    Results of catalog, season, episode and hoster lookups are cached on the
    instance. When requests/bs4 are unavailable the plugin marks itself as
    unavailable: listing methods return empty lists, link and hoster methods
    raise ``RuntimeError``.
    """

    name = "Serienstream"
    POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"

    def __init__(self) -> None:
        """Create empty caches and record whether requests/bs4 were imported."""
        # title -> series entry (filled by search, genre and popular listings).
        self._series_results: Dict[str, SeriesResult] = {}
        # title -> seasons; episodes are filled in lazily per season.
        self._season_cache: Dict[str, List[SeasonInfo]] = {}
        # title -> seasons without episodes (links only).
        self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
        # (title, season label) -> {episode label -> episode}.
        self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}
        self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None
        self._popular_cache: Optional[List[SeriesResult]] = None
        self._requests_available = REQUESTS_AVAILABLE
        self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
        self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
        # (title, season label, episode label) -> hoster names.
        self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {}
        # page number -> latest episodes.
        self._latest_cache: Dict[int, List[LatestEpisode]] = {}
        # episode URL -> hoster names.
        self._latest_hoster_cache: Dict[str, List[str]] = {}
        self.is_available = True
        self.unavailable_reason: Optional[str] = None
        if not self._requests_available:  # pragma: no cover - optional dependency
            self.is_available = False
            self.unavailable_reason = (
                "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
            )
            print(
                "SerienstreamPlugin deaktiviert: requests/bs4 fehlen. "
                "Installiere 'requests' und 'beautifulsoup4'."
            )
            if REQUESTS_IMPORT_ERROR:
                print(f"Importfehler: {REQUESTS_IMPORT_ERROR}")

    def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]:
        """Return the genre -> series catalog, fetching it on first use."""
        if self._catalog_cache is not None:
            return self._catalog_cache
        # As of 2026-01 `?by=genre` yields consistent groups for `genres()`.
        catalog_url = f"{_get_base_url()}/serien?by=genre"
        soup = _get_soup_simple(catalog_url)
        self._catalog_cache = parse_series_catalog(soup)
        return self._catalog_cache

    def genres(self) -> List[str]:
        """Optional: return all genre names of the catalog, sorted case-insensitively."""
        if not self._requests_available:
            return []
        catalog = self._ensure_catalog()
        return sorted(catalog.keys(), key=str.casefold)

    def capabilities(self) -> set[str]:
        """Report the features this plugin supports for the router menus."""
        return {"popular_series", "genres", "latest_episodes"}

    def popular_series(self) -> List[str]:
        """Return the titles of the popular series (source: ``/beliebte-serien``)."""
        if not self._requests_available:
            return []
        entries = self._ensure_popular()
        self._series_results.update({entry.title: entry for entry in entries if entry.title})
        return [entry.title for entry in entries if entry.title]

    def titles_for_genre(self, genre: str) -> List[str]:
        """Optional: return the series titles of one genre.

        ``POPULAR_GENRE_LABEL`` is accepted as a pseudo genre and maps to
        ``popular_series()``. Unknown or blank genres yield an empty list.
        """
        if not self._requests_available:
            return []
        genre = (genre or "").strip()
        if not genre:
            return []
        if genre == self.POPULAR_GENRE_LABEL:
            return self.popular_series()
        catalog = self._ensure_catalog()
        entries = catalog.get(genre, [])
        self._series_results.update({entry.title: entry for entry in entries if entry.title})
        return [entry.title for entry in entries if entry.title]

    def _ensure_popular(self) -> List[SeriesResult]:
        """Load and cache the list of popular series from ``/beliebte-serien``.

        Layout handled (as of 2026-01): the section whose ``h2`` contains
        "Meistgesehen" holds ``a.show-card`` cards with the title in
        ``img alt``. Without such a section every ``a.show-card`` on the page
        is used. Entries are deduplicated by title.
        """
        if self._popular_cache is not None:
            return list(self._popular_cache)
        soup = _get_soup_simple(_popular_series_url())
        results: List[SeriesResult] = []
        seen: set[str] = set()
        anchors = None
        for section in soup.select("div.mb-5"):
            h2 = section.select_one("h2")
            label = (h2.get_text(" ", strip=True) if h2 else "").casefold()
            if "meistgesehen" in label:
                anchors = section.select("a.show-card[href]")
                break
        if anchors is None:
            anchors = soup.select("a.show-card[href]")
        for anchor in anchors:
            href = (anchor.get("href") or "").strip()
            if not href or "/serie/" not in href:
                continue
            img = anchor.select_one("img[alt]")
            title = ((img.get("alt") if img else "") or "").strip()
            if not title or title in seen:
                continue
            url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
            # Single backslash: the former r"\\d" matched a literal backslash,
            # so a "/staffel-N" suffix was never stripped.
            url = re.sub(r"/staffel-\d+(?:/.*)?$", "", url).rstrip("/")
            if not url:
                continue
            _log_parsed_url(url)
            seen.add(title)
            results.append(SeriesResult(title=title, description="", url=url))
        self._popular_cache = list(results)
        return list(results)

    @staticmethod
    def _season_label(number: int) -> str:
        """Return the UI label of season *number*."""
        return f"Staffel {number}"

    @staticmethod
    def _episode_label(info: EpisodeInfo) -> str:
        """Return the UI label of an episode (number, title, original title).

        The season is left out on purpose; the UI already shows it.
        """
        suffix = f" ({info.original_title})" if info.original_title else ""
        return f"Episode {info.number}: {info.title}{suffix}"

    @staticmethod
    def _parse_season_number(label: str) -> Optional[int]:
        """Return the number formed by all digits in *label*, or ``None`` if there are none."""
        digits = "".join(ch for ch in label if ch.isdigit())
        if not digits:
            return None
        return int(digits)

    def _reset_search_state(self, *, drop_catalog: bool = False) -> None:
        """Clear all per-search caches; optionally forget the catalog as well."""
        self._series_results.clear()
        self._season_cache.clear()
        self._season_links_cache.clear()
        self._episode_label_cache.clear()
        if drop_catalog:
            self._catalog_cache = None

    def _resolve_series(self, title: str) -> Optional[SeriesResult]:
        """Return the series entry for *title*, consulting the catalog if needed.

        Kodi restarts the plugin per navigation step, so the in-memory search
        results may be gone. The title is then resolved again through the
        catalog (case-insensitively) and remembered.
        """
        series = self._series_results.get(title)
        if series:
            return series
        lookup_key = title.casefold().strip()
        for entries in self._ensure_catalog().values():
            for entry in entries:
                if entry.title.casefold().strip() == lookup_key:
                    self._series_results[entry.title] = entry
                    return entry
        return None

    def _clear_episode_cache_for_title(self, title: str) -> None:
        """Drop cached episode labels and hoster lists that belong to *title*."""
        for key in [key for key in self._episode_label_cache if key[0] == title]:
            self._episode_label_cache.pop(key, None)
        for key in [key for key in self._hoster_cache if key[0] == title]:
            self._hoster_cache.pop(key, None)

    def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None:
        """Remember the label -> episode mapping of one season."""
        cache_key = (title, season_label)
        self._episode_label_cache[cache_key] = {
            self._episode_label(info): info for info in season_info.episodes
        }

    def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
        """Return the seasons of *title* without episodes, loading them once.

        Raises:
            RuntimeError: If the series page cannot be loaded or parsed.
        """
        cached = self._season_links_cache.get(title)
        if cached is not None:
            return list(cached)
        series = self._resolve_series(title)
        if not series:
            return []
        try:
            seasons = scrape_series_detail(series.url, load_episodes=False)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
        self._season_links_cache[title] = list(seasons)
        return list(seasons)

    def remember_series_url(self, title: str, series_url: str) -> None:
        """Register a known series URL for *title*; blank values are ignored."""
        title = (title or "").strip()
        series_url = (series_url or "").strip()
        if not title or not series_url:
            return
        self._series_results[title] = SeriesResult(title=title, description="", url=series_url)

    def series_url_for_title(self, title: str) -> str:
        """Return the remembered URL for *title* (exact, then case-insensitive), or ""."""
        title = (title or "").strip()
        if not title:
            return ""
        direct = self._series_results.get(title)
        if direct and direct.url:
            return direct.url
        lookup_key = title.casefold().strip()
        for entry in self._series_results.values():
            if entry.title.casefold().strip() == lookup_key and entry.url:
                return entry.url
        return ""

    def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
        """Return season *season_number* of *title* with its episodes loaded.

        Returns ``None`` when the series has no such season.

        Raises:
            RuntimeError: If the season page cannot be loaded or parsed.
        """
        cached_seasons = self._season_cache.get(title) or []
        for season in cached_seasons:
            if season.number == season_number and season.episodes:
                return season
        links = self._ensure_season_links(title)
        target = next((season for season in links if season.number == season_number), None)
        if not target:
            return None
        try:
            season_soup = _get_soup(target.url, session=get_requests_session("serienstream", headers=HEADERS))
            season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Episoden konnten nicht geladen werden: {exc}") from exc
        # Seed from the full link list when nothing is cached yet. Otherwise
        # the cache would hold only this one season and `_ensure_seasons`
        # would later report it as the complete season list.
        known_seasons = cached_seasons or links
        updated = [season for season in known_seasons if season.number != season_number]
        updated.append(season_info)
        updated.sort(key=lambda item: item.number)
        self._season_cache[title] = updated
        return season_info

    def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
        """Resolve UI labels back to an episode, loading the season if necessary."""
        cache_key = (title, season_label)
        cached = self._episode_label_cache.get(cache_key)
        if cached:
            return cached.get(episode_label)
        number = self._parse_season_number(season_label)
        if number is None:
            return None
        season_info = self._ensure_season_episodes(title, number)
        if season_info:
            self._cache_episode_labels(title, season_label, season_info)
            return self._episode_label_cache.get(cache_key, {}).get(episode_label)
        return None

    async def search_titles(self, query: str) -> List[str]:
        """Search the catalog and return the distinct matching titles.

        A blank query resets the search state (including the catalog cache)
        and returns an empty list.

        Raises:
            RuntimeError: If requests/bs4 are missing or the search fails. On
                failure all per-search caches are cleared, including the
                season-link cache.
        """
        query = query.strip()
        if not query:
            self._reset_search_state(drop_catalog=True)
            return []
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 nicht suchen.")
        try:
            # Uses the catalog (/serien), which is grouped by genre. An Ajax
            # endpoint would be an alternative but is not reliably reachable.
            results = search_series(query)
        except Exception as exc:  # pragma: no cover - defensive logging
            self._reset_search_state(drop_catalog=True)
            raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc
        self._reset_search_state()
        self._series_results = {result.title: result for result in results}
        # Dict keys are unique and keep first-seen order, so a title that
        # matched more than once is listed only once.
        return list(self._series_results)

    def _ensure_seasons(self, title: str) -> List[SeasonInfo]:
        """Return the cached seasons of *title*, loading the season links on a miss."""
        if title in self._season_cache:
            seasons = self._season_cache[title]
            # Log the URLs on cache hits too, so it stays traceable which
            # pages back the season and episode lists.
            if _get_setting_bool(GLOBAL_SETTING_LOG_URLS, default=False):
                series = self._series_results.get(title)
                if series and series.url:
                    _log_url(series.url, kind="CACHE")
                for season in seasons:
                    if season.url:
                        _log_url(season.url, kind="CACHE")
            return seasons
        if not self._resolve_series(title):
            return []
        seasons = self._ensure_season_links(title)
        self._clear_episode_cache_for_title(title)
        self._season_cache[title] = list(seasons)
        return list(seasons)

    def seasons_for(self, title: str) -> List[str]:
        """Return the season labels ("Staffel N") of *title*."""
        seasons = self._ensure_seasons(title)
        return [self._season_label(season.number) for season in seasons]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return the episode labels of one season; empty if the season is unknown."""
        number = self._parse_season_number(season)
        if number is None:
            return []
        season_info = self._ensure_season_episodes(title, number)
        if not season_info:
            return []
        self._cache_episode_labels(title, season, season_info)
        return [self._episode_label(info) for info in season_info.episodes]

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return a hoster link for the episode identified by its UI labels.

        Returns ``None`` when the episode is unknown or offers no hoster.

        Raises:
            RuntimeError: If requests/bs4 are missing or loading fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        episode_info = self._lookup_episode(title, season, episode)
        if not episode_info:
            return None
        try:
            link = fetch_episode_stream_link(
                episode_info.url,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc

    def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
        """Return the hoster names of an episode identified by its UI labels (cached).

        Raises:
            RuntimeError: If requests/bs4 are missing or loading fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
        cache_key = (title, season, episode)
        cached = self._hoster_cache.get(cache_key)
        if cached is not None:
            return list(cached)
        episode_info = self._lookup_episode(title, season, episode)
        if not episode_info:
            return []
        try:
            names = fetch_episode_hoster_names(episode_info.url)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
        self._hoster_cache[cache_key] = list(names)
        return list(names)

    def latest_episodes(self, page: int = 1) -> List[LatestEpisode]:
        """Return the latest episodes listed on the site's start page.

        The list is read from the base URL; for *page* greater than 1,
        ``?page=N`` is appended. Invalid page values are treated as page 1.
        Results are cached per page.
        """
        if not self._requests_available:
            return []
        try:
            page = int(page or 1)
        except (TypeError, ValueError, OverflowError):
            page = 1
        page = max(1, page)
        cached = self._latest_cache.get(page)
        if cached is not None:
            return list(cached)
        url = _latest_episodes_url()
        if page > 1:
            url = f"{url}?page={page}"
        soup = _get_soup_simple(url)
        episodes = _extract_latest_episodes(soup)
        self._latest_cache[page] = list(episodes)
        return list(episodes)

    def available_hosters_for_url(self, episode_url: str) -> List[str]:
        """Return the hoster names offered on an episode page URL (cached per URL).

        Raises:
            RuntimeError: If requests/bs4 are missing or loading fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
        normalized = _absolute_url(episode_url)
        cached = self._latest_hoster_cache.get(normalized)
        if cached is not None:
            return list(cached)
        try:
            names = fetch_episode_hoster_names(normalized)
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
        self._latest_hoster_cache[normalized] = list(names)
        return list(names)

    def stream_link_for_url(self, episode_url: str) -> Optional[str]:
        """Return a hoster link for an episode page URL.

        Raises:
            RuntimeError: If requests/bs4 are missing or loading fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        normalized = _absolute_url(episode_url)
        try:
            link = fetch_episode_stream_link(
                normalized,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Follow *link* to its final URL and try to resolve it to a media URL.

        The redirect target is passed to ``resolveurl_backend.resolve`` when
        that module can be imported. If it yields nothing, the redirect
        target itself is returned.

        Raises:
            RuntimeError: If requests/bs4 are missing or resolving fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.")
        try:
            resolved = resolve_redirect(link)
            if not resolved:
                return None
            try:
                from resolveurl_backend import resolve as resolve_with_resolveurl
            except Exception:
                # The resolver backend is optional.
                resolve_with_resolveurl = None
            if callable(resolve_with_resolveurl):
                resolved_by_resolveurl = resolve_with_resolveurl(resolved)
                if resolved_by_resolveurl:
                    _log_url("ResolveURL", kind="HOSTER_RESOLVER")
                    _log_url(resolved_by_resolveurl, kind="MEDIA")
                    return resolved_by_resolveurl
            _log_url(resolved, kind="FINAL")
            return resolved
        except Exception as exc:  # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht verfolgt werden: {exc}") from exc

    def set_preferred_hosters(self, hosters: List[str]) -> None:
        """Replace the preferred hosters; a list with no usable names is ignored."""
        normalized = [hoster.strip().lower() for hoster in hosters if hoster.strip()]
        if normalized:
            self._preferred_hosters = normalized

    def reset_preferred_hosters(self) -> None:
        """Restore the default preferred hosters."""
        self._preferred_hosters = list(self._default_preferred_hosters)
# Alias used by the automatic plugin discovery.
Plugin = SerienstreamPlugin