Compare commits

..

4 Commits

5 changed files with 734 additions and 80 deletions

5
.gitignore vendored
View File

@@ -6,3 +6,8 @@
# Build outputs # Build outputs
/dist/ /dist/
# Local tests (not committed)
/tests/
/.pytest_cache/
/pytest.ini

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.47" provider-name="ViewIt"> <addon id="plugin.video.viewit" name="ViewIt" version="0.1.48" provider-name="ViewIt">
<requires> <requires>
<import addon="xbmc.python" version="3.0.0" /> <import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" /> <import addon="script.module.requests" />

View File

@@ -944,11 +944,12 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
display_label = _label_with_duration(title, info_labels) display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate) display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)) direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False))
extra_params = _series_url_params(plugin, title)
_add_directory_item( _add_directory_item(
handle, handle,
display_label, display_label,
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **extra_params},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=merged_info, info_labels=merged_info,
art=art, art=art,
@@ -1025,6 +1026,17 @@ def _run_async(coro):
return asyncio.run(coro) return asyncio.run(coro)
def _series_url_params(plugin: BasisPlugin, title: str) -> dict[str, str]:
getter = getattr(plugin, "series_url_for_title", None)
if not callable(getter):
return {}
try:
series_url = str(getter(title) or "").strip()
except Exception:
return {}
return {"series_url": series_url} if series_url else {}
def _show_search() -> None: def _show_search() -> None:
_log("Suche gestartet.") _log("Suche gestartet.")
dialog = xbmcgui.Dialog() dialog = xbmcgui.Dialog()
@@ -1071,11 +1083,12 @@ def _show_search_results(query: str) -> None:
direct_play = bool( direct_play = bool(
plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False) plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)
) )
extra_params = _series_url_params(plugin, title)
_add_directory_item( _add_directory_item(
handle, handle,
label, label,
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **extra_params},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=merged_info, info_labels=merged_info,
art=art, art=art,
@@ -1084,7 +1097,7 @@ def _show_search_results(query: str) -> None:
xbmcplugin.endOfDirectory(handle) xbmcplugin.endOfDirectory(handle)
def _show_seasons(plugin_name: str, title: str) -> None: def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
handle = _get_handle() handle = _get_handle()
_log(f"Staffeln laden: {plugin_name} / {title}") _log(f"Staffeln laden: {plugin_name} / {title}")
plugin = _discover_plugins().get(plugin_name) plugin = _discover_plugins().get(plugin_name)
@@ -1092,6 +1105,13 @@ def _show_seasons(plugin_name: str, title: str) -> None:
xbmcgui.Dialog().notification("Staffeln", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcgui.Dialog().notification("Staffeln", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle) xbmcplugin.endOfDirectory(handle)
return return
if series_url:
remember_series_url = getattr(plugin, "remember_series_url", None)
if callable(remember_series_url):
try:
remember_series_url(title, series_url)
except Exception:
pass
# Einschalten liefert Filme. Für Playback soll nach dem Öffnen des Titels direkt ein # Einschalten liefert Filme. Für Playback soll nach dem Öffnen des Titels direkt ein
# einzelnes abspielbares Item angezeigt werden: <Titel> -> (<Titel> abspielbar). # einzelnes abspielbares Item angezeigt werden: <Titel> -> (<Titel> abspielbar).
@@ -1227,7 +1247,7 @@ def _show_seasons(plugin_name: str, title: str) -> None:
handle, handle,
_label_with_playstate(season, season_state), _label_with_playstate(season, season_state),
"episodes", "episodes",
{"plugin": plugin_name, "title": title, "season": season}, {"plugin": plugin_name, "title": title, "season": season, "series_url": series_url},
is_folder=True, is_folder=True,
info_labels=merged_labels or None, info_labels=merged_labels or None,
art=merged_art, art=merged_art,
@@ -1236,7 +1256,7 @@ def _show_seasons(plugin_name: str, title: str) -> None:
xbmcplugin.endOfDirectory(handle) xbmcplugin.endOfDirectory(handle)
def _show_episodes(plugin_name: str, title: str, season: str) -> None: def _show_episodes(plugin_name: str, title: str, season: str, series_url: str = "") -> None:
handle = _get_handle() handle = _get_handle()
_log(f"Episoden laden: {plugin_name} / {title} / {season}") _log(f"Episoden laden: {plugin_name} / {title} / {season}")
plugin = _discover_plugins().get(plugin_name) plugin = _discover_plugins().get(plugin_name)
@@ -1244,6 +1264,13 @@ def _show_episodes(plugin_name: str, title: str, season: str) -> None:
xbmcgui.Dialog().notification("Episoden", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000) xbmcgui.Dialog().notification("Episoden", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle) xbmcplugin.endOfDirectory(handle)
return return
if series_url:
remember_series_url = getattr(plugin, "remember_series_url", None)
if callable(remember_series_url):
try:
remember_series_url(title, series_url)
except Exception:
pass
season_number = _extract_first_int(season) season_number = _extract_first_int(season)
if season_number is not None: if season_number is not None:
xbmcplugin.setPluginCategory(handle, f"{title} - Staffel {season_number}") xbmcplugin.setPluginCategory(handle, f"{title} - Staffel {season_number}")
@@ -1439,7 +1466,7 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
handle, handle,
display_label, display_label,
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=info_labels, info_labels=info_labels,
art=art, art=art,
@@ -1456,7 +1483,7 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
handle, handle,
_label_with_playstate(title, playstate), _label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate), info_labels=_apply_playstate_to_info({"title": title}, playstate),
) )
@@ -1672,7 +1699,7 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
handle, handle,
display_label, display_label,
"seasons", "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True, is_folder=True,
info_labels=info_labels, info_labels=info_labels,
art=art, art=art,
@@ -1685,7 +1712,7 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
handle, handle,
_label_with_playstate(title, playstate), _label_with_playstate(title, playstate),
"seasons", "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True, is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate), info_labels=_apply_playstate_to_info({"title": title}, playstate),
) )
@@ -1817,7 +1844,7 @@ def _show_new_titles(plugin_name: str, page: int = 1) -> None:
handle, handle,
display_label, display_label,
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=info_labels, info_labels=info_labels,
art=art, art=art,
@@ -1834,7 +1861,7 @@ def _show_new_titles(plugin_name: str, page: int = 1) -> None:
handle, handle,
_label_with_playstate(title, playstate), _label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons", "play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play, is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate), info_labels=_apply_playstate_to_info({"title": title}, playstate),
) )
@@ -1939,6 +1966,86 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
handle = _get_handle() handle = _get_handle()
page_size = 10 page_size = 10
page = max(1, int(page or 1)) page = max(1, int(page or 1))
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Genres", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
grouped_paging = getattr(plugin, "titles_for_genre_group_page", None)
grouped_has_more = getattr(plugin, "genre_group_has_more", None)
if callable(grouped_paging):
try:
page_items = [str(t).strip() for t in list(grouped_paging(genre, group_code, page, page_size) or []) if t and str(t).strip()]
except Exception as exc:
_log(f"Genre-Serien konnten nicht geladen werden ({plugin_name}/{genre}/{group_code} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Genres", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page})")
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"genre_series_group",
{"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page - 1)},
is_folder=True,
)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
show_next = False
if callable(grouped_has_more):
try:
show_next = bool(grouped_has_more(genre, group_code, page, page_size))
except Exception:
show_next = False
elif len(page_items) >= page_size:
show_next = True
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"genre_series_group",
{"plugin": plugin_name, "genre": genre, "group": group_code, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
return
try: try:
titles = _get_genre_titles(plugin_name, genre) titles = _get_genre_titles(plugin_name, genre)
@@ -1986,7 +2093,7 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
handle, handle,
display_label, display_label,
"seasons", "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True, is_folder=True,
info_labels=info_labels, info_labels=info_labels,
art=art, art=art,
@@ -1999,7 +2106,7 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
handle, handle,
_label_with_playstate(title, playstate), _label_with_playstate(title, playstate),
"seasons", "seasons",
{"plugin": plugin_name, "title": title}, {"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True, is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate), info_labels=_apply_playstate_to_info({"title": title}, playstate),
) )
@@ -2373,12 +2480,13 @@ def run() -> None:
elif action == "settings": elif action == "settings":
_open_settings() _open_settings()
elif action == "seasons": elif action == "seasons":
_show_seasons(params.get("plugin", ""), params.get("title", "")) _show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", ""))
elif action == "episodes": elif action == "episodes":
_show_episodes( _show_episodes(
params.get("plugin", ""), params.get("plugin", ""),
params.get("title", ""), params.get("title", ""),
params.get("season", ""), params.get("season", ""),
params.get("series_url", ""),
) )
elif action == "play_episode": elif action == "play_episode":
_play_episode( _play_episode(

View File

@@ -417,15 +417,16 @@ def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]:
return episodes return episodes
def scrape_anime_detail(anime_identifier: str, max_seasons: Optional[int] = None) -> List[SeasonInfo]: def scrape_anime_detail(
anime_identifier: str,
max_seasons: Optional[int] = None,
*,
load_episodes: bool = True,
) -> List[SeasonInfo]:
_ensure_requests() _ensure_requests()
anime_url = _series_root_url(_absolute_url(anime_identifier)) anime_url = _series_root_url(_absolute_url(anime_identifier))
_log_url(anime_url, kind="ANIME") _log_url(anime_url, kind="ANIME")
session = get_requests_session("aniworld", headers=HEADERS) session = get_requests_session("aniworld", headers=HEADERS)
try:
_get_soup(_get_base_url(), session=session)
except Exception:
pass
soup = _get_soup(anime_url, session=session) soup = _get_soup(anime_url, session=session)
base_anime_url = _series_root_url(_extract_canonical_url(soup, anime_url)) base_anime_url = _series_root_url(_extract_canonical_url(soup, anime_url))
@@ -445,6 +446,8 @@ def scrape_anime_detail(anime_identifier: str, max_seasons: Optional[int] = None
seasons: List[SeasonInfo] = [] seasons: List[SeasonInfo] = []
for number, url in season_links: for number, url in season_links:
episodes: List[EpisodeInfo] = []
if load_episodes:
season_soup = _get_soup(url, session=session) season_soup = _get_soup(url, session=session)
episodes = _extract_episodes(season_soup) episodes = _extract_episodes(season_soup)
seasons.append(SeasonInfo(number=number, url=url, episodes=episodes)) seasons.append(SeasonInfo(number=number, url=url, episodes=episodes))
@@ -598,6 +601,7 @@ class AniworldPlugin(BasisPlugin):
def __init__(self) -> None: def __init__(self) -> None:
self._anime_results: Dict[str, SeriesResult] = {} self._anime_results: Dict[str, SeriesResult] = {}
self._season_cache: Dict[str, List[SeasonInfo]] = {} self._season_cache: Dict[str, List[SeasonInfo]] = {}
self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {} self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}
self._popular_cache: Optional[List[SeriesResult]] = None self._popular_cache: Optional[List[SeriesResult]] = None
self._genre_cache: Optional[Dict[str, List[SeriesResult]]] = None self._genre_cache: Optional[Dict[str, List[SeriesResult]]] = None
@@ -801,17 +805,64 @@ class AniworldPlugin(BasisPlugin):
cache_key = (title, season_label) cache_key = (title, season_label)
self._episode_label_cache[cache_key] = {self._episode_label(info): info for info in season_info.episodes} self._episode_label_cache[cache_key] = {self._episode_label(info): info for info in season_info.episodes}
def remember_series_url(self, title: str, series_url: str) -> None:
    """Cache *series_url* as the known detail URL for *title*.

    Blank titles or URLs are ignored; a valid pair overwrites any result
    previously stored under the same title.
    """
    clean_title = (title or "").strip()
    clean_url = (series_url or "").strip()
    if not (clean_title and clean_url):
        return
    self._anime_results[clean_title] = SeriesResult(
        title=clean_title, description="", url=clean_url
    )
def series_url_for_title(self, title: str) -> str:
    """Return the cached series URL for *title*, or "" when unknown.

    Tries an exact-title hit first, then falls back to a
    case-insensitive scan over all cached results.
    """
    wanted = (title or "").strip()
    if not wanted:
        return ""
    exact = self._anime_results.get(wanted)
    if exact and exact.url:
        return exact.url
    folded = wanted.casefold()
    matches = (
        item.url
        for item in self._anime_results.values()
        if item.title and item.url and item.title.casefold().strip() == folded
    )
    return next(matches, "")
def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
cached = self._season_links_cache.get(title)
if cached is not None:
return list(cached)
anime = self._find_series_by_title(title)
if not anime:
return []
seasons = scrape_anime_detail(anime.url, load_episodes=False)
self._season_links_cache[title] = list(seasons)
return list(seasons)
def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
seasons = self._season_cache.get(title) or []
for season in seasons:
if season.number == season_number and season.episodes:
return season
links = self._ensure_season_links(title)
target = next((season for season in links if season.number == season_number), None)
if not target:
return None
season_soup = _get_soup(target.url, session=get_requests_session("aniworld", headers=HEADERS))
season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
updated = [season for season in seasons if season.number != season_number]
updated.append(season_info)
updated.sort(key=lambda item: item.number)
self._season_cache[title] = updated
return season_info
def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]: def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
cache_key = (title, season_label) cache_key = (title, season_label)
cached = self._episode_label_cache.get(cache_key) cached = self._episode_label_cache.get(cache_key)
if cached: if cached:
return cached.get(episode_label) return cached.get(episode_label)
seasons = self._ensure_seasons(title)
number = self._parse_season_number(season_label) number = self._parse_season_number(season_label)
if number is None: if number is None:
return None return None
for season_info in seasons: season_info = self._ensure_season_episodes(title, number)
if season_info.number == number: if season_info:
self._cache_episode_labels(title, season_label, season_info) self._cache_episode_labels(title, season_label, season_info)
return self._episode_label_cache.get(cache_key, {}).get(episode_label) return self._episode_label_cache.get(cache_key, {}).get(episode_label)
return None return None
@@ -821,6 +872,7 @@ class AniworldPlugin(BasisPlugin):
if not query: if not query:
self._anime_results.clear() self._anime_results.clear()
self._season_cache.clear() self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear() self._episode_label_cache.clear()
self._popular_cache = None self._popular_cache = None
return [] return []
@@ -835,30 +887,27 @@ class AniworldPlugin(BasisPlugin):
raise RuntimeError(f"AniWorld-Suche fehlgeschlagen: {exc}") from exc raise RuntimeError(f"AniWorld-Suche fehlgeschlagen: {exc}") from exc
self._anime_results = {result.title: result for result in results} self._anime_results = {result.title: result for result in results}
self._season_cache.clear() self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear() self._episode_label_cache.clear()
return [result.title for result in results] return [result.title for result in results]
def _ensure_seasons(self, title: str) -> List[SeasonInfo]: def _ensure_seasons(self, title: str) -> List[SeasonInfo]:
if title in self._season_cache: if title in self._season_cache:
return self._season_cache[title] return self._season_cache[title]
anime = self._find_series_by_title(title) seasons = self._ensure_season_links(title)
if not anime:
return []
seasons = scrape_anime_detail(anime.url)
self._season_cache[title] = list(seasons) self._season_cache[title] = list(seasons)
return list(seasons) return list(seasons)
def seasons_for(self, title: str) -> List[str]: def seasons_for(self, title: str) -> List[str]:
seasons = self._ensure_seasons(title) seasons = self._ensure_seasons(title)
return [self._season_label(season.number) for season in seasons if season.episodes] return [self._season_label(season.number) for season in seasons]
def episodes_for(self, title: str, season: str) -> List[str]: def episodes_for(self, title: str, season: str) -> List[str]:
seasons = self._ensure_seasons(title)
number = self._parse_season_number(season) number = self._parse_season_number(season)
if number is None: if number is None:
return [] return []
for season_info in seasons: season_info = self._ensure_season_episodes(title, number)
if season_info.number == number: if season_info:
labels = [self._episode_label(info) for info in season_info.episodes] labels = [self._episode_label(info) for info in season_info.episodes]
self._cache_episode_labels(title, season, season_info) self._cache_episode_labels(title, season, season_info)
return labels return labels

