Compare commits

..

3 Commits

9 changed files with 890 additions and 310 deletions

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.52" provider-name="ViewIt">
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.56" provider-name="ViewIt">
<requires>
<import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" />

View File

@@ -16,11 +16,26 @@ import json
import os
import re
import sys
import threading
import xml.etree.ElementTree as ET
from pathlib import Path
from types import ModuleType
from urllib.parse import parse_qs, urlencode
def _ensure_windows_selector_policy() -> None:
"""Erzwingt unter Windows einen Selector-Loop (thread-kompatibel in Kodi)."""
if not sys.platform.startswith("win"):
return
try:
current = asyncio.get_event_loop_policy()
if current.__class__.__name__ == "WindowsSelectorEventLoopPolicy":
return
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
except Exception:
# Fallback: Wenn die Policy nicht verfügbar ist, arbeitet der Code mit Default-Policy weiter.
return
try: # pragma: no cover - Kodi runtime
import xbmc # type: ignore[import-not-found]
import xbmcaddon # type: ignore[import-not-found]
@@ -402,6 +417,81 @@ def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
return default
def _get_setting_int(setting_id: str, *, default: int = 0) -> int:
if xbmcaddon is None:
return default
addon = _get_addon()
if addon is None:
return default
getter = getattr(addon, "getSettingInt", None)
if callable(getter):
raw_getter = getattr(addon, "getSetting", None)
if callable(raw_getter):
try:
raw = str(raw_getter(setting_id) or "").strip()
except TypeError:
raw = ""
if raw == "":
return default
try:
return int(getter(setting_id))
except TypeError:
return default
getter = getattr(addon, "getSetting", None)
if callable(getter):
try:
raw = str(getter(setting_id) or "").strip()
except TypeError:
return default
if raw == "":
return default
try:
return int(raw)
except ValueError:
return default
return default
METADATA_MODE_AUTO = 0
METADATA_MODE_SOURCE = 1
METADATA_MODE_TMDB = 2
METADATA_MODE_MIX = 3
def _metadata_setting_id(plugin_name: str) -> str:
safe = re.sub(r"[^a-z0-9]+", "_", (plugin_name or "").strip().casefold()).strip("_")
return f"{safe}_metadata_source" if safe else "metadata_source"
def _plugin_supports_metadata(plugin: BasisPlugin) -> bool:
try:
return plugin.__class__.metadata_for is not BasisPlugin.metadata_for
except Exception:
return False
def _metadata_policy(
plugin_name: str,
plugin: BasisPlugin,
*,
allow_tmdb: bool,
) -> tuple[bool, bool, bool]:
mode = _get_setting_int(_metadata_setting_id(plugin_name), default=METADATA_MODE_AUTO)
supports_source = _plugin_supports_metadata(plugin)
if mode == METADATA_MODE_SOURCE:
return supports_source, False, True
if mode == METADATA_MODE_TMDB:
return False, allow_tmdb, False
if mode == METADATA_MODE_MIX:
return supports_source, allow_tmdb, True
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
return supports_source, allow_tmdb, prefer_source
def _tmdb_list_enabled() -> bool:
return _tmdb_enabled() and _get_setting_bool("tmdb_genre_metadata", default=False)
def _set_setting_string(setting_id: str, value: str) -> None:
if xbmcaddon is None:
return
@@ -1120,18 +1210,28 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
try:
with _progress_dialog("Suche läuft", f"{plugin_name} (1/1) starte…") as progress:
canceled = progress(5, f"{plugin_name} (1/1) Suche…")
results = _run_async(plugin.search_titles(query))
search_coro = plugin.search_titles(query)
try:
results = _run_async(search_coro)
except Exception:
if inspect.iscoroutine(search_coro):
try:
search_coro.close()
except Exception:
pass
raise
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
plugin_meta = _collect_plugin_metadata(plugin, results)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {}
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = list(results) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
@@ -1271,17 +1371,34 @@ def _discover_plugins() -> dict[str, BasisPlugin]:
def _run_async(coro):
"""Fuehrt eine Coroutine aus, auch wenn Kodi bereits einen Event-Loop hat."""
_ensure_windows_selector_policy()
def _run_with_asyncio_run():
return asyncio.run(coro)
try:
loop = asyncio.get_event_loop()
running_loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
temp_loop = asyncio.new_event_loop()
try:
return temp_loop.run_until_complete(coro)
finally:
temp_loop.close()
return asyncio.run(coro)
running_loop = None
if running_loop and running_loop.is_running():
result_box: dict[str, object] = {}
error_box: dict[str, BaseException] = {}
def _worker() -> None:
try:
result_box["value"] = _run_with_asyncio_run()
except BaseException as exc: # pragma: no cover - defensive
error_box["error"] = exc
worker = threading.Thread(target=_worker, name="viewit-async-runner")
worker.start()
worker.join()
if "error" in error_box:
raise error_box["error"]
return result_box.get("value")
return _run_with_asyncio_run()
def _series_url_params(plugin: BasisPlugin, title: str) -> dict[str, str]:
@@ -1327,21 +1444,28 @@ def _show_search_results(query: str) -> None:
canceled = progress(range_start, f"{plugin_name} ({plugin_index}/{total_plugins}) Suche…")
if canceled:
break
search_coro = plugin.search_titles(query)
try:
results = _run_async(plugin.search_titles(query))
results = _run_async(search_coro)
except Exception as exc:
if inspect.iscoroutine(search_coro):
try:
search_coro.close()
except Exception:
pass
_log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
plugin_meta = _collect_plugin_metadata(plugin, results)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {}
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = list(results) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
@@ -1485,11 +1609,14 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
except Exception:
pass
use_source, show_tmdb, _prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
title_info_labels: dict[str, str] | None = None
title_art: dict[str, str] | None = None
title_cast: list[TmdbCastMember] | None = None
meta_getter = getattr(plugin, "metadata_for", None)
if callable(meta_getter):
if use_source and callable(meta_getter):
try:
with _busy_dialog():
meta_labels, meta_art, meta_cast = meta_getter(title)
@@ -1516,8 +1643,9 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
xbmcplugin.setPluginCategory(handle, f"{title} ({count} {suffix})")
_set_content(handle, "seasons")
# Staffel-Metadaten (Plot/Poster) optional via TMDB.
_tmdb_labels_and_art(title)
api_key = _get_setting_string("tmdb_api_key").strip()
if show_tmdb:
_tmdb_labels_and_art(title)
api_key = _get_setting_string("tmdb_api_key").strip() if show_tmdb else ""
language = _get_setting_string("tmdb_language").strip() or "de-DE"
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
@@ -1606,13 +1734,42 @@ def _show_episodes(plugin_name: str, title: str, season: str, series_url: str =
episodes = list(plugin.episodes_for(title, season))
if episodes:
show_info, show_art, show_cast = _tmdb_labels_and_art(title)
episode_url_getter = getattr(plugin, "episode_url_for", None)
supports_direct_episode_url = callable(getattr(plugin, "stream_link_for_url", None))
use_source, show_tmdb, _prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
show_info: dict[str, str] = {}
show_art: dict[str, str] = {}
show_cast: list[TmdbCastMember] | None = None
if show_tmdb:
show_info, show_art, show_cast = _tmdb_labels_and_art(title)
elif use_source:
meta_getter = getattr(plugin, "metadata_for", None)
if callable(meta_getter):
try:
with _busy_dialog():
meta_labels, meta_art, meta_cast = meta_getter(title)
if isinstance(meta_labels, dict):
show_info = {str(k): str(v) for k, v in meta_labels.items() if v}
if isinstance(meta_art, dict):
show_art = {str(k): str(v) for k, v in meta_art.items() if v}
if isinstance(meta_cast, list):
show_cast = meta_cast # noqa: PGH003
except Exception:
pass
show_fanart = (show_art or {}).get("fanart") if isinstance(show_art, dict) else ""
show_poster = (show_art or {}).get("poster") if isinstance(show_art, dict) else ""
with _busy_dialog():
for episode in episodes:
info_labels, art = _tmdb_episode_labels_and_art(title=title, season_label=season, episode_label=episode)
episode_cast = _tmdb_episode_cast(title=title, season_label=season, episode_label=episode)
if show_tmdb:
info_labels, art = _tmdb_episode_labels_and_art(
title=title, season_label=season, episode_label=episode
)
episode_cast = _tmdb_episode_cast(title=title, season_label=season, episode_label=episode)
else:
info_labels, art, episode_cast = {}, {}, []
merged_info = dict(show_info or {})
merged_info.update(dict(info_labels or {}))
merged_art: dict[str, str] = {}
@@ -1642,11 +1799,25 @@ def _show_episodes(plugin_name: str, title: str, season: str, series_url: str =
merged_info = _apply_playstate_to_info(merged_info, _get_playstate(key))
display_label = episode
play_params = {
"plugin": plugin_name,
"title": title,
"season": season,
"episode": episode,
"series_url": series_url,
}
if supports_direct_episode_url and callable(episode_url_getter):
try:
episode_url = str(episode_url_getter(title, season, episode) or "").strip()
except Exception:
episode_url = ""
if episode_url:
play_params["url"] = episode_url
_add_directory_item(
handle,
display_label,
"play_episode",
{"plugin": plugin_name, "title": title, "season": season, "episode": episode},
play_params,
is_folder=False,
info_labels=merged_info,
art=merged_art,
@@ -1803,16 +1974,16 @@ def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
@@ -1823,51 +1994,31 @@ def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
if show_tmdb:
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, {}, {}, None, meta)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info(info_labels, playstate),
art=art,
cast=cast,
)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -1939,16 +2090,16 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = show_tmdb and _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
@@ -2090,50 +2241,52 @@ def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> Non
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
playstate = _title_playstate(plugin_name, title)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -2200,42 +2353,48 @@ def _show_series_catalog(plugin_name: str, page: int = 1) -> None:
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -2429,16 +2588,16 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
end = start + page_size
page_items = titles[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
plugin_meta = _collect_plugin_metadata(plugin, page_items)
show_tmdb = show_tmdb and _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items)
if show_tmdb and prefer_source:
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
meta = plugin_meta.get(title)
@@ -2450,7 +2609,7 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
@@ -2577,48 +2736,50 @@ def _show_new_titles(plugin_name: str, page: int = 1) -> None:
start = (page - 1) * page_size
end = start + page_size
page_items = titles[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "movie")
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "movie")
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if callable(paging_getter) and callable(has_more_getter):
@@ -2738,7 +2899,9 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
return
xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page})")
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
if page > 1:
_add_directory_item(
handle,
@@ -2748,40 +2911,44 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
is_folder=True,
)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if callable(grouped_has_more):
try:
@@ -2827,43 +2994,49 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
start = (page - 1) * page_size
end = start + page_size
page_items = filtered[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
if total_pages > 1 and page < total_pages:
_add_directory_item(
@@ -3045,8 +3218,34 @@ def _play_episode(
season: str,
episode: str,
*,
episode_url: str = "",
series_url: str = "",
resolve_handle: int | None = None,
) -> None:
episode_url = (episode_url or "").strip()
if episode_url:
_play_episode_url(
plugin_name,
title=title,
season_number=_extract_first_int(season) or 0,
episode_number=_extract_first_int(episode) or 0,
episode_url=episode_url,
season_label_override=season,
episode_label_override=episode,
resolve_handle=resolve_handle,
)
return
series_url = (series_url or "").strip()
if series_url:
plugin_for_url = _discover_plugins().get(plugin_name)
remember_series_url = getattr(plugin_for_url, "remember_series_url", None) if plugin_for_url is not None else None
if callable(remember_series_url):
try:
remember_series_url(title, series_url)
except Exception:
pass
_log(f"Play anfordern: {plugin_name} / {title} / {season} / {episode}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
@@ -3123,10 +3322,14 @@ def _play_episode_url(
season_number: int,
episode_number: int,
episode_url: str,
season_label_override: str = "",
episode_label_override: str = "",
resolve_handle: int | None = None,
) -> None:
season_label = f"Staffel {season_number}" if season_number > 0 else ""
episode_label = f"Episode {episode_number}" if episode_number > 0 else ""
season_label = (season_label_override or "").strip() or (f"Staffel {season_number}" if season_number > 0 else "")
episode_label = (episode_label_override or "").strip() or (
f"Episode {episode_number}" if episode_number > 0 else ""
)
_log(f"Play (URL) anfordern: {plugin_name} / {title} / {season_label} / {episode_label} / {episode_url}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
@@ -3295,6 +3498,8 @@ def run() -> None:
params.get("title", ""),
params.get("season", ""),
params.get("episode", ""),
episode_url=params.get("url", ""),
series_url=params.get("series_url", ""),
resolve_handle=_get_handle(),
)
elif action == "play_movie":

View File

@@ -9,7 +9,7 @@ Zum Verwenden:
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, List, Optional
try: # pragma: no cover - optional dependency
import requests
@@ -34,8 +34,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -13,7 +13,7 @@ import hashlib
import json
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
try: # pragma: no cover - optional dependency
import requests
@@ -43,8 +43,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
SETTING_BASE_URL = "aniworld_base_url"
@@ -1213,6 +1213,18 @@ class AniworldPlugin(BasisPlugin):
_log_url(link, kind="FOUND")
return link
def episode_url_for(self, title: str, season: str, episode: str) -> str:
cache_key = (title, season)
cached = self._episode_label_cache.get(cache_key)
if cached:
info = cached.get(episode)
if info and info.url:
return info.url
episode_info = self._lookup_episode(title, season, episode)
if episode_info and episode_info.url:
return episode_info.url
return ""
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.")

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional
try: # pragma: no cover - optional dependency
import requests
@@ -27,8 +27,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -11,7 +11,7 @@ from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
try: # pragma: no cover - optional dependency
import requests
@@ -33,8 +33,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"
@@ -820,11 +820,23 @@ class FilmpalastPlugin(BasisPlugin):
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
return self.available_hosters_for_url(detail_url)
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
return self.stream_link_for_url(detail_url)
def episode_url_for(self, title: str, season: str, episode: str) -> str:
detail_url = self._detail_url_for_selection(title, season, episode)
return (detail_url or "").strip()
def available_hosters_for_url(self, episode_url: str) -> List[str]:
detail_url = (episode_url or "").strip()
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
def stream_link_for_url(self, episode_url: str) -> Optional[str]:
detail_url = (episode_url or "").strip()
if not detail_url:
return None
hosters = self._hosters_for_detail_url(detail_url)

View File

@@ -17,7 +17,8 @@ import os
import re
import time
import unicodedata
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from urllib.parse import quote
try: # pragma: no cover - optional dependency
import requests
@@ -49,14 +50,15 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
SETTING_BASE_URL = "serienstream_base_url"
DEFAULT_BASE_URL = "https://s.to"
DEFAULT_PREFERRED_HOSTERS = ["voe"]
DEFAULT_TIMEOUT = 20
SEARCH_TIMEOUT = 8
ADDON_ID = "plugin.video.viewit"
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -75,6 +77,9 @@ HEADERS = {
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.serienstream"
SESSION_CACHE_MAX_TITLE_URLS = 800
CATALOG_SEARCH_TTL_SECONDS = 600
CATALOG_SEARCH_CACHE_KEY = "catalog_index"
_CATALOG_INDEX_MEMORY: tuple[float, List["SeriesResult"]] = (0.0, [])
@dataclass
@@ -111,6 +116,57 @@ class SeasonInfo:
episodes: List[EpisodeInfo]
def _extract_series_metadata(soup: BeautifulSoupT) -> Tuple[Dict[str, str], Dict[str, str]]:
info: Dict[str, str] = {}
art: Dict[str, str] = {}
if not soup:
return info, art
title_tag = soup.select_one("h1")
title = (title_tag.get_text(" ", strip=True) if title_tag else "").strip()
if title:
info["title"] = title
description = ""
desc_tag = soup.select_one(".series-description .description-text")
if desc_tag:
description = (desc_tag.get_text(" ", strip=True) or "").strip()
if not description:
meta_desc = soup.select_one("meta[property='og:description'], meta[name='description']")
if meta_desc:
description = (meta_desc.get("content") or "").strip()
if description:
info["plot"] = description
poster = ""
poster_tag = soup.select_one(
".show-cover-mobile img[data-src], .show-cover-mobile img[src], .col-3 img[data-src], .col-3 img[src]"
)
if poster_tag:
poster = (poster_tag.get("data-src") or poster_tag.get("src") or "").strip()
if not poster:
for candidate in soup.select("img[data-src], img[src]"):
url = (candidate.get("data-src") or candidate.get("src") or "").strip()
if "/media/images/channel/" in url:
poster = url
break
if poster:
poster = _absolute_url(poster)
art["poster"] = poster
art["thumb"] = poster
fanart = ""
fanart_tag = soup.select_one("meta[property='og:image']")
if fanart_tag:
fanart = (fanart_tag.get("content") or "").strip()
if fanart:
fanart = _absolute_url(fanart)
art["fanart"] = fanart
art["landscape"] = fanart
return info, art
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
@@ -400,20 +456,222 @@ def _extract_genre_names_from_html(body: str) -> List[str]:
return names
def _strip_tags(value: str) -> str:
return re.sub(r"<[^>]+>", " ", value or "")
def _search_series_api(query: str) -> List[SeriesResult]:
query = (query or "").strip()
if not query:
return []
_ensure_requests()
sess = get_requests_session("serienstream", headers=HEADERS)
terms = [query]
if " " in query:
# Fallback: einzelne Tokens liefern in der API oft bessere Treffer.
terms.extend([token for token in query.split() if token])
seen_urls: set[str] = set()
for term in terms:
try:
response = sess.get(
f"{_get_base_url()}/api/search/suggest",
params={"term": term},
headers=HEADERS,
timeout=SEARCH_TIMEOUT,
)
response.raise_for_status()
except Exception:
continue
try:
payload = response.json()
except Exception:
continue
shows = payload.get("shows") if isinstance(payload, dict) else None
if not isinstance(shows, list):
continue
results: List[SeriesResult] = []
for item in shows:
if not isinstance(item, dict):
continue
title = (item.get("name") or "").strip()
href = (item.get("url") or "").strip()
if not title or not href:
continue
url_abs = _absolute_url(href)
if not url_abs or url_abs in seen_urls:
continue
if "/staffel-" in url_abs or "/episode-" in url_abs:
continue
seen_urls.add(url_abs)
results.append(SeriesResult(title=title, description="", url=url_abs))
if not results:
continue
filtered = [entry for entry in results if _matches_query(query, title=entry.title)]
if filtered:
return filtered
# Falls nur Token-Suche möglich war, zumindest die Ergebnisse liefern.
if term != query:
return results
return []
def _search_series_server(query: str) -> List[SeriesResult]:
if not query:
return []
api_results = _search_series_api(query)
if api_results:
return api_results
base = _get_base_url()
search_url = f"{base}/search?q={quote(query)}"
alt_url = f"{base}/suche?q={quote(query)}"
for url in (search_url, alt_url):
try:
body = _get_html_simple(url)
except Exception:
continue
if not body:
continue
soup = BeautifulSoup(body, "html.parser")
root = soup.select_one(".search-results-list")
if root is None:
continue
seen_urls: set[str] = set()
results: List[SeriesResult] = []
for card in root.select(".cover-card"):
anchor = card.select_one("a[href*='/serie/']")
if not anchor:
continue
href = (anchor.get("href") or "").strip()
url_abs = _absolute_url(href)
if not url_abs or url_abs in seen_urls:
continue
if "/staffel-" in url_abs or "/episode-" in url_abs:
continue
title_tag = card.select_one(".show-title") or card.select_one("h3") or card.select_one("h4")
title = (title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True)).strip()
if not title:
continue
seen_urls.add(url_abs)
results.append(SeriesResult(title=title, description="", url=url_abs))
if results:
return results
return []
def _extract_catalog_index_from_html(body: str) -> List[SeriesResult]:
items: List[SeriesResult] = []
if not body:
return items
seen_urls: set[str] = set()
item_re = re.compile(
r"<li[^>]*class=[\"'][^\"']*series-item[^\"']*[\"'][^>]*>(.*?)</li>",
re.IGNORECASE | re.DOTALL,
)
anchor_re = re.compile(r"<a[^>]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>", re.IGNORECASE | re.DOTALL)
data_search_re = re.compile(r"data-search=[\"']([^\"']*)[\"']", re.IGNORECASE)
for match in item_re.finditer(body):
block = match.group(0)
inner = match.group(1) or ""
anchor_match = anchor_re.search(inner)
if not anchor_match:
continue
href = (anchor_match.group(1) or "").strip()
url = _absolute_url(href)
if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
continue
if url in seen_urls:
continue
seen_urls.add(url)
title_raw = anchor_match.group(2) or ""
title = unescape(re.sub(r"\s+", " ", _strip_tags(title_raw))).strip()
if not title:
continue
search_match = data_search_re.search(block)
description = (search_match.group(1) or "").strip() if search_match else ""
items.append(SeriesResult(title=title, description=description, url=url))
return items
def _catalog_index_from_soup(soup: BeautifulSoupT) -> List[SeriesResult]:
items: List[SeriesResult] = []
if not soup:
return items
seen_urls: set[str] = set()
for item in soup.select("li.series-item"):
anchor = item.find("a", href=True)
if not anchor:
continue
href = (anchor.get("href") or "").strip()
url = _absolute_url(href)
if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
continue
if url in seen_urls:
continue
seen_urls.add(url)
title = (anchor.get_text(" ", strip=True) or "").strip()
if not title:
continue
description = (item.get("data-search") or "").strip()
items.append(SeriesResult(title=title, description=description, url=url))
return items
def _load_catalog_index_from_cache() -> Optional[List[SeriesResult]]:
global _CATALOG_INDEX_MEMORY
expires_at, cached = _CATALOG_INDEX_MEMORY
if cached and expires_at > time.time():
return list(cached)
raw = _session_cache_get(CATALOG_SEARCH_CACHE_KEY)
if not isinstance(raw, list):
return None
items: List[SeriesResult] = []
for entry in raw:
if not isinstance(entry, list) or len(entry) < 2:
continue
title = str(entry[0] or "").strip()
url = str(entry[1] or "").strip()
description = str(entry[2] or "") if len(entry) > 2 else ""
if title and url:
items.append(SeriesResult(title=title, description=description, url=url))
if items:
_CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(items))
return items or None
def _store_catalog_index_in_cache(items: List[SeriesResult]) -> None:
global _CATALOG_INDEX_MEMORY
if not items:
return
_CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(items))
payload: List[List[str]] = []
for entry in items:
if not entry.title or not entry.url:
continue
payload.append([entry.title, entry.url, entry.description])
_session_cache_set(CATALOG_SEARCH_CACHE_KEY, payload, ttl_seconds=CATALOG_SEARCH_TTL_SECONDS)
def search_series(query: str) -> List[SeriesResult]:
"""Sucht Serien im (/serien)-Katalog (Genre-liste) nach Titel/Alt-Titel."""
"""Sucht Serien im (/serien)-Katalog nach Titel. Nutzt Cache + Ein-Pass-Filter."""
_ensure_requests()
if not _normalize_search_text(query):
return []
# Direkter Abruf wie in fetch_serien.py.
server_results = _search_series_server(query)
if server_results:
return [entry for entry in server_results if entry.title and _matches_query(query, title=entry.title)]
cached = _load_catalog_index_from_cache()
if cached is not None:
return [entry for entry in cached if entry.title and _matches_query(query, title=entry.title)]
catalog_url = f"{_get_base_url()}/serien?by=genre"
soup = _get_soup_simple(catalog_url)
results: List[SeriesResult] = []
for series in parse_series_catalog(soup).values():
for entry in series:
if entry.title and _matches_query(query, title=entry.title):
results.append(entry)
return results
body = _get_html_simple(catalog_url)
items = _extract_catalog_index_from_html(body)
if not items:
soup = BeautifulSoup(body, "html.parser")
items = _catalog_index_from_soup(soup)
if items:
_store_catalog_index_in_cache(items)
return [entry for entry in items if entry.title and _matches_query(query, title=entry.title)]
def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]:
@@ -805,6 +1063,7 @@ class SerienstreamPlugin(BasisPlugin):
self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {}
self._latest_cache: Dict[int, List[LatestEpisode]] = {}
self._latest_hoster_cache: Dict[str, List[str]] = {}
self._series_metadata_cache: Dict[str, Tuple[Dict[str, str], Dict[str, str]]] = {}
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
@@ -851,12 +1110,30 @@ class SerienstreamPlugin(BasisPlugin):
cache_key = title.casefold()
if self._title_url_cache.get(cache_key) != url:
self._title_url_cache[cache_key] = url
self._save_title_url_cache()
self._save_title_url_cache()
if url:
return
current = self._series_results.get(title)
if current is None:
self._series_results[title] = SeriesResult(title=title, description=description, url="")
@staticmethod
def _metadata_cache_key(title: str) -> str:
return (title or "").strip().casefold()
def _series_for_title(self, title: str) -> Optional[SeriesResult]:
direct = self._series_results.get(title)
if direct and direct.url:
return direct
lookup_key = (title or "").strip().casefold()
for item in self._series_results.values():
if item.title.casefold().strip() == lookup_key and item.url:
return item
cached_url = self._title_url_cache.get(lookup_key, "")
if cached_url:
return SeriesResult(title=title, description="", url=cached_url)
return None
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
@@ -1274,7 +1551,28 @@ class SerienstreamPlugin(BasisPlugin):
self._season_links_cache[title] = list(session_links)
return list(session_links)
try:
seasons = scrape_series_detail(series.url, load_episodes=False)
series_soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
info_labels, art = _extract_series_metadata(series_soup)
if series.description and "plot" not in info_labels:
info_labels["plot"] = series.description
cache_key = self._metadata_cache_key(title)
if info_labels or art:
self._series_metadata_cache[cache_key] = (info_labels, art)
base_series_url = _series_root_url(_extract_canonical_url(series_soup, series.url))
season_links = _extract_season_links(series_soup)
season_count = _extract_number_of_seasons(series_soup)
if season_count and (not season_links or len(season_links) < season_count):
existing = {number for number, _ in season_links}
for number in range(1, season_count + 1):
if number in existing:
continue
season_url = f"{base_series_url}/staffel-{number}"
_log_parsed_url(season_url)
season_links.append((number, season_url))
season_links.sort(key=lambda item: item[0])
seasons = [SeasonInfo(number=number, url=url, episodes=[]) for number, url in season_links]
seasons.sort(key=lambda s: s.number)
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
self._season_links_cache[title] = list(seasons)
@@ -1288,6 +1586,41 @@ class SerienstreamPlugin(BasisPlugin):
return
self._remember_series_result(title, series_url)
def metadata_for(self, title: str) -> Tuple[Dict[str, str], Dict[str, str], Optional[List[Any]]]:
title = (title or "").strip()
if not title or not self._requests_available:
return {}, {}, None
cache_key = self._metadata_cache_key(title)
cached = self._series_metadata_cache.get(cache_key)
if cached is not None:
info, art = cached
return dict(info), dict(art), None
series = self._series_for_title(title)
if series is None or not series.url:
info = {"title": title}
self._series_metadata_cache[cache_key] = (dict(info), {})
return info, {}, None
info: Dict[str, str] = {"title": title}
art: Dict[str, str] = {}
if series.description:
info["plot"] = series.description
try:
soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
parsed_info, parsed_art = _extract_series_metadata(soup)
if parsed_info:
info.update(parsed_info)
if parsed_art:
art.update(parsed_art)
except Exception:
pass
self._series_metadata_cache[cache_key] = (dict(info), dict(art))
return info, art, None
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
@@ -1443,6 +1776,18 @@ class SerienstreamPlugin(BasisPlugin):
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc
def episode_url_for(self, title: str, season: str, episode: str) -> str:
cache_key = (title, season)
cached = self._episode_label_cache.get(cache_key)
if cached:
info = cached.get(episode)
if info and info.url:
return info.url
episode_info = self._lookup_episode(title, season, episode)
if episode_info and episode_info.url:
return episode_info.url
return ""
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")

View File

@@ -19,7 +19,7 @@ import hashlib
import os
import re
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlencode, urljoin
try: # pragma: no cover - optional dependency
@@ -51,8 +51,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -31,22 +31,28 @@
</category>
<category label="TopStream">
<setting id="topstream_base_url" type="text" label="Domain (BASE_URL)" default="https://topstreamfilm.live" />
<setting id="topstreamfilm_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
<setting id="topstream_genre_max_pages" type="number" label="Genres: max. Seiten laden (Pagination)" default="20" />
</category>
<category label="SerienStream">
<setting id="serienstream_base_url" type="text" label="Domain (BASE_URL)" default="https://s.to" />
<setting id="serienstream_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="AniWorld">
<setting id="aniworld_base_url" type="text" label="Domain (BASE_URL)" default="https://aniworld.to" />
<setting id="aniworld_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Einschalten">
<setting id="einschalten_base_url" type="text" label="Domain (BASE_URL)" default="https://einschalten.in" />
<setting id="einschalten_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
<setting id="filmpalast_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
<setting id="doku_streams_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />