# Source listing: ViewIT/addon/plugins/serienstream_plugin.py
# (1947 lines, 74 KiB, Python)
"""Serienstream (s.to) Integration als Downloader-Plugin.
Hinweise:
- Diese Integration nutzt optional `requests` + `beautifulsoup4` (bs4).
- In Kodi koennen zusaetzliche Debug-Funktionen ueber Addon-Settings aktiviert werden
(URL-Logging, HTML-Dumps, Benachrichtigungen).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from html import unescape
import json
import hashlib
import os
import re
import time
import unicodedata
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import quote
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
xbmcvfs = None
xbmcgui = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
from regex_patterns import SEASON_EPISODE_TAG, SEASON_EPISODE_URL
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession = Any
BeautifulSoupT = Any
# --- Configuration & defaults -------------------------------------------------
SETTING_BASE_URL = "serienstream_base_url"
DEFAULT_BASE_URL = "https://s.to"
DEFAULT_PREFERRED_HOSTERS = ["voe"]
# Timeouts in seconds: regular page fetches vs. the faster suggest search.
DEFAULT_TIMEOUT = 20
SEARCH_TIMEOUT = 8
ADDON_ID = "plugin.video.viewit"
# Addon-wide debug switches (shared by all plugins).
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Plugin-specific debug switches for this plugin only.
SETTING_LOG_URLS = "log_urls_serienstream"
SETTING_DUMP_HTML = "dump_html_serienstream"
SETTING_SHOW_URL_INFO = "show_url_info_serienstream"
SETTING_LOG_ERRORS = "log_errors_serienstream"
# Default HTTP headers sent with every request against the site.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
# Session cache tuning (stored in Kodi window properties).
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.serienstream"
SESSION_CACHE_MAX_TITLE_URLS = 800
# Catalog search cache (process memory + window property).
CATALOG_SEARCH_TTL_SECONDS = 600
CATALOG_SEARCH_CACHE_KEY = "catalog_index"
# (expiry timestamp, cached items); (0.0, []) means "empty".
_CATALOG_INDEX_MEMORY: tuple[float, List["SeriesResult"]] = (0.0, [])
# Optional progress callback signature: (message, percent or None) -> Any.
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None:
if not callable(callback):
return
try:
callback(str(message or ""), None if percent is None else int(percent))
except Exception:
return
@dataclass
class SeriesResult:
    """A single series hit from search or catalog parsing."""
    title: str  # display title
    description: str  # search/teaser text, may be empty
    url: str  # absolute series detail URL
@dataclass
class EpisodeInfo:
    """One parsed episode row of a season page."""
    number: int  # episode number within the season
    title: str  # German title (or "Episode N" fallback)
    original_title: str  # original-language title, may be empty
    url: str  # absolute episode URL
    season_label: str = ""  # heading of the season page, e.g. "Staffel 1"
    languages: List[str] = field(default_factory=list)  # short codes like "DE"
    hosters: List[str] = field(default_factory=list)  # hoster display names
@dataclass
class LatestEpisode:
    """One entry of the "latest episodes" list on the start page."""
    series_title: str
    season: int
    episode: int
    url: str  # absolute episode URL
    airdate: str  # free-form air date/time text, may be empty
@dataclass
class SeasonInfo:
    """A season with its URL and (optionally loaded) episodes."""
    number: int
    url: str  # absolute season URL
    episodes: List[EpisodeInfo]  # empty when episodes were not loaded
def _extract_series_metadata(soup: BeautifulSoupT) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Extract info labels (title/plot) and artwork (poster/fanart) from a series page."""
    info: Dict[str, str] = {}
    art: Dict[str, str] = {}
    if not soup:
        return info, art
    heading = soup.select_one("h1")
    heading_text = (heading.get_text(" ", strip=True) if heading else "").strip()
    if heading_text:
        info["title"] = heading_text
    # Plot: prefer the visible description block, then the meta description.
    plot = ""
    plot_node = soup.select_one(".series-description .description-text")
    if plot_node:
        plot = (plot_node.get_text(" ", strip=True) or "").strip()
    if not plot:
        meta_node = soup.select_one("meta[property='og:description'], meta[name='description']")
        if meta_node:
            plot = (meta_node.get("content") or "").strip()
    if plot:
        info["plot"] = plot
    # Poster: cover image first, then any channel image found on the page.
    poster = ""
    cover = soup.select_one(
        ".show-cover-mobile img[data-src], .show-cover-mobile img[src], .col-3 img[data-src], .col-3 img[src]"
    )
    if cover:
        poster = (cover.get("data-src") or cover.get("src") or "").strip()
    if not poster:
        for image in soup.select("img[data-src], img[src]"):
            src = (image.get("data-src") or image.get("src") or "").strip()
            if "/media/images/channel/" in src:
                poster = src
                break
    if poster:
        poster = _absolute_url(poster)
        art["poster"] = poster
        art["thumb"] = poster
    # Fanart: Open Graph image, used for both fanart and landscape.
    og_image = soup.select_one("meta[property='og:image']")
    fanart = ((og_image.get("content") or "").strip()) if og_image else ""
    if fanart:
        fanart = _absolute_url(fanart)
        art["fanart"] = fanart
        art["landscape"] = fanart
    return info, art
def _get_base_url() -> str:
    """Return the configured site base URL without a trailing slash."""
    configured = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
    return (configured or DEFAULT_BASE_URL).rstrip("/")
def _series_base_url() -> str:
    """Base URL for series detail pages."""
    return _get_base_url() + "/serie/stream"
def _popular_series_url() -> str:
    """URL of the "popular series" overview page."""
    return _get_base_url() + "/beliebte-serien"
def _latest_episodes_url() -> str:
    """URL of the start page, which lists the latest episodes."""
    # The previous f-string wrapper around _get_base_url() was a no-op.
    return _get_base_url()
def _absolute_url(href: str) -> str:
return f"{_get_base_url()}{href}" if href.startswith("/") else href
def _session_window() -> Any:
    """Return Kodi's home window (id 10000) used as property storage, or None outside Kodi."""
    if xbmcgui is None:
        return None
    try:
        return xbmcgui.Window(10000)
    except Exception:
        # Window creation can fail during Kodi shutdown; treat as "no cache".
        return None
def _session_cache_key(name: str) -> str:
    """Build a window-property key scoped to the current base URL."""
    digest = hashlib.sha1(_get_base_url().encode("utf-8")).hexdigest()
    return f"{SESSION_CACHE_PREFIX}.{digest[:12]}.{name}"
def _session_cache_get(name: str) -> Any:
    """Fetch a cached JSON payload from the Kodi home window; None on miss or expiry."""
    window = _session_window()
    if window is None:
        return None
    try:
        raw = window.getProperty(_session_cache_key(name)) or ""
    except Exception:
        return None
    if not raw:
        return None
    try:
        payload = json.loads(raw)
    except Exception:
        return None
    if not isinstance(payload, dict):
        return None
    # Expiry check: unparsable timestamps are treated as expired.
    try:
        if float(payload.get("expires_at") or 0) <= time.time():
            return None
    except Exception:
        return None
    return payload.get("data")
def _session_cache_set(name: str, data: Any, *, ttl_seconds: int = SESSION_CACHE_TTL_SECONDS) -> None:
    """Store *data* as JSON in a Kodi window property with an absolute expiry time."""
    window = _session_window()
    if window is None:
        return
    envelope = {
        "expires_at": float(time.time() + max(1, int(ttl_seconds))),
        "data": data,
    }
    try:
        raw = json.dumps(envelope, ensure_ascii=False, separators=(",", ":"))
    except Exception:
        return
    # Kodi window properties are not durable storage; cap the payload size so
    # the UI stays stable.
    if len(raw) > 240_000:
        return
    try:
        window.setProperty(_session_cache_key(name), raw)
    except Exception:
        return