View File

@@ -10,9 +10,13 @@ from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from html import unescape
import json
import hashlib import hashlib
import os import os
import re import re
import time
import unicodedata
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency try: # pragma: no cover - optional dependency
@@ -68,6 +72,9 @@ HEADERS = {
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive", "Connection": "keep-alive",
} }
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.serienstream"
SESSION_CACHE_MAX_TITLE_URLS = 800
@dataclass @dataclass
@@ -127,6 +134,68 @@ def _absolute_url(href: str) -> str:
return f"{_get_base_url()}{href}" if href.startswith("/") else href return f"{_get_base_url()}{href}" if href.startswith("/") else href
def _session_window() -> Any:
    """Return the Kodi home window (id 10000) used as session storage.

    Yields ``None`` when the xbmcgui module is unavailable (e.g. when
    running outside Kodi) or when the window cannot be created.
    """
    if xbmcgui is not None:
        try:
            return xbmcgui.Window(10000)
        except Exception:
            pass
    return None
def _session_cache_key(name: str) -> str:
    """Build a window-property key scoped to the active base URL.

    The base URL is folded into a short SHA-1 prefix so cached data from
    one mirror/domain never leaks into another.
    """
    digest = hashlib.sha1(_get_base_url().encode("utf-8")).hexdigest()
    return ".".join((SESSION_CACHE_PREFIX, digest[:12], name))
def _session_cache_get(name: str) -> Any:
    """Fetch a cached value from the Kodi window properties.

    Returns the stored payload's ``data`` field, or ``None`` when the
    window is unavailable, the property is empty or corrupt, or the
    entry has expired.
    """
    window = _session_window()
    if window is None:
        return None
    try:
        serialized = window.getProperty(_session_cache_key(name)) or ""
    except Exception:
        return None
    if not serialized:
        return None
    try:
        payload = json.loads(serialized)
    except Exception:
        return None
    if not isinstance(payload, dict):
        return None
    try:
        expired = float(payload.get("expires_at") or 0) <= time.time()
    except Exception:
        return None
    return None if expired else payload.get("data")
def _session_cache_set(name: str, data: Any, *, ttl_seconds: int = SESSION_CACHE_TTL_SECONDS) -> None:
    """Store *data* under *name* in the Kodi window properties.

    The payload is JSON-encoded together with an absolute expiry
    timestamp. Writes are best effort: serialization failures, oversized
    payloads and window errors are silently dropped.
    """
    window = _session_window()
    if window is None:
        return
    expires = time.time() + max(1, int(ttl_seconds))
    try:
        serialized = json.dumps(
            {"expires_at": float(expires), "data": data},
            ensure_ascii=False,
            separators=(",", ":"),
        )
    except Exception:
        return
    # Kodi properties are no durable storage; cap the size so the UI stays stable.
    if len(serialized) > 240_000:
        return
    try:
        window.setProperty(_session_cache_key(name), serialized)
    except Exception:
        return