def _normalize_series_url(identifier: str) -> str:
if identifier.startswith("http://") or identifier.startswith("https://"):
return identifier.rstrip("/")
slug = identifier.strip("/")
return f"{_series_base_url()}/{slug}"
def _series_root_url(url: str) -> str:
"""Normalisiert eine Serien-URL auf die Root-URL (ohne /staffel-x oder /episode-x)."""
normalized = (url or "").strip().rstrip("/")
normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized)
normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized)
return normalized.rstrip("/")
def _log_visit(url: str) -> None:
    """Log (and optionally toast) a URL right before it is fetched."""
    _log_url(url, kind="VISIT")
    _notify_url(url)
    # Outside Kodi there is no logging sink; fall back to stdout for visibility.
    if xbmcaddon is None:
        print(f"Visiting: {url}")
def _normalize_text(value: str) -> str:
"""Legacy normalization (kept for backwards compatibility)."""
value = value.casefold()
value = re.sub(r"[^a-z0-9]+", "", value)
return value
def _normalize_search_text(value: str) -> str:
"""Normalisiert Text für die Suche ohne Wortgrenzen zu "verschmelzen".
Wichtig: Wir ersetzen Nicht-Alphanumerisches durch Leerzeichen, statt es zu entfernen.
Dadurch entstehen keine künstlichen Treffer über Wortgrenzen hinweg (z.B. "an" + "na" -> "anna").
"""
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
    """Whole-phrase match of the normalized query inside the normalized title."""
    needle = _normalize_search_text(query)
    if not needle:
        return False
    return f" {needle} " in f" {_normalize_search_text(title)} "
def _is_episode_tba(title: str, original_title: str) -> bool:
combined = f"{title} {original_title}".casefold()
markers = ("tba", "demnächst", "demnaechst", "coming soon", "to be announced")
return any(marker in combined for marker in markers)
def _row_is_upcoming(row: BeautifulSoupT) -> bool:
classes = row.get("class") or []
if isinstance(classes, str):
classes = classes.split()
if "upcoming" in classes:
return True
badge = row.select_one(".badge-upcoming")
if badge and (badge.get_text(" ", strip=True) or "").strip():
return True
watch_cell = row.select_one(".episode-watch-cell")
if watch_cell:
text = watch_cell.get_text(" ", strip=True).casefold()
if "tba" in text:
return True
return False
def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Read a boolean setting of this addon."""
    return get_setting_bool(ADDON_ID, setting_id, default=default)
def _notify_url(url: str) -> None:
    """Show a URL toast when the matching debug settings are enabled."""
    notify_url(
        ADDON_ID,
        heading="Serienstream",
        url=url,
        enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
        plugin_setting_id=SETTING_SHOW_URL_INFO,
    )
def _log_url(url: str, *, kind: str = "VISIT") -> None:
    """Append a URL entry (tagged with *kind*) to the plugin's URL log, if enabled."""
    log_url(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
        plugin_setting_id=SETTING_LOG_URLS,
        log_filename="serienstream_urls.log",
        url=url,
        kind=kind,
    )
def _log_parsed_url(url: str) -> None:
    """Log a URL that was extracted from parsed HTML (as opposed to visited)."""
    _log_url(url, kind="PARSE")
def _log_response_html(url: str, body: str) -> None:
    """Dump the HTML response body to disk when HTML dumping is enabled."""
    dump_response_html(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
        plugin_setting_id=SETTING_DUMP_HTML,
        url=url,
        body=body,
        filename_prefix="s_to_response",
    )
def _log_error(message: str) -> None:
    """Append an error message to the plugin's error log, if enabled."""
    log_error(
        ADDON_ID,
        enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
        plugin_setting_id=SETTING_LOG_ERRORS,
        log_filename="serienstream_errors.log",
        message=message,
    )
def _ensure_requests() -> None:
    """Raise a RuntimeError when the optional requests/bs4 stack is missing."""
    if requests is None or BeautifulSoup is None:
        raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
def _looks_like_cloudflare_challenge(body: str) -> bool:
lower = body.lower()
markers = (
"cf-browser-verification",
"cf-challenge",
"cf_chl",
"challenge-platform",
"attention required! | cloudflare",
"just a moment...",
"cloudflare ray id",
)
return any(marker in lower for marker in markers)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
    """Fetch *url* and return the parsed document.

    Raises on HTTP errors (after logging them) and raises RuntimeError when a
    Cloudflare challenge page is detected.
    """
    _ensure_requests()
    _log_visit(url)
    http = session or get_requests_session("serienstream", headers=HEADERS)
    response = None
    try:
        response = http.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except Exception as exc:
        _log_error(f"GET {url} failed: {exc}")
        raise
    try:
        final_url = response.url or url
        body = response.text or ""
        if final_url != url:
            _log_url(final_url, kind="REDIRECT")
        _log_response_html(url, body)
        if _looks_like_cloudflare_challenge(body):
            raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
        return BeautifulSoup(body, "html.parser")
    finally:
        try:
            response.close()
        except Exception:
            pass
def _get_html_simple(url: str) -> str:
    """Fetch *url* and return the raw HTML body.

    Raises on HTTP errors (after logging them) and raises RuntimeError when a
    Cloudflare challenge page is detected.
    """
    _ensure_requests()
    _log_visit(url)
    http = get_requests_session("serienstream", headers=HEADERS)
    response = None
    try:
        response = http.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except Exception as exc:
        _log_error(f"GET {url} failed: {exc}")
        raise
    try:
        final_url = response.url or url
        body = response.text or ""
        if final_url != url:
            _log_url(final_url, kind="REDIRECT")
        _log_response_html(url, body)
        if _looks_like_cloudflare_challenge(body):
            raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
        return body
    finally:
        try:
            response.close()
        except Exception:
            pass
def _get_soup_simple(url: str) -> BeautifulSoupT:
    """Fetch *url* via _get_html_simple and parse the body with BeautifulSoup."""
    body = _get_html_simple(url)
    return BeautifulSoup(body, "html.parser")
def _extract_genre_names_from_html(body: str) -> List[str]:
names: List[str] = []
seen: set[str] = set()
pattern = re.compile(
r"<div[^>]*class=[\"'][^\"']*background-1[^\"']*[\"'][^>]*>.*?<h3[^>]*>(.*?)</h3>",
re.IGNORECASE | re.DOTALL,
)
for match in pattern.finditer(body or ""):
text = re.sub(r"<[^>]+>", " ", match.group(1) or "")
text = unescape(re.sub(r"\s+", " ", text)).strip()
if not text:
continue
key = text.casefold()
if key in seen:
continue
seen.add(key)
names.append(text)
return names
def _strip_tags(value: str) -> str:
return re.sub(r"<[^>]+>", " ", value or "")
def _search_series_api(query: str) -> List[SeriesResult]:
    """Query the suggest API for series, falling back to single-token searches."""
    query = (query or "").strip()
    if not query:
        return []
    _ensure_requests()
    http = get_requests_session("serienstream", headers=HEADERS)
    # Single tokens often yield better API hits than the full phrase.
    terms: List[str] = [query]
    if " " in query:
        terms.extend(token for token in query.split() if token)
    seen_urls: set[str] = set()
    for term in terms:
        response = None
        try:
            response = http.get(
                f"{_get_base_url()}/api/search/suggest",
                params={"term": term},
                headers=HEADERS,
                timeout=SEARCH_TIMEOUT,
            )
            response.raise_for_status()
        except Exception:
            continue
        try:
            payload = response.json()
        except Exception:
            continue
        finally:
            if response is not None:
                try:
                    response.close()
                except Exception:
                    pass
        shows = payload.get("shows") if isinstance(payload, dict) else None
        if not isinstance(shows, list):
            continue
        hits: List[SeriesResult] = []
        for item in shows:
            if not isinstance(item, dict):
                continue
            title = (item.get("name") or "").strip()
            href = (item.get("url") or "").strip()
            if not title or not href:
                continue
            url_abs = _absolute_url(href)
            if not url_abs or url_abs in seen_urls:
                continue
            # Only series roots; skip season/episode deep links.
            if "/staffel-" in url_abs or "/episode-" in url_abs:
                continue
            seen_urls.add(url_abs)
            hits.append(SeriesResult(title=title, description="", url=url_abs))
        if not hits:
            continue
        filtered = [entry for entry in hits if _matches_query(query, title=entry.title)]
        if filtered:
            return filtered
        if term != query:
            # Token-level search only: return the raw hits rather than nothing.
            return hits
    return []