def _normalize_series_url(identifier: str) -> str: def _normalize_series_url(identifier: str) -> str:
if identifier.startswith("http://") or identifier.startswith("https://"): if identifier.startswith("http://") or identifier.startswith("https://"):
return identifier.rstrip("/") return identifier.rstrip("/")
@@ -279,7 +348,7 @@ def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> Beautif
return BeautifulSoup(response.text, "html.parser") return BeautifulSoup(response.text, "html.parser")
def _get_soup_simple(url: str) -> BeautifulSoupT: def _get_html_simple(url: str) -> str:
_ensure_requests() _ensure_requests()
_log_visit(url) _log_visit(url)
sess = get_requests_session("serienstream", headers=HEADERS) sess = get_requests_session("serienstream", headers=HEADERS)
@@ -291,10 +360,36 @@ def _get_soup_simple(url: str) -> BeautifulSoupT:
raise raise
if response.url and response.url != url: if response.url and response.url != url:
_log_url(response.url, kind="REDIRECT") _log_url(response.url, kind="REDIRECT")
_log_response_html(url, response.text) body = response.text
if _looks_like_cloudflare_challenge(response.text): _log_response_html(url, body)
if _looks_like_cloudflare_challenge(body):
raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
return BeautifulSoup(response.text, "html.parser") return body
def _get_soup_simple(url: str) -> BeautifulSoupT:
    """Fetch *url* via the simple HTML helper and parse it with BeautifulSoup."""
    return BeautifulSoup(_get_html_simple(url), "html.parser")
def _extract_genre_names_from_html(body: str) -> List[str]:
names: List[str] = []
seen: set[str] = set()
pattern = re.compile(
r"<div[^>]*class=[\"'][^\"']*background-1[^\"']*[\"'][^>]*>.*?<h3[^>]*>(.*?)</h3>",
re.IGNORECASE | re.DOTALL,
)
for match in pattern.finditer(body or ""):
text = re.sub(r"<[^>]+>", " ", match.group(1) or "")
text = unescape(re.sub(r"\s+", " ", text)).strip()
if not text:
continue
key = text.casefold()
if key in seen:
continue
seen.add(key)
names.append(text)
return names
def search_series(query: str) -> List[SeriesResult]: def search_series(query: str) -> List[SeriesResult]:
@@ -584,10 +679,10 @@ def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]:
episode_text = (anchor.select_one(".ep-episode").get_text(strip=True) if anchor.select_one(".ep-episode") else "").strip() episode_text = (anchor.select_one(".ep-episode").get_text(strip=True) if anchor.select_one(".ep-episode") else "").strip()
season_number: Optional[int] = None season_number: Optional[int] = None
episode_number: Optional[int] = None episode_number: Optional[int] = None
match = re.search(r"S\\s*(\\d+)", season_text, re.IGNORECASE) match = re.search(r"S\s*(\d+)", season_text, re.IGNORECASE)
if match: if match:
season_number = int(match.group(1)) season_number = int(match.group(1))
match = re.search(r"E\\s*(\\d+)", episode_text, re.IGNORECASE) match = re.search(r"E\s*(\d+)", episode_text, re.IGNORECASE)
if match: if match:
episode_number = int(match.group(1)) episode_number = int(match.group(1))
if season_number is None or episode_number is None: if season_number is None or episode_number is None:
@@ -644,17 +739,14 @@ def resolve_redirect(target_url: str) -> Optional[str]:
def scrape_series_detail( def scrape_series_detail(
series_identifier: str, series_identifier: str,
max_seasons: Optional[int] = None, max_seasons: Optional[int] = None,
*,
load_episodes: bool = True,
) -> List[SeasonInfo]: ) -> List[SeasonInfo]:
_ensure_requests() _ensure_requests()
series_url = _series_root_url(_normalize_series_url(series_identifier)) series_url = _series_root_url(_normalize_series_url(series_identifier))
_log_url(series_url, kind="SERIES") _log_url(series_url, kind="SERIES")
_notify_url(series_url) _notify_url(series_url)
session = get_requests_session("serienstream", headers=HEADERS) session = get_requests_session("serienstream", headers=HEADERS)
# Preflight ist optional; manche Umgebungen/Provider leiten die Startseite um.
try:
_get_soup(_get_base_url(), session=session)
except Exception:
pass
soup = _get_soup(series_url, session=session) soup = _get_soup(series_url, session=session)
base_series_url = _series_root_url(_extract_canonical_url(soup, series_url)) base_series_url = _series_root_url(_extract_canonical_url(soup, series_url))
@@ -673,6 +765,8 @@ def scrape_series_detail(
season_links = season_links[:max_seasons] season_links = season_links[:max_seasons]
seasons: List[SeasonInfo] = [] seasons: List[SeasonInfo] = []
for number, url in season_links: for number, url in season_links:
episodes: List[EpisodeInfo] = []
if load_episodes:
season_soup = _get_soup(url, session=session) season_soup = _get_soup(url, session=session)
episodes = _extract_episodes(season_soup) episodes = _extract_episodes(season_soup)
seasons.append(SeasonInfo(number=number, url=url, episodes=episodes)) seasons.append(SeasonInfo(number=number, url=url, episodes=episodes))
@@ -688,9 +782,15 @@ class SerienstreamPlugin(BasisPlugin):
def __init__(self) -> None: def __init__(self) -> None:
self._series_results: Dict[str, SeriesResult] = {} self._series_results: Dict[str, SeriesResult] = {}
self._title_url_cache: Dict[str, str] = self._load_title_url_cache()
self._genre_names_cache: Optional[List[str]] = None
self._season_cache: Dict[str, List[SeasonInfo]] = {} self._season_cache: Dict[str, List[SeasonInfo]] = {}
self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {} self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}
self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None self._catalog_cache: Optional[Dict[str, List[SeriesResult]]] = None
self._genre_group_cache: Dict[str, Dict[str, List[str]]] = {}
self._genre_page_titles_cache: Dict[Tuple[str, int], List[str]] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SeriesResult]] = None self._popular_cache: Optional[List[SeriesResult]] = None
self._requests_available = REQUESTS_AVAILABLE self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
@@ -713,6 +813,132 @@ class SerienstreamPlugin(BasisPlugin):
print(f"Importfehler: {REQUESTS_IMPORT_ERROR}") print(f"Importfehler: {REQUESTS_IMPORT_ERROR}")
return return
def _load_title_url_cache(self) -> Dict[str, str]:
    """Load the title-to-URL map from the session cache, normalized.

    Keys are stripped and casefolded, values stripped; entries with an
    empty key or URL are dropped. A missing or malformed cache yields
    an empty dict.
    """
    stored = _session_cache_get("title_urls")
    if not isinstance(stored, dict):
        return {}
    cleaned: Dict[str, str] = {}
    for key, value in stored.items():
        norm_key = str(key or "").strip().casefold()
        norm_url = str(value or "").strip()
        if norm_key and norm_url:
            cleaned[norm_key] = norm_url
    return cleaned