def _search_series_server(query: str) -> List[SeriesResult]:
    """Server-side search: suggest API first, then the HTML search pages."""
    if not query:
        return []
    api_hits = _search_series_api(query)
    if api_hits:
        return api_hits
    base = _get_base_url()
    for search_url in (f"{base}/search?q={quote(query)}", f"{base}/suche?q={quote(query)}"):
        try:
            body = _get_html_simple(search_url)
        except Exception:
            continue
        if not body:
            continue
        root = BeautifulSoup(body, "html.parser").select_one(".search-results-list")
        if root is None:
            continue
        found: List[SeriesResult] = []
        known: set[str] = set()
        for card in root.select(".cover-card"):
            anchor = card.select_one("a[href*='/serie/']")
            if not anchor:
                continue
            url_abs = _absolute_url((anchor.get("href") or "").strip())
            if not url_abs or url_abs in known:
                continue
            # Only series roots; skip season/episode deep links.
            if "/staffel-" in url_abs or "/episode-" in url_abs:
                continue
            title_tag = card.select_one(".show-title") or card.select_one("h3") or card.select_one("h4")
            title = (title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True)).strip()
            if not title:
                continue
            known.add(url_abs)
            found.append(SeriesResult(title=title, description="", url=url_abs))
        if found:
            return found
    return []
def _extract_catalog_index_from_html(body: str, *, progress_callback: ProgressCallback = None) -> List[SeriesResult]:
    """Regex fast path: extract all series items from the catalog page HTML."""
    results: List[SeriesResult] = []
    if not body:
        return results
    item_re = re.compile(
        r"<li[^>]*class=[\"'][^\"']*series-item[^\"']*[\"'][^>]*>(.*?)</li>",
        re.IGNORECASE | re.DOTALL,
    )
    anchor_re = re.compile(r"<a[^>]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>", re.IGNORECASE | re.DOTALL)
    data_search_re = re.compile(r"data-search=[\"']([^\"']*)[\"']", re.IGNORECASE)
    known: set[str] = set()
    for idx, item in enumerate(item_re.finditer(body), start=1):
        # Occasional progress ticks keep the UI responsive while parsing.
        if idx == 1 or idx % 200 == 0:
            _emit_progress(progress_callback, f"Katalog parsen {idx}", 62)
        inner = item.group(1) or ""
        anchor = anchor_re.search(inner)
        if not anchor:
            continue
        url = _absolute_url((anchor.group(1) or "").strip())
        if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
            continue
        if url in known:
            continue
        known.add(url)
        title = unescape(re.sub(r"\s+", " ", _strip_tags(anchor.group(2) or ""))).strip()
        if not title:
            continue
        search_match = data_search_re.search(item.group(0))
        description = (search_match.group(1) or "").strip() if search_match else ""
        results.append(SeriesResult(title=title, description=description, url=url))
    return results
def _catalog_index_from_soup(soup: BeautifulSoupT) -> List[SeriesResult]:
    """BeautifulSoup fallback for building the catalog index (slower than the regex path)."""
    results: List[SeriesResult] = []
    if not soup:
        return results
    known: set[str] = set()
    for node in soup.select("li.series-item"):
        anchor = node.find("a", href=True)
        if not anchor:
            continue
        url = _absolute_url((anchor.get("href") or "").strip())
        if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
            continue
        if url in known:
            continue
        known.add(url)
        title = (anchor.get_text(" ", strip=True) or "").strip()
        if not title:
            continue
        description = (node.get("data-search") or "").strip()
        results.append(SeriesResult(title=title, description=description, url=url))
    return results
def _load_catalog_index_from_cache() -> Optional[List[SeriesResult]]:
    """Return the catalog index from memory or the window cache, or None on miss."""
    global _CATALOG_INDEX_MEMORY
    expires_at, cached = _CATALOG_INDEX_MEMORY
    if cached and expires_at > time.time():
        return list(cached)
    raw = _session_cache_get(CATALOG_SEARCH_CACHE_KEY)
    if not isinstance(raw, list):
        return None
    restored: List[SeriesResult] = []
    for row in raw:
        # Each row is [title, url, description?]; skip malformed entries.
        if not isinstance(row, list) or len(row) < 2:
            continue
        title = str(row[0] or "").strip()
        url = str(row[1] or "").strip()
        if not (title and url):
            continue
        description = str(row[2] or "") if len(row) > 2 else ""
        restored.append(SeriesResult(title=title, description=description, url=url))
    if not restored:
        return None
    _CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(restored))
    return restored
def _store_catalog_index_in_cache(items: List[SeriesResult]) -> None:
    """Persist the catalog index in memory and in the window-property cache."""
    if not items:
        return
    global _CATALOG_INDEX_MEMORY
    _CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(items))
    payload = [
        [entry.title, entry.url, entry.description]
        for entry in items
        if entry.title and entry.url
    ]
    _session_cache_set(CATALOG_SEARCH_CACHE_KEY, payload, ttl_seconds=CATALOG_SEARCH_TTL_SECONDS)
def search_series(query: str, *, progress_callback: ProgressCallback = None) -> List[SeriesResult]:
    """Search the series catalog by title (server search first, then cached catalog index)."""
    _ensure_requests()
    if not _normalize_search_text(query):
        return []

    def _filter(entries: List[SeriesResult]) -> List[SeriesResult]:
        # Single filtering pass shared by every result source.
        return [entry for entry in entries if entry.title and _matches_query(query, title=entry.title)]

    _emit_progress(progress_callback, "Server-Suche", 15)
    hits = _search_series_server(query)
    if hits:
        _emit_progress(progress_callback, f"Server-Treffer: {len(hits)}", 35)
        return _filter(hits)
    _emit_progress(progress_callback, "Pruefe Such-Cache", 42)
    cached = _load_catalog_index_from_cache()
    if cached is not None:
        _emit_progress(progress_callback, f"Cache-Treffer: {len(cached)}", 52)
        return _filter(cached)
    _emit_progress(progress_callback, "Lade Katalogseite", 58)
    body = _get_html_simple(f"{_get_base_url()}/serien?by=genre")
    items = _extract_catalog_index_from_html(body, progress_callback=progress_callback)
    if not items:
        _emit_progress(progress_callback, "Fallback-Parser", 70)
        items = _catalog_index_from_soup(BeautifulSoup(body, "html.parser"))
    if items:
        _store_catalog_index_in_cache(items)
    _emit_progress(progress_callback, f"Filtere Treffer ({len(items)})", 85)
    return _filter(items)
def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]:
    """Parse the series overview page (/serien) into a genre -> series mapping.

    Layout (as of 2026-01): group headers are `div.background-1 h3`; entries
    follow as `ul.series-list > li.series-item[data-search] > a[href]`.
    """
    catalog: Dict[str, List[SeriesResult]] = {}
    for header in soup.select("div.background-1 h3"):
        genre = (header.get_text(strip=True) or "").strip()
        if not genre:
            continue
        series_list = header.parent.find_next_sibling("ul", class_="series-list")
        if not series_list:
            continue
        entries: List[SeriesResult] = []
        for item in series_list.select("li.series-item"):
            anchor = item.find("a", href=True)
            if not anchor:
                continue
            url = _absolute_url((anchor.get("href") or "").strip())
            if url:
                _log_parsed_url(url)
            if ("/serie/" not in url) or "/staffel-" in url or "/episode-" in url:
                continue
            title = (anchor.get_text(" ", strip=True) or "").strip()
            description = (item.get("data-search") or "").strip()
            if title:
                entries.append(SeriesResult(title=title, description=description, url=url))
        if entries:
            catalog[genre] = entries
    return catalog