def _save_title_url_cache(self) -> None:
    """Persist the title-to-URL map into the session cache.

    No-op when the map is empty. Before writing, the oldest entries
    (insertion order) are evicted until the configured size cap holds.
    """
    cache = self._title_url_cache
    if not cache:
        return
    # Keep the session data bounded to the most recent entries.
    while len(cache) > SESSION_CACHE_MAX_TITLE_URLS:
        oldest_key = next(iter(cache))
        del cache[oldest_key]
    _session_cache_set("title_urls", cache)
def _remember_series_result(self, title: str, url: str, description: str = "") -> None:
    """Record a series (and its URL, when known) in the in-memory lookup.

    With a non-empty URL the entry is stored/overwritten and the
    normalized title->URL mapping is persisted to the session cache.
    Without a URL only a placeholder entry is created; an existing entry
    (which may already carry a URL) is left untouched.
    """
    title = (title or "").strip()
    url = (url or "").strip()
    if not title:
        return
    if url:
        self._series_results[title] = SeriesResult(title=title, description=description, url=url)
        cache_key = title.casefold()
        # Persist only when the URL actually changed to avoid redundant writes.
        if self._title_url_cache.get(cache_key) != url:
            self._title_url_cache[cache_key] = url
            self._save_title_url_cache()
        return
    current = self._series_results.get(title)
    if current is None:
        self._series_results[title] = SeriesResult(title=title, description=description, url="")
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_links.{digest}"
@staticmethod
def _season_episodes_cache_name(season_url: str) -> str:
digest = hashlib.sha1((season_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_episodes.{digest}"
def _load_session_season_links(self, series_url: str) -> Optional[List[SeasonInfo]]:
    """Rehydrate cached season links (number + URL) for a series.

    Returns ``None`` when the session cache holds no usable payload, so
    the caller falls back to scraping. Episodes are intentionally left
    empty here and loaded lazily per season.
    """
    raw = _session_cache_get(self._season_links_cache_name(series_url))
    if not isinstance(raw, list):
        return None
    seasons: List[SeasonInfo] = []
    for item in raw:
        if not isinstance(item, dict):
            continue
        try:
            number = int(item.get("number"))
        except Exception:
            # Covers missing values (int(None)) as well as non-numeric junk.
            continue
        url = str(item.get("url") or "").strip()
        if number <= 0 or not url:
            continue
        seasons.append(SeasonInfo(number=number, url=url, episodes=[]))
    if not seasons:
        return None
    seasons.sort(key=lambda s: s.number)
    return seasons
def _save_session_season_links(self, series_url: str, seasons: List[SeasonInfo]) -> None:
    """Persist the season number/URL pairs of a series in the session cache."""
    payload = [{"number": int(season.number), "url": season.url} for season in seasons if season.url]
    # NOTE(review): an empty payload leaves any previously cached (stale)
    # entry in place instead of clearing it — presumably acceptable for a
    # best-effort session cache; confirm if stale links cause issues.
    if payload:
        _session_cache_set(self._season_links_cache_name(series_url), payload)
def _load_session_season_episodes(self, season_url: str) -> Optional[List[EpisodeInfo]]:
    """Rehydrate a season's episode list from the session cache.

    Returns ``None`` when no usable payload exists, so the caller falls
    back to scraping the season page.
    """
    raw = _session_cache_get(self._season_episodes_cache_name(season_url))
    if not isinstance(raw, list):
        return None
    episodes: List[EpisodeInfo] = []
    for item in raw:
        if not isinstance(item, dict):
            continue
        try:
            number = int(item.get("number"))
        except Exception:
            # Covers missing values (int(None)) as well as non-numeric junk.
            continue
        title = str(item.get("title") or "").strip()
        original_title = str(item.get("original_title") or "").strip()
        url = str(item.get("url") or "").strip()
        season_label = str(item.get("season_label") or "").strip()
        languages = [str(lang).strip() for lang in list(item.get("languages") or []) if str(lang).strip()]
        hosters = [str(host).strip() for host in list(item.get("hosters") or []) if str(host).strip()]
        if number <= 0:
            continue
        episodes.append(
            EpisodeInfo(
                number=number,
                # Synthesize a fallback label when the cached title is empty.
                title=title or f"Episode {number}",
                original_title=original_title,
                url=url,
                season_label=season_label,
                languages=languages,
                hosters=hosters,
            )
        )
    if not episodes:
        return None
    episodes.sort(key=lambda item: item.number)
    return episodes
def _save_session_season_episodes(self, season_url: str, episodes: List[EpisodeInfo]) -> None:
    """Persist a season's episode list (as plain dicts) in the session cache."""
    payload = []
    for item in episodes:
        payload.append(
            {
                "number": int(item.number),
                "title": item.title,
                "original_title": item.original_title,
                "url": item.url,
                "season_label": item.season_label,
                "languages": list(item.languages or []),
                "hosters": list(item.hosters or []),
            }
        )
    # NOTE(review): an empty list is not written, so a previously cached
    # stale entry would survive — presumably fine for a best-effort cache.
    if payload:
        _session_cache_set(self._season_episodes_cache_name(season_url), payload)
def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]: def _ensure_catalog(self) -> Dict[str, List[SeriesResult]]:
if self._catalog_cache is not None: if self._catalog_cache is not None:
return self._catalog_cache return self._catalog_cache
@@ -720,14 +946,38 @@ class SerienstreamPlugin(BasisPlugin):
catalog_url = f"{_get_base_url()}/serien?by=genre" catalog_url = f"{_get_base_url()}/serien?by=genre"
soup = _get_soup_simple(catalog_url) soup = _get_soup_simple(catalog_url)
self._catalog_cache = parse_series_catalog(soup) self._catalog_cache = parse_series_catalog(soup)
_session_cache_set("genres", sorted(self._catalog_cache.keys(), key=str.casefold))
return self._catalog_cache return self._catalog_cache
def _ensure_genre_names(self) -> List[str]:
    """Return the sorted genre names, using caches where possible.

    Lookup order: in-memory cache, session cache, a lightweight HTML
    fetch of the genre overview page, and finally the full catalog
    scrape as a fallback. Always returns a defensive copy.
    """
    if self._genre_names_cache is not None:
        return list(self._genre_names_cache)
    cached = _session_cache_get("genres")
    if isinstance(cached, list):
        genres = [str(value).strip() for value in cached if str(value).strip()]
        if genres:
            self._genre_names_cache = sorted(set(genres), key=str.casefold)
            return list(self._genre_names_cache)
    catalog_url = f"{_get_base_url()}/serien?by=genre"
    try:
        # Cheap path: pull genre names straight from the raw HTML without
        # building the full catalog structure.
        body = _get_html_simple(catalog_url)
        genres = _extract_genre_names_from_html(body)
    except Exception:
        genres = []
    if not genres:
        # Expensive fallback: scrape the whole catalog and use its keys.
        catalog = self._ensure_catalog()
        genres = sorted(catalog.keys(), key=str.casefold)
    else:
        genres = sorted(set(genres), key=str.casefold)
    self._genre_names_cache = list(genres)
    _session_cache_set("genres", self._genre_names_cache)
    return list(self._genre_names_cache)
def genres(self) -> List[str]: def genres(self) -> List[str]:
"""Optional: Liefert alle Genres aus dem Serien-Katalog.""" """Optional: Liefert alle Genres aus dem Serien-Katalog."""
if not self._requests_available: if not self._requests_available:
return [] return []
catalog = self._ensure_catalog() return self._ensure_genre_names()
return sorted(catalog.keys(), key=str.casefold)
def capabilities(self) -> set[str]: def capabilities(self) -> set[str]:
"""Meldet unterstützte Features für Router-Menüs.""" """Meldet unterstützte Features für Router-Menüs."""
@@ -738,7 +988,8 @@ class SerienstreamPlugin(BasisPlugin):
if not self._requests_available: if not self._requests_available:
return [] return []
entries = self._ensure_popular() entries = self._ensure_popular()
self._series_results.update({entry.title: entry for entry in entries if entry.title}) for entry in entries:
self._remember_series_result(entry.title, entry.url, entry.description)
return [entry.title for entry in entries if entry.title] return [entry.title for entry in entries if entry.title]
def titles_for_genre(self, genre: str) -> List[str]: def titles_for_genre(self, genre: str) -> List[str]:
@@ -752,9 +1003,167 @@ class SerienstreamPlugin(BasisPlugin):
return self.popular_series() return self.popular_series()
catalog = self._ensure_catalog() catalog = self._ensure_catalog()
entries = catalog.get(genre, []) entries = catalog.get(genre, [])
self._series_results.update({entry.title: entry for entry in entries if entry.title}) for entry in entries:
self._remember_series_result(entry.title, entry.url, entry.description)
return [entry.title for entry in entries if entry.title] return [entry.title for entry in entries if entry.title]
@staticmethod
def _title_group_key(title: str) -> str:
raw = (title or "").strip()
if not raw:
return "#"
for char in raw:
if char.isdigit():
return "0-9"
if char.isalpha():
normalized = char.casefold()
if normalized == "ä":
normalized = "a"
elif normalized == "ö":
normalized = "o"
elif normalized == "ü":
normalized = "u"
elif normalized == "ß":
normalized = "s"
return normalized.upper()
return "#"
@classmethod
def _group_matches(cls, group_code: str, title: str) -> bool:
    """Check whether *title* falls into the alphabetical group *group_code*.

    "0-9" matches only digit-keyed titles; the letter groups match their
    inclusive uppercase range; unknown codes and "#"-keyed titles never
    match.
    """
    key = cls._title_group_key(title)
    if group_code == "0-9":
        return key == "0-9"
    if key in ("0-9", "#"):
        return False
    letter_ranges = {
        "A-E": ("A", "E"),
        "F-J": ("F", "J"),
        "K-O": ("K", "O"),
        "P-T": ("P", "T"),
        "U-Z": ("U", "Z"),
    }
    bounds = letter_ranges.get(group_code)
    if bounds is None:
        return False
    low, high = bounds
    return low <= key <= high
def _ensure_genre_group_cache(self, genre: str) -> Dict[str, List[str]]:
cached = self._genre_group_cache.get(genre)
if cached is not None:
return cached
titles = self.titles_for_genre(genre)
grouped: Dict[str, List[str]] = {}
for title in titles:
for code in ("A-E", "F-J", "K-O", "P-T", "U-Z", "0-9"):
if self._group_matches(code, title):
grouped.setdefault(code, []).append(title)
break
for code in grouped:
grouped[code].sort(key=str.casefold)
self._genre_group_cache[genre] = grouped
return grouped
@staticmethod
def _genre_slug(genre: str) -> str:
value = (genre or "").strip().casefold()
value = value.replace("&", " und ")
value = unicodedata.normalize("NFKD", value)
value = "".join(ch for ch in value if not unicodedata.combining(ch))
value = re.sub(r"[^a-z0-9]+", "-", value).strip("-")
return value
def _fetch_genre_page_titles(self, genre: str, page: int) -> Tuple[List[str], int]:
    """Scrape one page of a genre listing.

    Returns the deduplicated titles of that page (in page order) plus
    the highest page number advertised by the pagination links. Results
    are memoized per (slug, page); discovered series URLs are recorded
    via ``_remember_series_result`` as a side effect.
    """
    slug = self._genre_slug(genre)
    if not slug:
        return [], 1
    cache_key = (slug, page)
    cached = self._genre_page_titles_cache.get(cache_key)
    cached_pages = self._genre_page_count_cache.get(slug)
    # Only short-circuit when both the page titles and the page count are
    # already known; otherwise re-fetch to fill in the missing half.
    if cached is not None and cached_pages is not None:
        return list(cached), int(cached_pages)
    url = f"{_get_base_url()}/genre/{slug}"
    if page > 1:
        url = f"{url}?page={int(page)}"
    soup = _get_soup_simple(url)
    titles: List[str] = []
    seen: set[str] = set()
    for anchor in soup.select("a.show-card[href]"):
        href = (anchor.get("href") or "").strip()
        # Normalize to an absolute series URL without fragment or query.
        series_url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
        if "/serie/" not in series_url:
            continue
        # The card's <img alt="..."> carries the series title.
        img = anchor.select_one("img[alt]")
        title = ((img.get("alt") if img else "") or "").strip()
        if not title:
            continue
        key = title.casefold()
        if key in seen:
            continue
        seen.add(key)
        self._remember_series_result(title, series_url)
        titles.append(title)
    # Derive the total page count from the largest ?page=N pagination link.
    max_page = 1
    for anchor in soup.select("a[href*='?page=']"):
        href = (anchor.get("href") or "").strip()
        match = re.search(r"[?&]page=(\d+)", href)
        if not match:
            continue
        try:
            max_page = max(max_page, int(match.group(1)))
        except Exception:
            continue
    self._genre_page_titles_cache[cache_key] = list(titles)
    self._genre_page_count_cache[slug] = max_page
    return list(titles), max_page
def titles_for_genre_group_page(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> List[str]:
    """Return one page of titles of *genre* that fall into *group_code*.

    Walks the genre's listing pages, collects the titles matching the
    alphabetical group, and returns the slice for the requested page.
    Falls back to the pre-grouped catalog cache when scraping fails.
    """
    genre = (genre or "").strip()
    group_code = (group_code or "").strip()
    page = max(1, int(page or 1))
    page_size = max(1, int(page_size or 10))
    start = (page - 1) * page_size
    end = start + page_size
    # One extra entry beyond the requested slice lets us stop early while
    # still knowing whether another page would exist.
    needed = end + 1
    try:
        matched: List[str] = []
        _, max_pages = self._fetch_genre_page_titles(genre, 1)
        for page_index in range(1, max_pages + 1):
            page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
            for title in page_titles:
                if self._group_matches(group_code, title):
                    matched.append(title)
            # Fix: stop fetching further listing pages once enough matches
            # are collected. Previously the break only left the inner loop,
            # so every remaining page was still downloaded needlessly.
            if len(matched) >= needed:
                break
        return list(matched[start:end])
    except Exception:
        grouped = self._ensure_genre_group_cache(genre)
        titles = grouped.get(group_code, [])
        return list(titles[start:end])
def genre_group_has_more(self, genre: str, group_code: str, page: int = 1, page_size: int = 10) -> bool:
    """Return True when more matching titles exist beyond the given page."""
    genre = (genre or "").strip()
    group_code = (group_code or "").strip()
    page = max(1, int(page or 1))
    page_size = max(1, int(page_size or 10))
    # Strictly more matches than fit on the requested page means "has more".
    threshold = page * page_size
    try:
        seen = 0
        _, max_pages = self._fetch_genre_page_titles(genre, 1)
        for page_index in range(1, max_pages + 1):
            page_titles, _ = self._fetch_genre_page_titles(genre, page_index)
            seen += sum(1 for title in page_titles if self._group_matches(group_code, title))
            if seen > threshold:
                return True
        return False
    except Exception:
        grouped = self._ensure_genre_group_cache(genre)
        return len(grouped.get(group_code, [])) > threshold
def _ensure_popular(self) -> List[SeriesResult]: def _ensure_popular(self) -> List[SeriesResult]:
"""Laedt und cached die Liste der beliebten Serien aus `/beliebte-serien`.""" """Laedt und cached die Liste der beliebten Serien aus `/beliebte-serien`."""
if self._popular_cache is not None: if self._popular_cache is not None:
@@ -784,7 +1193,7 @@ class SerienstreamPlugin(BasisPlugin):
if not title or title in seen: if not title or title in seen:
continue continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/") url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
url = re.sub(r"/staffel-\\d+(?:/.*)?$", "", url).rstrip("/") url = re.sub(r"/staffel-\d+(?:/.*)?$", "", url).rstrip("/")
if not url: if not url:
continue continue
_log_parsed_url(url) _log_parsed_url(url)
@@ -830,19 +1239,104 @@ class SerienstreamPlugin(BasisPlugin):
self._episode_label(info): info for info in season_info.episodes self._episode_label(info): info for info in season_info.episodes
} }
def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
    """Resolve the season list (numbers + URLs, no episodes) for *title*.

    Resolution order: per-title memory cache, known series results, the
    persisted title->URL cache, a case-insensitive scan of the full
    catalog, the session cache for season links, and finally a live
    scrape of the series detail page. Returns a defensive copy; []
    when the series cannot be resolved at all.

    Raises:
        RuntimeError: when scraping the detail page fails.
    """
    cached = self._season_links_cache.get(title)
    if cached is not None:
        return list(cached)
    series = self._series_results.get(title)
    if not series:
        # Try the persisted title->URL mapping before hitting the network.
        cached_url = self._title_url_cache.get(title.casefold().strip(), "")
        if cached_url:
            series = SeriesResult(title=title, description="", url=cached_url)
            self._series_results[title] = series
    if not series:
        # Last resort: scan the full catalog for a case-insensitive match.
        catalog = self._ensure_catalog()
        lookup_key = title.casefold().strip()
        for entries in catalog.values():
            for entry in entries:
                if entry.title.casefold().strip() == lookup_key:
                    series = entry
                    self._remember_series_result(entry.title, entry.url, entry.description)
                    break
            if series:
                break
    if not series:
        return []
    session_links = self._load_session_season_links(series.url)
    if session_links:
        self._season_links_cache[title] = list(session_links)
        return list(session_links)
    try:
        # load_episodes=False keeps this cheap; episodes are loaded lazily
        # per season via _ensure_season_episodes.
        seasons = scrape_series_detail(series.url, load_episodes=False)
    except Exception as exc:  # pragma: no cover - defensive logging
        raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
    self._season_links_cache[title] = list(seasons)
    self._save_session_season_links(series.url, seasons)
    return list(seasons)