def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]:
    """Collect (season number, absolute URL) pairs from the season nav pills, sorted."""
    links: List[Tuple[int, str]] = []
    known: set[int] = set()
    for anchor in soup.select("ul.nav.list-items-nav a[data-season-pill][href]"):
        href = anchor.get("href") or ""
        if "/episode-" in href:
            continue
        # Season number: URL segment first, then the pill attribute, then the label.
        match = re.search(r"/staffel-(\d+)", href)
        if match:
            number = int(match.group(1))
        else:
            pill = (anchor.get("data-season-pill") or "").strip()
            if pill.isdigit():
                number = int(pill)
            else:
                label = anchor.get_text(strip=True)
                if not label.isdigit():
                    continue
                number = int(label)
        if number in known:
            continue
        known.add(number)
        season_url = _absolute_url(href)
        if season_url:
            _log_parsed_url(season_url)
        links.append((number, season_url))
    links.sort(key=lambda entry: entry[0])
    return links
def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]:
tag = soup.select_one('meta[itemprop="numberOfSeasons"]')
if not tag:
return None
content = (tag.get("content") or "").strip()
if not content.isdigit():
return None
count = int(content)
return count if count > 0 else None
def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str:
canonical = soup.select_one('link[rel="canonical"][href]')
href = (canonical.get("href") if canonical else "") or ""
href = href.strip()
if href.startswith("http://") or href.startswith("https://"):
return href.rstrip("/")
return fallback.rstrip("/")
def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]:
    """Parse the episode table of a season page into EpisodeInfo entries.

    Upcoming/TBA rows are skipped. The episode number is taken from the URL
    when possible, then from the number cell, then from the row position.
    """
    season_label = ""
    season_header = soup.select_one("section.episode-section h2") or soup.select_one("h2.h3")
    if season_header:
        season_label = (season_header.get_text(" ", strip=True) or "").strip()
    # Maps the site's flag CSS suffixes to short language codes.
    language_map = {
        "german": "DE",
        "english": "EN",
        "japanese": "JP",
        "turkish": "TR",
        "spanish": "ES",
        "italian": "IT",
        "french": "FR",
        "korean": "KO",
        "russian": "RU",
        "polish": "PL",
        "portuguese": "PT",
        "chinese": "ZH",
        "arabic": "AR",
        "thai": "TH",
    }
    episodes: List[EpisodeInfo] = []
    # New layout (as of 2026-01): episode table rows carry an onclick target URL.
    rows = soup.select("table.episode-table tbody tr.episode-row")
    for index, row in enumerate(rows):
        if _row_is_upcoming(row):
            continue
        url = ""
        onclick = (row.get("onclick") or "").strip()
        if onclick:
            match = re.search(r"location=['\\\"]([^'\\\"]+)['\\\"]", onclick)
            if match:
                url = _absolute_url(match.group(1))
        if not url:
            # Fallback: first anchor inside the row.
            anchor = row.find("a", href=True)
            url = _absolute_url(anchor.get("href")) if anchor else ""
        if url:
            _log_parsed_url(url)
        number_tag = row.select_one(".episode-number-cell")
        number_text = (number_tag.get_text(strip=True) if number_tag else "").strip()
        match = re.search(r"/episode-(\d+)", url) if url else None
        if match:
            number = int(match.group(1))
        else:
            digits = "".join(ch for ch in number_text if ch.isdigit())
            number = int(digits) if digits else index + 1
        title_tag = row.select_one(".episode-title-ger")
        original_tag = row.select_one(".episode-title-eng")
        title = (title_tag.get_text(strip=True) if title_tag else "").strip()
        original_title = (original_tag.get_text(strip=True) if original_tag else "").strip()
        if not title:
            title = f"Episode {number}"
        if _is_episode_tba(title, original_title):
            continue
        hosters: List[str] = []
        for img in row.select(".episode-watch-cell img"):
            label = (img.get("alt") or img.get("title") or "").strip()
            if label and label not in hosters:
                hosters.append(label)
        languages: List[str] = []
        for flag in row.select(".episode-language-cell .watch-language"):
            classes = flag.get("class") or []
            if isinstance(classes, str):
                classes = classes.split()
            for cls in classes:
                if not cls.startswith("svg-flag-"):
                    continue
                key = cls.replace("svg-flag-", "").strip()
                if not key:
                    continue
                value = language_map.get(key, key.upper())
                if value and value not in languages:
                    languages.append(value)
        episodes.append(
            EpisodeInfo(
                number=number,
                title=title,
                original_title=original_title,
                url=url,
                season_label=season_label,
                languages=languages,
                hosters=hosters,
            )
        )
    # Fix: the previous `if episodes: return episodes` followed by
    # `return episodes` was redundant dead code; one return covers both cases.
    return episodes
def fetch_episode_stream_link(
    episode_url: str,
    *,
    preferred_hosters: Optional[List[str]] = None,
) -> Optional[str]:
    """Return the play URL of the preferred hoster, or the first offer, or None."""
    _ensure_requests()
    target = _absolute_url(episode_url)
    wanted = [hoster.lower() for hoster in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)]
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may 5xx while the target still works.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass
    soup = _get_soup(target, session=session)
    offers: List[Tuple[str, str]] = []
    for button in soup.select("button.link-box[data-play-url]"):
        provider = (button.get("data-provider-name") or "").strip()
        play_url = _absolute_url((button.get("data-play-url") or "").strip())
        if play_url:
            _log_parsed_url(play_url)
        if provider and play_url:
            offers.append((provider, play_url))
    if not offers:
        return None
    for choice in wanted:
        for provider, play_url in offers:
            if provider.lower() == choice:
                return play_url
    return offers[0][1]
def fetch_episode_hoster_names(episode_url: str) -> List[str]:
    """Return the distinct hoster names offered for an episode page."""
    _ensure_requests()
    target = _absolute_url(episode_url)
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may 5xx while the target still works.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass
    soup = _get_soup(target, session=session)
    names: List[str] = []
    known: set[str] = set()
    for button in soup.select("button.link-box[data-provider-name]"):
        provider = (button.get("data-provider-name") or "").strip()
        play_url = _absolute_url((button.get("data-play-url") or "").strip())
        if play_url:
            _log_parsed_url(play_url)
        key = provider.casefold().strip()
        if not key or key in known:
            continue
        known.add(key)
        names.append(provider)
        _log_url(provider, kind="HOSTER")
    if names:
        _log_url(f"{target}#hosters={','.join(names)}", kind="HOSTERS")
    return names
# Precompiled patterns for "SxxExx"-style tags and /staffel-x/episode-y URL segments.
_LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE)
_LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE)
def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]:
    """Parse the "latest episodes" rows from the start page.

    Season/episode numbers come from the S/E badges first, falling back to
    the URL pattern; rows missing either number are skipped.
    """
    episodes: List[LatestEpisode] = []
    seen: set[str] = set()
    for anchor in soup.select("a.latest-episode-row[href]"):
        href = (anchor.get("href") or "").strip()
        if not href or "/serie/" not in href:
            continue
        url = _absolute_url(href)
        if not url:
            continue
        title_node = anchor.select_one(".ep-title")
        series_title = ((title_node.get("title") if title_node else "") or "").strip()
        if not series_title:
            series_title = (title_node.get_text(strip=True) if title_node else "").strip()
        if not series_title:
            continue
        # Fix: hoist the badge lookups — each node was previously queried twice.
        season_node = anchor.select_one(".ep-season")
        episode_node = anchor.select_one(".ep-episode")
        season_text = (season_node.get_text(strip=True) if season_node else "").strip()
        episode_text = (episode_node.get_text(strip=True) if episode_node else "").strip()
        season_number: Optional[int] = None
        episode_number: Optional[int] = None
        match = re.search(r"S\s*(\d+)", season_text, re.IGNORECASE)
        if match:
            season_number = int(match.group(1))
        match = re.search(r"E\s*(\d+)", episode_text, re.IGNORECASE)
        if match:
            episode_number = int(match.group(1))
        if season_number is None or episode_number is None:
            match = _LATEST_EPISODE_URL_RE.search(href)
            if match:
                season_number = int(match.group(1))
                episode_number = int(match.group(2))
        if season_number is None or episode_number is None:
            continue
        airdate_node = anchor.select_one(".ep-time")
        airdate = (airdate_node.get_text(" ", strip=True) if airdate_node else "").strip()
        # Fix: the dedup key used a literal backslash-t ("\\t") separator;
        # use a real tab. The key is only used in this local set.
        key = f"{url}\t{season_number}\t{episode_number}"
        if key in seen:
            continue
        seen.add(key)
        _log_parsed_url(url)
        episodes.append(
            LatestEpisode(
                series_title=series_title,
                season=season_number,
                episode=episode_number,
                url=url,
                airdate=airdate,
            )
        )
    return episodes