def remember_series_url(self, title: str, series_url: str) -> None:
    """Record a series URL discovered elsewhere so later lookups can use it."""
    cleaned_title = (title or "").strip()
    cleaned_url = (series_url or "").strip()
    if cleaned_title and cleaned_url:
        self._remember_series_result(cleaned_title, cleaned_url)
def series_url_for_title(self, title: str) -> str:
    """Look up the detail-page URL for *title*; returns "" when unknown.

    Tries the exact-title result map first, then the persisted
    normalized title->URL cache, then a case-insensitive scan of all
    known series results.
    """
    cleaned = (title or "").strip()
    if not cleaned:
        return ""
    exact = self._series_results.get(cleaned)
    if exact is not None and exact.url:
        return exact.url
    lookup_key = cleaned.casefold().strip()
    persisted = self._title_url_cache.get(lookup_key, "")
    if persisted:
        return persisted
    for candidate in self._series_results.values():
        if candidate.title.casefold().strip() == lookup_key and candidate.url:
            return candidate.url
    return ""
def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
    """Return the given season with its episodes loaded, or ``None``.

    Checks the in-memory season cache first, then the session cache for
    the season's episode list, and finally scrapes the season page. The
    fully loaded season replaces any episode-less entry in
    ``_season_cache`` (kept sorted by season number).

    Raises:
        RuntimeError: when scraping the season page fails.
    """
    seasons = self._season_cache.get(title) or []
    for season in seasons:
        if season.number == season_number and season.episodes:
            return season
    links = self._ensure_season_links(title)
    target = next((season for season in links if season.number == season_number), None)
    if not target:
        return None
    cached_episodes = self._load_session_season_episodes(target.url)
    if cached_episodes:
        season_info = SeasonInfo(number=target.number, url=target.url, episodes=list(cached_episodes))
        # Swap any stale (episode-less) entry for the fully loaded one.
        updated = [season for season in seasons if season.number != season_number]
        updated.append(season_info)
        updated.sort(key=lambda item: item.number)
        self._season_cache[title] = updated
        return season_info
    try:
        season_soup = _get_soup(target.url, session=get_requests_session("serienstream", headers=HEADERS))
        season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
    except Exception as exc:  # pragma: no cover - defensive logging
        raise RuntimeError(f"Serienstream-Episoden konnten nicht geladen werden: {exc}") from exc
    updated = [season for season in seasons if season.number != season_number]
    updated.append(season_info)
    updated.sort(key=lambda item: item.number)
    self._season_cache[title] = updated
    self._save_session_season_episodes(target.url, season_info.episodes)
    return season_info