def resolve_redirect(target_url: str) -> Optional[str]:
    """Follow redirects for *target_url* and return the final URL (or None)."""
    _ensure_requests()
    target = _absolute_url(target_url)
    _log_visit(target)
    session = get_requests_session("serienstream", headers=HEADERS)
    # Optional preflight: the start page may 5xx while the target still works.
    try:
        _get_soup(_get_base_url(), session=session)
    except Exception:
        pass
    response = None
    try:
        response = session.get(
            target,
            headers=HEADERS,
            timeout=DEFAULT_TIMEOUT,
            allow_redirects=True,
        )
        final = response.url or None
        if final:
            _log_url(final, kind="RESOLVED")
        return final
    finally:
        if response is not None:
            try:
                response.close()
            except Exception:
                pass
def scrape_series_detail(
    series_identifier: str,
    max_seasons: Optional[int] = None,
    *,
    load_episodes: bool = True,
) -> List[SeasonInfo]:
    """Load the season list (and optionally all episodes) of a series.

    Seasons missing from the nav are synthesized from the numberOfSeasons
    meta tag using the canonical series URL.
    """
    _ensure_requests()
    series_url = _series_root_url(_normalize_series_url(series_identifier))
    _log_url(series_url, kind="SERIES")
    _notify_url(series_url)
    session = get_requests_session("serienstream", headers=HEADERS)
    detail_soup = _get_soup(series_url, session=session)
    canonical = _series_root_url(_extract_canonical_url(detail_soup, series_url))
    season_links = _extract_season_links(detail_soup)
    total = _extract_number_of_seasons(detail_soup)
    if total and (not season_links or len(season_links) < total):
        present = {number for number, _ in season_links}
        for number in range(1, total + 1):
            if number in present:
                continue
            synthesized = f"{canonical}/staffel-{number}"
            _log_parsed_url(synthesized)
            season_links.append((number, synthesized))
        season_links.sort(key=lambda entry: entry[0])
    if max_seasons is not None:
        season_links = season_links[:max_seasons]
    seasons: List[SeasonInfo] = []
    for number, season_url in season_links:
        episodes = _extract_episodes(_get_soup(season_url, session=session)) if load_episodes else []
        seasons.append(SeasonInfo(number=number, url=season_url, episodes=episodes))
    seasons.sort(key=lambda season: season.number)
    return seasons
class SerienstreamPlugin(BasisPlugin):
    """Downloader plugin that provides series from s.to via requests/bs4."""
    name = "Serienstream"
    version = "1.0.0"
    # Pseudo-genre label used as the entry point for the "popular series" list.
    POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"