def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]: def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
cache_key = (title, season_label) cache_key = (title, season_label)
cached = self._episode_label_cache.get(cache_key) cached = self._episode_label_cache.get(cache_key)
if cached: if cached:
return cached.get(episode_label) return cached.get(episode_label)
seasons = self._ensure_seasons(title)
number = self._parse_season_number(season_label) number = self._parse_season_number(season_label)
if number is None: if number is None:
return None return None
season_info = self._ensure_season_episodes(title, number)
for season_info in seasons: if season_info:
if season_info.number == number:
self._cache_episode_labels(title, season_label, season_info) self._cache_episode_labels(title, season_label, season_info)
return self._episode_label_cache.get(cache_key, {}).get(episode_label) return self._episode_label_cache.get(cache_key, {}).get(episode_label)
return None return None
@@ -852,6 +1346,7 @@ class SerienstreamPlugin(BasisPlugin):
if not query: if not query:
self._series_results.clear() self._series_results.clear()
self._season_cache.clear() self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear() self._episode_label_cache.clear()
self._catalog_cache = None self._catalog_cache = None
return [] return []
@@ -867,8 +1362,11 @@ class SerienstreamPlugin(BasisPlugin):
self._episode_label_cache.clear() self._episode_label_cache.clear()
self._catalog_cache = None self._catalog_cache = None
raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc raise RuntimeError(f"Serienstream-Suche fehlgeschlagen: {exc}") from exc
self._series_results = {result.title: result for result in results} self._series_results = {}
for result in results:
self._remember_series_result(result.title, result.url, result.description)
self._season_cache.clear() self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear() self._episode_label_cache.clear()
return [result.title for result in results] return [result.title for result in results]
@@ -895,33 +1393,27 @@ class SerienstreamPlugin(BasisPlugin):
for entry in entries: for entry in entries:
if entry.title.casefold().strip() == lookup_key: if entry.title.casefold().strip() == lookup_key:
series = entry series = entry
self._series_results[entry.title] = entry self._remember_series_result(entry.title, entry.url, entry.description)
break break
if series: if series:
break break
if not series: if not series:
return [] return []
try: seasons = self._ensure_season_links(title)
seasons = scrape_series_detail(series.url)
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
self._clear_episode_cache_for_title(title) self._clear_episode_cache_for_title(title)
self._season_cache[title] = seasons self._season_cache[title] = list(seasons)
return seasons return list(seasons)
def seasons_for(self, title: str) -> List[str]: def seasons_for(self, title: str) -> List[str]:
seasons = self._ensure_seasons(title) seasons = self._ensure_seasons(title)
# Serienstream liefert gelegentlich Staffeln ohne Episoden (z.B. Parsing-/Layoutwechsel). return [self._season_label(season.number) for season in seasons]
# Diese sollen im UI nicht als auswählbarer Menüpunkt erscheinen.
return [self._season_label(season.number) for season in seasons if season.episodes]
def episodes_for(self, title: str, season: str) -> List[str]: def episodes_for(self, title: str, season: str) -> List[str]:
seasons = self._ensure_seasons(title)
number = self._parse_season_number(season) number = self._parse_season_number(season)
if number is None: if number is None:
return [] return []
for season_info in seasons: season_info = self._ensure_season_episodes(title, number)
if season_info.number == number: if season_info:
labels = [self._episode_label(info) for info in season_info.episodes] labels = [self._episode_label(info) for info in season_info.episodes]
self._cache_episode_labels(title, season, season_info) self._cache_episode_labels(title, season, season_info)
return labels return labels