def __init__(self) -> None:
self._series_results: Dict[str, SeriesResult] = {}
self._title_url_cache: Dict[str, str] = self._load_title_url_cache()
self._genre_names_cache: Optional[List[str]] = None
self._season_cache: Dict[str, List[SeasonInfo]] = {}
self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}
self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None
self._genre_group_cache: Dict[str, Dict[str, List[str]]] = {}
self._genre_page_titles_cache: Dict[Tuple[str, int], List[str]] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SeriesResult]] = None
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {}
self._latest_cache: Dict[int, List[LatestEpisode]] = {}
self._latest_hoster_cache: Dict[str, List[str]] = {}
self._series_metadata_cache: Dict[str, Tuple[Dict[str, str], Dict[str, str]]] = {}
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
print(
"SerienstreamPlugin deaktiviert: requests/bs4 fehlen. "
"Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"Importfehler: {REQUESTS_IMPORT_ERROR}")
return
def _load_title_url_cache(self) -> Dict[str, str]:
raw = _session_cache_get("title_urls")
if not isinstance(raw, dict):
return {}
result: Dict[str, str] = {}
for key, value in raw.items():
key_text = str(key or "").strip().casefold()
url_text = str(value or "").strip()
if not key_text or not url_text:
continue
result[key_text] = url_text
return result
def _save_title_url_cache(self) -> None:
if not self._title_url_cache:
return
# Begrenzt die Session-Daten auf die jüngsten Einträge.
while len(self._title_url_cache) > SESSION_CACHE_MAX_TITLE_URLS:
self._title_url_cache.pop(next(iter(self._title_url_cache)))
_session_cache_set("title_urls", self._title_url_cache)
    def _remember_series_result(self, title: str, url: str, description: str = "") -> None:
        """Store a title -> SeriesResult mapping and persist the URL.

        With a URL the entry always overwrites; without one it is only created
        when no entry exists yet (a known URL is never downgraded to "").
        """
        title = (title or "").strip()
        url = (url or "").strip()
        if not title:
            return
        if url:
            self._series_results[title] = SeriesResult(title=title, description=description, url=url)
            cache_key = title.casefold()
            # Only rewrite the persisted cache when the URL actually changed.
            if self._title_url_cache.get(cache_key) != url:
                self._title_url_cache[cache_key] = url
                self._save_title_url_cache()
        if url:
            return
        current = self._series_results.get(title)
        if current is None:
            self._series_results[title] = SeriesResult(title=title, description=description, url="")
@staticmethod
def _metadata_cache_key(title: str) -> str:
return (title or "").strip().casefold()
def _series_for_title(self, title: str) -> Optional[SeriesResult]:
direct = self._series_results.get(title)
if direct and direct.url:
return direct
lookup_key = (title or "").strip().casefold()
for item in self._series_results.values():
if item.title.casefold().strip() == lookup_key and item.url:
return item
cached_url = self._title_url_cache.get(lookup_key, "")
if cached_url:
return SeriesResult(title=title, description="", url=cached_url)
return None
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_links.{digest}"
@staticmethod
def _season_episodes_cache_name(season_url: str) -> str:
digest = hashlib.sha1((season_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_episodes.{digest}"
def _load_session_season_links(self, series_url: str) -> Optional[List[SeasonInfo]]:
raw = _session_cache_get(self._season_links_cache_name(series_url))
if not isinstance(raw, list):
return None
seasons: List[SeasonInfo] = []
for item in raw:
if not isinstance(item, dict):
continue
try:
number = int(item.get("number"))
except Exception:
continue
url = str(item.get("url") or "").strip()
if number <= 0 or not url:
continue
seasons.append(SeasonInfo(number=number, url=url, episodes=[]))
if not seasons:
return None
seasons.sort(key=lambda s: s.number)
return seasons
def _save_session_season_links(self, series_url: str, seasons: List[SeasonInfo]) -> None:
payload = [{"number": int(season.number), "url": season.url} for season in seasons if season.url]
if payload:
_session_cache_set(self._season_links_cache_name(series_url), payload)
def _load_session_season_episodes(self, season_url: str) -> Optional[List[EpisodeInfo]]:
raw = _session_cache_get(self._season_episodes_cache_name(season_url))
if not isinstance(raw, list):
return None
episodes: List[EpisodeInfo] = []
for item in raw:
if not isinstance(item, dict):
continue
try:
number = int(item.get("number"))
except Exception:
continue
title = str(item.get("title") or "").strip()
original_title = str(item.get("original_title") or "").strip()
url = str(item.get("url") or "").strip()
season_label = str(item.get("season_label") or "").strip()
languages = [str(lang).strip() for lang in list(item.get("languages") or []) if str(lang).strip()]
hosters = [str(host).strip() for host in list(item.get("hosters") or []) if str(host).strip()]
if number <= 0:
continue
episodes.append(
EpisodeInfo(
number=number,
title=title or f"Episode {number}",
original_title=original_title,
url=url,
season_label=season_label,
languages=languages,
hosters=hosters,
)
)
if not episodes:
return None
episodes.sort(key=lambda item: item.number)
return episodes
def _save_session_season_episodes(self, season_url: str, episodes: List[EpisodeInfo]) -> None:
payload = []
for item in episodes:
payload.append(
{
"number": int(item.number),
"title": item.title,
"original_title": item.original_title,
"url": item.url,
"season_label": item.season_label,
"languages": list(item.languages or []),
"hosters": list(item.hosters or []),
}
)
if payload:
_session_cache_set(self._season_episodes_cache_name(season_url), payload)
    def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]:
        """Fetch and cache the full series catalog, grouped by genre name."""
        if self._catalog_cache is not None:
            return self._catalog_cache
        # As of 2026-01, `?by=genre` delivers consistent groups for `genres()`.
        catalog_url = f"{_get_base_url()}/serien?by=genre"
        soup = _get_soup_simple(catalog_url)
        self._catalog_cache = parse_series_catalog(soup)
        _session_cache_set("genres", sorted(self._catalog_cache.keys(), key=str.casefold))
        return self._catalog_cache
    def _ensure_genre_names(self) -> List[str]:
        """Return all genre names, consulting caches before scraping.

        Order: in-memory cache, session cache, lightweight HTML extraction,
        then the full catalog as the last fallback.
        """
        if self._genre_names_cache is not None:
            return list(self._genre_names_cache)
        cached = _session_cache_get("genres")
        if isinstance(cached, list):
            genres = [str(value).strip() for value in cached if str(value).strip()]
            if genres:
                self._genre_names_cache = sorted(set(genres), key=str.casefold)
                return list(self._genre_names_cache)
        catalog_url = f"{_get_base_url()}/serien?by=genre"
        try:
            # Cheap path: pull genre names from the raw HTML without a full parse.
            body = _get_html_simple(catalog_url)
            genres = _extract_genre_names_from_html(body)
        except Exception:
            genres = []
        if not genres:
            # Fallback: build the complete catalog and use its group keys.
            catalog = self._ensure_catalog()
            genres = sorted(catalog.keys(), key=str.casefold)
        else:
            genres = sorted(set(genres), key=str.casefold)
        self._genre_names_cache = list(genres)
        _session_cache_set("genres", self._genre_names_cache)
        return list(self._genre_names_cache)
def genres(self) -> List[str]:
"""Optional: Liefert alle Genres aus dem Serien-Katalog."""
if not self._requests_available:
return []
return self._ensure_genre_names()
def capabilities(self) -> set[str]:
"""Meldet unterstützte Features für Router-Menüs."""
return {"popular_series", "genres", "latest_episodes"}
def popular_series(self) -> List[str]:
"""Liefert die Titel der beliebten Serien (Quelle: `/beliebte-serien`)."""
if not self._requests_available:
return []
entries = self._ensure_popular()
for entry in entries:
self._remember_series_result(entry.title, entry.url, entry.description)
return [entry.title for entry in entries if entry.title]
def titles_for_genre(self, genre: str) -> List[str]:
"""Optional: Liefert Titel für ein Genre."""
if not self._requests_available:
return []
genre = (genre or "").strip()
if not genre:
return []
if genre == self.POPULAR_GENRE_LABEL:
return self.popular_series()
catalog = self._ensure_catalog()
entries = catalog.get(genre, [])
for entry in entries:
self._remember_series_result(entry.title, entry.url, entry.description)
return [entry.title for entry in entries if entry.title]
@staticmethod
def _title_group_key(title: str) -> str:
raw = (title or "").strip()
if not raw:
return "#"
for char in raw:
if char.isdigit():
return "0-9"
if char.isalpha():
normalized = char.casefold()
if normalized == "ä":
normalized = "a"
elif normalized == "ö":
normalized = "o"
elif normalized == "ü":
normalized = "u"
elif normalized == "ß":
normalized = "s"
return normalized.upper()
return "#"
@classmethod
def _group_matches(cls, group_code: str, title: str) -> bool:
key = cls._title_group_key(title)
if group_code == "0-9":
return key == "0-9"
if key == "0-9" or key == "#":
return False
if group_code == "A-E":
return "A" <= key <= "E"
if group_code == "F-J":
return "F" <= key <= "J"
if group_code == "K-O":
return "K" <= key <= "O"
if group_code == "P-T":
return "P" <= key <= "T"
if group_code == "U-Z":
return "U" <= key <= "Z"
return False
def _ensure_genre_group_cache(self, genre: str) -> Dict[str, List[str]]:
cached = self._genre_group_cache.get(genre)
if cached is not None:
return cached
titles = self.titles_for_genre(genre)
grouped: Dict[str, List[str]] = {}
for title in titles:
for code in ("A-E", "F-J", "K-O", "P-T", "U-Z", "0-9"):
if self._group_matches(code, title):
grouped.setdefault(code, []).append(title)
break
for code in grouped:
grouped[code].sort(key=str.casefold)
self._genre_group_cache[genre] = grouped
return grouped
@staticmethod
def _genre_slug(genre: str) -> str:
value = (genre or "").strip().casefold()
value = value.replace("&", " und ")
value = unicodedata.normalize("NFKD", value)
value = "".join(ch for ch in value if not unicodedata.combining(ch))
value = re.sub(r"[^a-z0-9]+", "-", value).strip("-")
return value
    def _fetch_genre_page_titles(self, genre: str, page: int) -> Tuple[List[str], int]:
        """Fetch one `/genre/<slug>` page and return (titles, total page count).

        Results are cached per (slug, page); discovered series URLs are
        remembered for later title -> URL resolution.
        """
        slug = self._genre_slug(genre)
        if not slug:
            return [], 1
        cache_key = (slug, page)
        cached = self._genre_page_titles_cache.get(cache_key)
        cached_pages = self._genre_page_count_cache.get(slug)
        if cached is not None and cached_pages is not None:
            return list(cached), int(cached_pages)
        url = f"{_get_base_url()}/genre/{slug}"
        if page > 1:
            url = f"{url}?page={int(page)}"
        soup = _get_soup_simple(url)
        titles: List[str] = []
        seen: set[str] = set()
        for anchor in soup.select("a.show-card[href]"):
            href = (anchor.get("href") or "").strip()
            # Normalize to the bare series URL (no fragment/query/trailing slash).
            series_url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
            if "/serie/" not in series_url:
                continue
            img = anchor.select_one("img[alt]")
            title = ((img.get("alt") if img else "") or "").strip()
            if not title:
                continue
            key = title.casefold()
            if key in seen:
                continue
            seen.add(key)
            self._remember_series_result(title, series_url)
            titles.append(title)
        # Derive the total page count from the pagination links on the page.
        max_page = 1
        for anchor in soup.select("a[href*='?page=']"):
            href = (anchor.get("href") or "").strip()
            match = re.search(r"[?&]page=(\d+)", href)
            if not match:
                continue
            try:
                max_page = max(max_page, int(match.group(1)))
            except Exception:
                continue
        self._genre_page_titles_cache[cache_key] = list(titles)
        self._genre_page_count_cache[slug] = max_page
        return list(titles), max_page
    def titles_for_genre_group_page(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> List[str]:
        """Return one page of titles for an alphabet group within a genre.

        Walks the genre's paginated listing until enough matches for the
        requested page are collected; falls back to the grouped catalog cache
        on any error.
        """
        genre = (genre or "").strip()
        group_code = (group_code or "").strip()
        page = max(1, int(page or 1))
        page_size = max(1, int(page_size or 10))
        # One extra entry beyond the page keeps pagination probes consistent
        # with `genre_group_has_more`.
        needed = page * page_size + 1
        matched: List[str] = []
        try:
            _, max_pages = self._fetch_genre_page_titles(genre, 1)
            for page_index in range(1, max_pages + 1):
                page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
                for title in page_titles:
                    if self._group_matches(group_code, title):
                        matched.append(title)
                if len(matched) >= needed:
                    break
            start = (page - 1) * page_size
            end = start + page_size
            return list(matched[start:end])
        except Exception:
            grouped = self._ensure_genre_group_cache(genre)
            titles = grouped.get(group_code, [])
            start = (page - 1) * page_size
            end = start + page_size
            return list(titles[start:end])
    def genre_group_has_more(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> bool:
        """Report whether another page exists after `page` for a genre/group."""
        genre = (genre or "").strip()
        group_code = (group_code or "").strip()
        page = max(1, int(page or 1))
        page_size = max(1, int(page_size or 10))
        needed = page * page_size + 1
        count = 0
        try:
            _, max_pages = self._fetch_genre_page_titles(genre, 1)
            for page_index in range(1, max_pages + 1):
                page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
                for title in page_titles:
                    if self._group_matches(group_code, title):
                        count += 1
                        if count >= needed:
                            return True
            return False
        except Exception:
            # Fallback path mirrors `titles_for_genre_group_page`.
            grouped = self._ensure_genre_group_cache(genre)
            titles = grouped.get(group_code, [])
            return len(titles) > (page * page_size)
def _ensure_popular(self) -> List[SeriesResult]:
"""Laedt und cached die Liste der beliebten Serien aus `/beliebte-serien`."""
if self._popular_cache is not None:
return list(self._popular_cache)
soup = _get_soup_simple(_popular_series_url())
results: List[SeriesResult] = []
seen: set[str] = set()
# Neues Layout (Stand: 2026-01): Abschnitt "Meistgesehen" hat Karten mit
# `a.show-card` und Titel im `img alt=...`.
anchors = None
for section in soup.select("div.mb-5"):
h2 = section.select_one("h2")
label = (h2.get_text(" ", strip=True) if h2 else "").casefold()
if "meistgesehen" in label:
anchors = section.select("a.show-card[href]")
break
if anchors is None:
anchors = soup.select("a.show-card[href]")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href or "/serie/" not in href:
continue
img = anchor.select_one("img[alt]")
title = ((img.get("alt") if img else "") or "").strip()
if not title or title in seen:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
url = re.sub(r"/staffel-\d+(?:/.*)?$", "", url).rstrip("/")
if not url:
continue
_log_parsed_url(url)
seen.add(title)
results.append(SeriesResult(title=title, description="", url=url))
self._popular_cache = list(results)
return list(results)
@staticmethod
def _season_label(number: int) -> str:
return f"Staffel {number}"
@staticmethod
def _episode_label(info: EpisodeInfo) -> str:
suffix_parts: List[str] = []
if info.original_title:
suffix_parts.append(info.original_title)
# Staffel nicht im Episoden-Label anzeigen (wird im UI bereits gesetzt).
suffix = f" ({' | '.join(suffix_parts)})" if suffix_parts else ""
return f"Episode {info.number}: {info.title}{suffix}"
@staticmethod
def _parse_season_number(label: str) -> Optional[int]:
digits = "".join(ch for ch in label if ch.isdigit())
if not digits:
return None
return int(digits)
def _clear_episode_cache_for_title(self, title: str) -> None:
keys_to_remove = [key for key in self._episode_label_cache if key[0] == title]
for key in keys_to_remove:
self._episode_label_cache.pop(key, None)
keys_to_remove = [key for key in self._hoster_cache if key[0] == title]
for key in keys_to_remove:
self._hoster_cache.pop(key, None)
def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None:
cache_key = (title, season_label)
self._episode_label_cache[cache_key] = {
self._episode_label(info): info for info in season_info.episodes
}
    def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
        """Resolve a title to its season links (without episodes), caching heavily.

        Resolution order: in-memory cache, remembered results / session URL
        cache, catalog lookup, then a live fetch of the series page. The live
        fetch also harvests series metadata (plot/art) as a side effect.

        Raises:
            RuntimeError: when the live fetch or parsing fails.
        """
        cached = self._season_links_cache.get(title)
        if cached is not None:
            return list(cached)
        series = self._series_results.get(title)
        if not series:
            cached_url = self._title_url_cache.get(title.casefold().strip(), "")
            if cached_url:
                series = SeriesResult(title=title, description="", url=cached_url)
                self._series_results[title] = series
        if not series:
            # Last resort: resolve the title via the full catalog.
            catalog = self._ensure_catalog()
            lookup_key = title.casefold().strip()
            for entries in catalog.values():
                for entry in entries:
                    if entry.title.casefold().strip() == lookup_key:
                        series = entry
                        self._remember_series_result(entry.title, entry.url, entry.description)
                        break
                if series:
                    break
        if not series:
            return []
        session_links = self._load_session_season_links(series.url)
        if session_links:
            self._season_links_cache[title] = list(session_links)
            return list(session_links)
        try:
            series_soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
            info_labels, art = _extract_series_metadata(series_soup)
            if series.description and "plot" not in info_labels:
                info_labels["plot"] = series.description
            cache_key = self._metadata_cache_key(title)
            if info_labels or art:
                self._series_metadata_cache[cache_key] = (info_labels, art)
            base_series_url = _series_root_url(_extract_canonical_url(series_soup, series.url))
            season_links = _extract_season_links(series_soup)
            season_count = _extract_number_of_seasons(series_soup)
            # Synthesize `/staffel-<n>` URLs for seasons the page advertises
            # but did not link explicitly.
            if season_count and (not season_links or len(season_links) < season_count):
                existing = {number for number, _ in season_links}
                for number in range(1, season_count + 1):
                    if number in existing:
                        continue
                    season_url = f"{base_series_url}/staffel-{number}"
                    _log_parsed_url(season_url)
                    season_links.append((number, season_url))
                season_links.sort(key=lambda item: item[0])
            seasons = [SeasonInfo(number=number, url=url, episodes=[]) for number, url in season_links]
            seasons.sort(key=lambda s: s.number)
        except Exception as exc: # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
        self._season_links_cache[title] = list(seasons)
        self._save_session_season_links(series.url, seasons)
        return list(seasons)
def remember_series_url(self, title: str, series_url: str) -> None:
title = (title or "").strip()
series_url = (series_url or "").strip()
if not title or not series_url:
return
self._remember_series_result(title, series_url)
    def metadata_for(self, title: str) -> Tuple[Dict[str, str], Dict[str, str], Optional[List[Any]]]:
        """Return (info labels, art, cast) for a title; cast is currently always None.

        Metadata is cached per normalized title; a live fetch of the series
        page is attempted once, failures fall back to title/description only.
        """
        title = (title or "").strip()
        if not title or not self._requests_available:
            return {}, {}, None
        cache_key = self._metadata_cache_key(title)
        cached = self._series_metadata_cache.get(cache_key)
        if cached is not None:
            info, art = cached
            return dict(info), dict(art), None
        series = self._series_for_title(title)
        if series is None or not series.url:
            # Unknown URL: cache a minimal entry so we do not retry every call.
            info = {"title": title}
            self._series_metadata_cache[cache_key] = (dict(info), {})
            return info, {}, None
        info: Dict[str, str] = {"title": title}
        art: Dict[str, str] = {}
        if series.description:
            info["plot"] = series.description
        try:
            soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
            parsed_info, parsed_art = _extract_series_metadata(soup)
            if parsed_info:
                info.update(parsed_info)
            if parsed_art:
                art.update(parsed_art)
        except Exception:
            # Best effort: keep the minimal metadata when the page fetch fails.
            pass
        self._series_metadata_cache[cache_key] = (dict(info), dict(art))
        return info, art, None
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._series_results.get(title)
if direct and direct.url:
return direct.url
cached_url = self._title_url_cache.get(title.casefold().strip(), "")
if cached_url:
return cached_url
lookup_key = title.casefold().strip()
for entry in self._series_results.values():
if entry.title.casefold().strip() == lookup_key and entry.url:
return entry.url
return ""
    def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
        """Return the SeasonInfo incl. episodes for one season, loading on demand.

        Checks the in-memory season cache, then the session cache, then
        fetches and parses the season page.

        Raises:
            RuntimeError: when the season page cannot be fetched or parsed.
        """
        seasons = self._season_cache.get(title) or []
        for season in seasons:
            if season.number == season_number and season.episodes:
                return season
        links = self._ensure_season_links(title)
        target = next((season for season in links if season.number == season_number), None)
        if not target:
            return None
        cached_episodes = self._load_session_season_episodes(target.url)
        if cached_episodes:
            season_info = SeasonInfo(number=target.number, url=target.url, episodes=list(cached_episodes))
            updated = [season for season in seasons if season.number != season_number]
            updated.append(season_info)
            updated.sort(key=lambda item: item.number)
            self._season_cache[title] = updated
            return season_info
        try:
            season_soup = _get_soup(target.url, session=get_requests_session("serienstream", headers=HEADERS))
            season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
        except Exception as exc: # pragma: no cover - defensive logging
            raise RuntimeError(f"Serienstream-Episoden konnten nicht geladen werden: {exc}") from exc
        updated = [season for season in seasons if season.number != season_number]
        updated.append(season_info)
        updated.sort(key=lambda item: item.number)
        self._season_cache[title] = updated
        self._save_session_season_episodes(target.url, season_info.episodes)
        return season_info
def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
cache_key = (title, season_label)
cached = self._episode_label_cache.get(cache_key)
if cached:
return cached.get(episode_label)
number = self._parse_season_number(season_label)
if number is None:
return None
season_info = self._ensure_season_episodes(title, number)
if season_info:
self._cache_episode_labels(title, season_label, season_info)
return self._episode_label_cache.get(cache_key, {}).get(episode_label)
return None
async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]:
query = query.strip()
if not query:
self._series_results.clear()
self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear()
self._catalog_cache = None
return []
if not self._requests_available:
raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 nicht suchen.")
try:
# Nutzt den Katalog (/serien), der jetzt nach Genres gruppiert ist.
# Alternativ gäbe es ein Ajax-Endpoint, aber der ist nicht immer zuverlässig erreichbar.
_emit_progress(progress_callback, "Serienstream Suche startet", 10)
results = search_series(query, progress_callback=progress_callback)
except Exception as exc: # pragma: no cover - defensive logging
self._series_results.clear()
self._season_cache.clear()
self._episode_label_cache.clear()
self._catalog_cache = None
raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc
self._series_results = {}
for result in results:
self._remember_series_result(result.title, result.url, result.description)
self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear()
_emit_progress(progress_callback, f"Treffer aufbereitet: {len(results)}", 95)
return [result.title for result in results]
    def _ensure_seasons(self, title: str) -> List[SeasonInfo]:
        """Return the cached season list for a title, resolving the series if needed."""
        if title in self._season_cache:
            seasons = self._season_cache[title]
            # Log URLs on cache hits too, so it stays traceable which pages
            # are relevant for season/episode listings.
            if _get_setting_bool(GLOBAL_SETTING_LOG_URLS, default=False):
                series = self._series_results.get(title)
                if series and series.url:
                    _log_url(series.url, kind="CACHE")
                for season in seasons:
                    if season.url:
                        _log_url(season.url, kind="CACHE")
            return seasons
        series = self._series_results.get(title)
        if not series:
            # Kodi restarts the plugin on every navigation -> the in-RAM search
            # cache is lost. Re-resolve the title via the catalog to recover
            # the series URL.
            catalog = self._ensure_catalog()
            lookup_key = title.casefold().strip()
            for entries in catalog.values():
                for entry in entries:
                    if entry.title.casefold().strip() == lookup_key:
                        series = entry
                        self._remember_series_result(entry.title, entry.url, entry.description)
                        break
                if series:
                    break
        if not series:
            return []
        seasons = self._ensure_season_links(title)
        self._clear_episode_cache_for_title(title)
        self._season_cache[title] = list(seasons)
        return list(seasons)
def seasons_for(self, title: str) -> List[str]:
seasons = self._ensure_seasons(title)
return [self._season_label(season.number) for season in seasons]
def episodes_for(self, title: str, season: str) -> List[str]:
number = self._parse_season_number(season)
if number is None:
return []
season_info = self._ensure_season_episodes(title, number)
if season_info:
labels = [self._episode_label(info) for info in season_info.episodes]
self._cache_episode_labels(title, season, season_info)
return labels
return []
    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return the redirect link of the preferred hoster for a labeled episode.

        Returns None when the episode cannot be resolved or yields no link.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        episode_info = self._lookup_episode(title, season, episode)
        if not episode_info:
            return None
        try:
            link = fetch_episode_stream_link(
                episode_info.url,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc: # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc
def episode_url_for(self, title: str, season: str, episode: str) -> str:
cache_key = (title, season)
cached = self._episode_label_cache.get(cache_key)
if cached:
info = cached.get(episode)
if info and info.url:
return info.url
episode_info = self._lookup_episode(title, season, episode)
if episode_info and episode_info.url:
return episode_info.url
return ""
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
cache_key = (title, season, episode)
cached = self._hoster_cache.get(cache_key)
if cached is not None:
return list(cached)
episode_info = self._lookup_episode(title, season, episode)
if not episode_info:
return []
try:
names = fetch_episode_hoster_names(episode_info.url)
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
self._hoster_cache[cache_key] = list(names)
return list(names)
def latest_episodes(self, page: int = 1) -> List[LatestEpisode]:
"""Liefert die neuesten Episoden aus `/neue-episoden`."""
if not self._requests_available:
return []
try:
page = int(page or 1)
except Exception:
page = 1
page = max(1, page)
cached = self._latest_cache.get(page)
if cached is not None:
return list(cached)
url = _latest_episodes_url()
if page > 1:
url = f"{url}?page={page}"
soup = _get_soup_simple(url)
episodes = _extract_latest_episodes(soup)
self._latest_cache[page] = list(episodes)
return list(episodes)
def available_hosters_for_url(self, episode_url: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")
normalized = _absolute_url(episode_url)
cached = self._latest_hoster_cache.get(normalized)
if cached is not None:
return list(cached)
try:
names = fetch_episode_hoster_names(normalized)
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Hoster konnten nicht geladen werden: {exc}") from exc
self._latest_hoster_cache[normalized] = list(names)
return list(names)
    def stream_link_for_url(self, episode_url: str) -> Optional[str]:
        """Return the preferred hoster's redirect link for an episode URL.

        Returns None when no matching hoster link is found.

        Raises:
            RuntimeError: when requests/bs4 are missing or fetching fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links liefern.")
        normalized = _absolute_url(episode_url)
        try:
            link = fetch_episode_stream_link(
                normalized,
                preferred_hosters=self._preferred_hosters,
            )
            if link:
                _log_url(link, kind="FOUND")
            return link
        except Exception as exc: # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc
    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Follow a hoster redirect and, when available, resolve it via ResolveURL.

        Returns the final (media) URL, or None when the redirect cannot be
        followed.

        Raises:
            RuntimeError: when requests/bs4 are missing or resolving fails.
        """
        if not self._requests_available:
            raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.")
        try:
            resolved = resolve_redirect(link)
            if not resolved:
                return None
            # Imported lazily: the ResolveURL backend is optional at runtime.
            try:
                from resolveurl_backend import resolve as resolve_with_resolveurl
            except Exception:
                resolve_with_resolveurl = None
            if callable(resolve_with_resolveurl):
                resolved_by_resolveurl = resolve_with_resolveurl(resolved)
                if resolved_by_resolveurl:
                    _log_url("ResolveURL", kind="HOSTER_RESOLVER")
                    _log_url(resolved_by_resolveurl, kind="MEDIA")
                    return resolved_by_resolveurl
            _log_url(resolved, kind="FINAL")
            return resolved
        except Exception as exc: # pragma: no cover - defensive logging
            raise RuntimeError(f"Stream-Link konnte nicht verfolgt werden: {exc}") from exc
def set_preferred_hosters(self, hosters: List[str]) -> None:
normalized = [hoster.strip().lower() for hoster in hosters if hoster.strip()]
if normalized:
self._preferred_hosters = normalized
def reset_preferred_hosters(self) -> None:
self._preferred_hosters = list(self._default_preferred_hosters)
# Alias for automatic plugin discovery.
Plugin = SerienstreamPlugin