Compare commits

...

4 Commits

11 changed files with 913 additions and 312 deletions

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.51" provider-name="ViewIt">
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.56" provider-name="ViewIt">
<requires>
<import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" />

View File

@@ -16,11 +16,26 @@ import json
import os
import re
import sys
import threading
import xml.etree.ElementTree as ET
from pathlib import Path
from types import ModuleType
from urllib.parse import parse_qs, urlencode
def _ensure_windows_selector_policy() -> None:
"""Erzwingt unter Windows einen Selector-Loop (thread-kompatibel in Kodi)."""
if not sys.platform.startswith("win"):
return
try:
current = asyncio.get_event_loop_policy()
if current.__class__.__name__ == "WindowsSelectorEventLoopPolicy":
return
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
except Exception:
# Fallback: Wenn die Policy nicht verfügbar ist, arbeitet der Code mit Default-Policy weiter.
return
try: # pragma: no cover - Kodi runtime
import xbmc # type: ignore[import-not-found]
import xbmcaddon # type: ignore[import-not-found]
@@ -402,6 +417,81 @@ def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
return default
def _get_setting_int(setting_id: str, *, default: int = 0) -> int:
if xbmcaddon is None:
return default
addon = _get_addon()
if addon is None:
return default
getter = getattr(addon, "getSettingInt", None)
if callable(getter):
raw_getter = getattr(addon, "getSetting", None)
if callable(raw_getter):
try:
raw = str(raw_getter(setting_id) or "").strip()
except TypeError:
raw = ""
if raw == "":
return default
try:
return int(getter(setting_id))
except TypeError:
return default
getter = getattr(addon, "getSetting", None)
if callable(getter):
try:
raw = str(getter(setting_id) or "").strip()
except TypeError:
return default
if raw == "":
return default
try:
return int(raw)
except ValueError:
return default
return default
METADATA_MODE_AUTO = 0
METADATA_MODE_SOURCE = 1
METADATA_MODE_TMDB = 2
METADATA_MODE_MIX = 3
def _metadata_setting_id(plugin_name: str) -> str:
safe = re.sub(r"[^a-z0-9]+", "_", (plugin_name or "").strip().casefold()).strip("_")
return f"{safe}_metadata_source" if safe else "metadata_source"
def _plugin_supports_metadata(plugin: BasisPlugin) -> bool:
try:
return plugin.__class__.metadata_for is not BasisPlugin.metadata_for
except Exception:
return False
def _metadata_policy(
plugin_name: str,
plugin: BasisPlugin,
*,
allow_tmdb: bool,
) -> tuple[bool, bool, bool]:
mode = _get_setting_int(_metadata_setting_id(plugin_name), default=METADATA_MODE_AUTO)
supports_source = _plugin_supports_metadata(plugin)
if mode == METADATA_MODE_SOURCE:
return supports_source, False, True
if mode == METADATA_MODE_TMDB:
return False, allow_tmdb, False
if mode == METADATA_MODE_MIX:
return supports_source, allow_tmdb, True
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
return supports_source, allow_tmdb, prefer_source
def _tmdb_list_enabled() -> bool:
return _tmdb_enabled() and _get_setting_bool("tmdb_genre_metadata", default=False)
def _set_setting_string(setting_id: str, value: str) -> None:
if xbmcaddon is None:
return
@@ -1120,18 +1210,28 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
try:
with _progress_dialog("Suche läuft", f"{plugin_name} (1/1) starte…") as progress:
canceled = progress(5, f"{plugin_name} (1/1) Suche…")
results = _run_async(plugin.search_titles(query))
search_coro = plugin.search_titles(query)
try:
results = _run_async(search_coro)
except Exception:
if inspect.iscoroutine(search_coro):
try:
search_coro.close()
except Exception:
pass
raise
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
plugin_meta = _collect_plugin_metadata(plugin, results)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {}
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = list(results) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
@@ -1271,17 +1371,34 @@ def _discover_plugins() -> dict[str, BasisPlugin]:
def _run_async(coro):
"""Fuehrt eine Coroutine aus, auch wenn Kodi bereits einen Event-Loop hat."""
_ensure_windows_selector_policy()
def _run_with_asyncio_run():
return asyncio.run(coro)
try:
loop = asyncio.get_event_loop()
running_loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
temp_loop = asyncio.new_event_loop()
try:
return temp_loop.run_until_complete(coro)
finally:
temp_loop.close()
return asyncio.run(coro)
running_loop = None
if running_loop and running_loop.is_running():
result_box: dict[str, object] = {}
error_box: dict[str, BaseException] = {}
def _worker() -> None:
try:
result_box["value"] = _run_with_asyncio_run()
except BaseException as exc: # pragma: no cover - defensive
error_box["error"] = exc
worker = threading.Thread(target=_worker, name="viewit-async-runner")
worker.start()
worker.join()
if "error" in error_box:
raise error_box["error"]
return result_box.get("value")
return _run_with_asyncio_run()
def _series_url_params(plugin: BasisPlugin, title: str) -> dict[str, str]:
@@ -1327,21 +1444,28 @@ def _show_search_results(query: str) -> None:
canceled = progress(range_start, f"{plugin_name} ({plugin_index}/{total_plugins}) Suche…")
if canceled:
break
search_coro = plugin.search_titles(query)
try:
results = _run_async(plugin.search_titles(query))
results = _run_async(search_coro)
except Exception as exc:
if inspect.iscoroutine(search_coro):
try:
search_coro.close()
except Exception:
pass
_log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
plugin_meta = _collect_plugin_metadata(plugin, results)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, results) if use_source else {}
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = list(results) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
@@ -1485,11 +1609,14 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
except Exception:
pass
use_source, show_tmdb, _prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
title_info_labels: dict[str, str] | None = None
title_art: dict[str, str] | None = None
title_cast: list[TmdbCastMember] | None = None
meta_getter = getattr(plugin, "metadata_for", None)
if callable(meta_getter):
if use_source and callable(meta_getter):
try:
with _busy_dialog():
meta_labels, meta_art, meta_cast = meta_getter(title)
@@ -1516,8 +1643,9 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
xbmcplugin.setPluginCategory(handle, f"{title} ({count} {suffix})")
_set_content(handle, "seasons")
# Staffel-Metadaten (Plot/Poster) optional via TMDB.
_tmdb_labels_and_art(title)
api_key = _get_setting_string("tmdb_api_key").strip()
if show_tmdb:
_tmdb_labels_and_art(title)
api_key = _get_setting_string("tmdb_api_key").strip() if show_tmdb else ""
language = _get_setting_string("tmdb_language").strip() or "de-DE"
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
@@ -1606,13 +1734,42 @@ def _show_episodes(plugin_name: str, title: str, season: str, series_url: str =
episodes = list(plugin.episodes_for(title, season))
if episodes:
show_info, show_art, show_cast = _tmdb_labels_and_art(title)
episode_url_getter = getattr(plugin, "episode_url_for", None)
supports_direct_episode_url = callable(getattr(plugin, "stream_link_for_url", None))
use_source, show_tmdb, _prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_enabled()
)
show_info: dict[str, str] = {}
show_art: dict[str, str] = {}
show_cast: list[TmdbCastMember] | None = None
if show_tmdb:
show_info, show_art, show_cast = _tmdb_labels_and_art(title)
elif use_source:
meta_getter = getattr(plugin, "metadata_for", None)
if callable(meta_getter):
try:
with _busy_dialog():
meta_labels, meta_art, meta_cast = meta_getter(title)
if isinstance(meta_labels, dict):
show_info = {str(k): str(v) for k, v in meta_labels.items() if v}
if isinstance(meta_art, dict):
show_art = {str(k): str(v) for k, v in meta_art.items() if v}
if isinstance(meta_cast, list):
show_cast = meta_cast # noqa: PGH003
except Exception:
pass
show_fanart = (show_art or {}).get("fanart") if isinstance(show_art, dict) else ""
show_poster = (show_art or {}).get("poster") if isinstance(show_art, dict) else ""
with _busy_dialog():
for episode in episodes:
info_labels, art = _tmdb_episode_labels_and_art(title=title, season_label=season, episode_label=episode)
episode_cast = _tmdb_episode_cast(title=title, season_label=season, episode_label=episode)
if show_tmdb:
info_labels, art = _tmdb_episode_labels_and_art(
title=title, season_label=season, episode_label=episode
)
episode_cast = _tmdb_episode_cast(title=title, season_label=season, episode_label=episode)
else:
info_labels, art, episode_cast = {}, {}, []
merged_info = dict(show_info or {})
merged_info.update(dict(info_labels or {}))
merged_art: dict[str, str] = {}
@@ -1642,11 +1799,25 @@ def _show_episodes(plugin_name: str, title: str, season: str, series_url: str =
merged_info = _apply_playstate_to_info(merged_info, _get_playstate(key))
display_label = episode
play_params = {
"plugin": plugin_name,
"title": title,
"season": season,
"episode": episode,
"series_url": series_url,
}
if supports_direct_episode_url and callable(episode_url_getter):
try:
episode_url = str(episode_url_getter(title, season, episode) or "").strip()
except Exception:
episode_url = ""
if episode_url:
play_params["url"] = episode_url
_add_directory_item(
handle,
display_label,
"play_episode",
{"plugin": plugin_name, "title": title, "season": season, "episode": episode},
play_params,
is_folder=False,
info_labels=merged_info,
art=merged_art,
@@ -1803,16 +1974,16 @@ def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
@@ -1823,51 +1994,31 @@ def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
if show_tmdb:
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, {}, {}, None, meta)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info(info_labels, playstate),
art=art,
cast=cast,
)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -1939,16 +2090,16 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = show_tmdb and _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
@@ -2090,50 +2241,52 @@ def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> Non
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
playstate = _title_playstate(plugin_name, title)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -2200,42 +2353,48 @@ def _show_series_catalog(plugin_name: str, page: int = 1) -> None:
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, titles) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in titles:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
@@ -2429,16 +2588,16 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
end = start + page_size
page_items = titles[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
plugin_meta = _collect_plugin_metadata(plugin, page_items)
show_tmdb = show_tmdb and _tmdb_enabled()
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items)
if show_tmdb and prefer_source:
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
meta = plugin_meta.get(title)
@@ -2450,7 +2609,7 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
@@ -2577,48 +2736,50 @@ def _show_new_titles(plugin_name: str, page: int = 1) -> None:
start = (page - 1) * page_size
end = start + page_size
page_items = titles[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "movie")
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "movie")
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if callable(paging_getter) and callable(has_more_getter):
@@ -2738,7 +2899,9 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
return
xbmcplugin.setPluginCategory(handle, f"{genre} [{group_code}] ({page})")
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
if page > 1:
_add_directory_item(
handle,
@@ -2748,40 +2911,44 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
is_folder=True,
)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if callable(grouped_has_more):
try:
@@ -2827,43 +2994,49 @@ def _show_genre_series_group(plugin_name: str, genre: str, group_code: str, page
start = (page - 1) * page_size
end = start + page_size
page_items = filtered[start:end]
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
use_source, show_tmdb, prefer_source = _metadata_policy(
plugin_name, plugin, allow_tmdb=_tmdb_list_enabled()
)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items) if use_source else {}
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items) if show_tmdb else []
if show_tmdb and prefer_source and use_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
if total_pages > 1 and page < total_pages:
_add_directory_item(
@@ -3045,8 +3218,34 @@ def _play_episode(
season: str,
episode: str,
*,
episode_url: str = "",
series_url: str = "",
resolve_handle: int | None = None,
) -> None:
episode_url = (episode_url or "").strip()
if episode_url:
_play_episode_url(
plugin_name,
title=title,
season_number=_extract_first_int(season) or 0,
episode_number=_extract_first_int(episode) or 0,
episode_url=episode_url,
season_label_override=season,
episode_label_override=episode,
resolve_handle=resolve_handle,
)
return
series_url = (series_url or "").strip()
if series_url:
plugin_for_url = _discover_plugins().get(plugin_name)
remember_series_url = getattr(plugin_for_url, "remember_series_url", None) if plugin_for_url is not None else None
if callable(remember_series_url):
try:
remember_series_url(title, series_url)
except Exception:
pass
_log(f"Play anfordern: {plugin_name} / {title} / {season} / {episode}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
@@ -3123,10 +3322,14 @@ def _play_episode_url(
season_number: int,
episode_number: int,
episode_url: str,
season_label_override: str = "",
episode_label_override: str = "",
resolve_handle: int | None = None,
) -> None:
season_label = f"Staffel {season_number}" if season_number > 0 else ""
episode_label = f"Episode {episode_number}" if episode_number > 0 else ""
season_label = (season_label_override or "").strip() or (f"Staffel {season_number}" if season_number > 0 else "")
episode_label = (episode_label_override or "").strip() or (
f"Episode {episode_number}" if episode_number > 0 else ""
)
_log(f"Play (URL) anfordern: {plugin_name} / {title} / {season_label} / {episode_label} / {episode_url}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
@@ -3295,6 +3498,8 @@ def run() -> None:
params.get("title", ""),
params.get("season", ""),
params.get("episode", ""),
episode_url=params.get("url", ""),
series_url=params.get("series_url", ""),
resolve_handle=_get_handle(),
)
elif action == "play_movie":

View File

@@ -9,7 +9,7 @@ Zum Verwenden:
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, List, Optional
try: # pragma: no cover - optional dependency
import requests
@@ -34,8 +34,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -13,7 +13,7 @@ import hashlib
import json
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
try: # pragma: no cover - optional dependency
import requests
@@ -43,8 +43,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
SETTING_BASE_URL = "aniworld_base_url"
@@ -1213,6 +1213,18 @@ class AniworldPlugin(BasisPlugin):
_log_url(link, kind="FOUND")
return link
def episode_url_for(self, title: str, season: str, episode: str) -> str:
cache_key = (title, season)
cached = self._episode_label_cache.get(cache_key)
if cached:
info = cached.get(episode)
if info and info.url:
return info.url
episode_info = self._lookup_episode(title, season, episode)
if episode_info and episode_info.url:
return episode_info.url
return ""
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.")

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional
try: # pragma: no cover - optional dependency
import requests
@@ -27,8 +27,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -43,7 +43,7 @@ SETTING_DUMP_HTML = "dump_html_einschalten"
SETTING_SHOW_URL_INFO = "show_url_info_einschalten"
SETTING_LOG_ERRORS = "log_errors_einschalten"
DEFAULT_BASE_URL = ""
DEFAULT_BASE_URL = "https://einschalten.in"
DEFAULT_INDEX_PATH = "/"
DEFAULT_NEW_TITLES_PATH = "/movies/new"
DEFAULT_SEARCH_PATH = "/search"

View File

@@ -11,7 +11,7 @@ from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
try: # pragma: no cover - optional dependency
import requests
@@ -33,8 +33,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"
@@ -820,11 +820,23 @@ class FilmpalastPlugin(BasisPlugin):
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
return self.available_hosters_for_url(detail_url)
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
return self.stream_link_for_url(detail_url)
def episode_url_for(self, title: str, season: str, episode: str) -> str:
detail_url = self._detail_url_for_selection(title, season, episode)
return (detail_url or "").strip()
def available_hosters_for_url(self, episode_url: str) -> List[str]:
detail_url = (episode_url or "").strip()
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
def stream_link_for_url(self, episode_url: str) -> Optional[str]:
detail_url = (episode_url or "").strip()
if not detail_url:
return None
hosters = self._hosters_for_detail_url(detail_url)

View File

@@ -17,7 +17,8 @@ import os
import re
import time
import unicodedata
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from urllib.parse import quote
try: # pragma: no cover - optional dependency
import requests
@@ -49,14 +50,15 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
SETTING_BASE_URL = "serienstream_base_url"
DEFAULT_BASE_URL = "https://s.to"
DEFAULT_PREFERRED_HOSTERS = ["voe"]
DEFAULT_TIMEOUT = 20
SEARCH_TIMEOUT = 8
ADDON_ID = "plugin.video.viewit"
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -75,6 +77,9 @@ HEADERS = {
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.serienstream"
SESSION_CACHE_MAX_TITLE_URLS = 800
CATALOG_SEARCH_TTL_SECONDS = 600
CATALOG_SEARCH_CACHE_KEY = "catalog_index"
_CATALOG_INDEX_MEMORY: tuple[float, List["SeriesResult"]] = (0.0, [])
@dataclass
@@ -111,6 +116,57 @@ class SeasonInfo:
episodes: List[EpisodeInfo]
def _extract_series_metadata(soup: BeautifulSoupT) -> Tuple[Dict[str, str], Dict[str, str]]:
info: Dict[str, str] = {}
art: Dict[str, str] = {}
if not soup:
return info, art
title_tag = soup.select_one("h1")
title = (title_tag.get_text(" ", strip=True) if title_tag else "").strip()
if title:
info["title"] = title
description = ""
desc_tag = soup.select_one(".series-description .description-text")
if desc_tag:
description = (desc_tag.get_text(" ", strip=True) or "").strip()
if not description:
meta_desc = soup.select_one("meta[property='og:description'], meta[name='description']")
if meta_desc:
description = (meta_desc.get("content") or "").strip()
if description:
info["plot"] = description
poster = ""
poster_tag = soup.select_one(
".show-cover-mobile img[data-src], .show-cover-mobile img[src], .col-3 img[data-src], .col-3 img[src]"
)
if poster_tag:
poster = (poster_tag.get("data-src") or poster_tag.get("src") or "").strip()
if not poster:
for candidate in soup.select("img[data-src], img[src]"):
url = (candidate.get("data-src") or candidate.get("src") or "").strip()
if "/media/images/channel/" in url:
poster = url
break
if poster:
poster = _absolute_url(poster)
art["poster"] = poster
art["thumb"] = poster
fanart = ""
fanart_tag = soup.select_one("meta[property='og:image']")
if fanart_tag:
fanart = (fanart_tag.get("content") or "").strip()
if fanart:
fanart = _absolute_url(fanart)
art["fanart"] = fanart
art["landscape"] = fanart
return info, art
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
@@ -400,20 +456,222 @@ def _extract_genre_names_from_html(body: str) -> List[str]:
return names
def _strip_tags(value: str) -> str:
return re.sub(r"<[^>]+>", " ", value or "")
def _search_series_api(query: str) -> List[SeriesResult]:
query = (query or "").strip()
if not query:
return []
_ensure_requests()
sess = get_requests_session("serienstream", headers=HEADERS)
terms = [query]
if " " in query:
# Fallback: einzelne Tokens liefern in der API oft bessere Treffer.
terms.extend([token for token in query.split() if token])
seen_urls: set[str] = set()
for term in terms:
try:
response = sess.get(
f"{_get_base_url()}/api/search/suggest",
params={"term": term},
headers=HEADERS,
timeout=SEARCH_TIMEOUT,
)
response.raise_for_status()
except Exception:
continue
try:
payload = response.json()
except Exception:
continue
shows = payload.get("shows") if isinstance(payload, dict) else None
if not isinstance(shows, list):
continue
results: List[SeriesResult] = []
for item in shows:
if not isinstance(item, dict):
continue
title = (item.get("name") or "").strip()
href = (item.get("url") or "").strip()
if not title or not href:
continue
url_abs = _absolute_url(href)
if not url_abs or url_abs in seen_urls:
continue
if "/staffel-" in url_abs or "/episode-" in url_abs:
continue
seen_urls.add(url_abs)
results.append(SeriesResult(title=title, description="", url=url_abs))
if not results:
continue
filtered = [entry for entry in results if _matches_query(query, title=entry.title)]
if filtered:
return filtered
# Falls nur Token-Suche möglich war, zumindest die Ergebnisse liefern.
if term != query:
return results
return []
def _search_series_server(query: str) -> List[SeriesResult]:
if not query:
return []
api_results = _search_series_api(query)
if api_results:
return api_results
base = _get_base_url()
search_url = f"{base}/search?q={quote(query)}"
alt_url = f"{base}/suche?q={quote(query)}"
for url in (search_url, alt_url):
try:
body = _get_html_simple(url)
except Exception:
continue
if not body:
continue
soup = BeautifulSoup(body, "html.parser")
root = soup.select_one(".search-results-list")
if root is None:
continue
seen_urls: set[str] = set()
results: List[SeriesResult] = []
for card in root.select(".cover-card"):
anchor = card.select_one("a[href*='/serie/']")
if not anchor:
continue
href = (anchor.get("href") or "").strip()
url_abs = _absolute_url(href)
if not url_abs or url_abs in seen_urls:
continue
if "/staffel-" in url_abs or "/episode-" in url_abs:
continue
title_tag = card.select_one(".show-title") or card.select_one("h3") or card.select_one("h4")
title = (title_tag.get_text(" ", strip=True) if title_tag else anchor.get_text(" ", strip=True)).strip()
if not title:
continue
seen_urls.add(url_abs)
results.append(SeriesResult(title=title, description="", url=url_abs))
if results:
return results
return []
def _extract_catalog_index_from_html(body: str) -> List[SeriesResult]:
items: List[SeriesResult] = []
if not body:
return items
seen_urls: set[str] = set()
item_re = re.compile(
r"<li[^>]*class=[\"'][^\"']*series-item[^\"']*[\"'][^>]*>(.*?)</li>",
re.IGNORECASE | re.DOTALL,
)
anchor_re = re.compile(r"<a[^>]+href=[\"']([^\"']+)[\"'][^>]*>(.*?)</a>", re.IGNORECASE | re.DOTALL)
data_search_re = re.compile(r"data-search=[\"']([^\"']*)[\"']", re.IGNORECASE)
for match in item_re.finditer(body):
block = match.group(0)
inner = match.group(1) or ""
anchor_match = anchor_re.search(inner)
if not anchor_match:
continue
href = (anchor_match.group(1) or "").strip()
url = _absolute_url(href)
if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
continue
if url in seen_urls:
continue
seen_urls.add(url)
title_raw = anchor_match.group(2) or ""
title = unescape(re.sub(r"\s+", " ", _strip_tags(title_raw))).strip()
if not title:
continue
search_match = data_search_re.search(block)
description = (search_match.group(1) or "").strip() if search_match else ""
items.append(SeriesResult(title=title, description=description, url=url))
return items
def _catalog_index_from_soup(soup: BeautifulSoupT) -> List[SeriesResult]:
items: List[SeriesResult] = []
if not soup:
return items
seen_urls: set[str] = set()
for item in soup.select("li.series-item"):
anchor = item.find("a", href=True)
if not anchor:
continue
href = (anchor.get("href") or "").strip()
url = _absolute_url(href)
if not url or "/serie/" not in url or "/staffel-" in url or "/episode-" in url:
continue
if url in seen_urls:
continue
seen_urls.add(url)
title = (anchor.get_text(" ", strip=True) or "").strip()
if not title:
continue
description = (item.get("data-search") or "").strip()
items.append(SeriesResult(title=title, description=description, url=url))
return items
def _load_catalog_index_from_cache() -> Optional[List[SeriesResult]]:
global _CATALOG_INDEX_MEMORY
expires_at, cached = _CATALOG_INDEX_MEMORY
if cached and expires_at > time.time():
return list(cached)
raw = _session_cache_get(CATALOG_SEARCH_CACHE_KEY)
if not isinstance(raw, list):
return None
items: List[SeriesResult] = []
for entry in raw:
if not isinstance(entry, list) or len(entry) < 2:
continue
title = str(entry[0] or "").strip()
url = str(entry[1] or "").strip()
description = str(entry[2] or "") if len(entry) > 2 else ""
if title and url:
items.append(SeriesResult(title=title, description=description, url=url))
if items:
_CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(items))
return items or None
def _store_catalog_index_in_cache(items: List[SeriesResult]) -> None:
global _CATALOG_INDEX_MEMORY
if not items:
return
_CATALOG_INDEX_MEMORY = (time.time() + CATALOG_SEARCH_TTL_SECONDS, list(items))
payload: List[List[str]] = []
for entry in items:
if not entry.title or not entry.url:
continue
payload.append([entry.title, entry.url, entry.description])
_session_cache_set(CATALOG_SEARCH_CACHE_KEY, payload, ttl_seconds=CATALOG_SEARCH_TTL_SECONDS)
def search_series(query: str) -> List[SeriesResult]:
"""Sucht Serien im (/serien)-Katalog (Genre-liste) nach Titel/Alt-Titel."""
"""Sucht Serien im (/serien)-Katalog nach Titel. Nutzt Cache + Ein-Pass-Filter."""
_ensure_requests()
if not _normalize_search_text(query):
return []
# Direkter Abruf wie in fetch_serien.py.
server_results = _search_series_server(query)
if server_results:
return [entry for entry in server_results if entry.title and _matches_query(query, title=entry.title)]
cached = _load_catalog_index_from_cache()
if cached is not None:
return [entry for entry in cached if entry.title and _matches_query(query, title=entry.title)]
catalog_url = f"{_get_base_url()}/serien?by=genre"
soup = _get_soup_simple(catalog_url)
results: List[SeriesResult] = []
for series in parse_series_catalog(soup).values():
for entry in series:
if entry.title and _matches_query(query, title=entry.title):
results.append(entry)
return results
body = _get_html_simple(catalog_url)
items = _extract_catalog_index_from_html(body)
if not items:
soup = BeautifulSoup(body, "html.parser")
items = _catalog_index_from_soup(soup)
if items:
_store_catalog_index_in_cache(items)
return [entry for entry in items if entry.title and _matches_query(query, title=entry.title)]
def parse_series_catalog(soup: BeautifulSoupT) -> Dict[str, List[SeriesResult]]:
@@ -805,6 +1063,7 @@ class SerienstreamPlugin(BasisPlugin):
self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {}
self._latest_cache: Dict[int, List[LatestEpisode]] = {}
self._latest_hoster_cache: Dict[str, List[str]] = {}
self._series_metadata_cache: Dict[str, Tuple[Dict[str, str], Dict[str, str]]] = {}
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
@@ -851,12 +1110,30 @@ class SerienstreamPlugin(BasisPlugin):
cache_key = title.casefold()
if self._title_url_cache.get(cache_key) != url:
self._title_url_cache[cache_key] = url
self._save_title_url_cache()
self._save_title_url_cache()
if url:
return
current = self._series_results.get(title)
if current is None:
self._series_results[title] = SeriesResult(title=title, description=description, url="")
@staticmethod
def _metadata_cache_key(title: str) -> str:
return (title or "").strip().casefold()
def _series_for_title(self, title: str) -> Optional[SeriesResult]:
direct = self._series_results.get(title)
if direct and direct.url:
return direct
lookup_key = (title or "").strip().casefold()
for item in self._series_results.values():
if item.title.casefold().strip() == lookup_key and item.url:
return item
cached_url = self._title_url_cache.get(lookup_key, "")
if cached_url:
return SeriesResult(title=title, description="", url=cached_url)
return None
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
@@ -1274,7 +1551,28 @@ class SerienstreamPlugin(BasisPlugin):
self._season_links_cache[title] = list(session_links)
return list(session_links)
try:
seasons = scrape_series_detail(series.url, load_episodes=False)
series_soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
info_labels, art = _extract_series_metadata(series_soup)
if series.description and "plot" not in info_labels:
info_labels["plot"] = series.description
cache_key = self._metadata_cache_key(title)
if info_labels or art:
self._series_metadata_cache[cache_key] = (info_labels, art)
base_series_url = _series_root_url(_extract_canonical_url(series_soup, series.url))
season_links = _extract_season_links(series_soup)
season_count = _extract_number_of_seasons(series_soup)
if season_count and (not season_links or len(season_links) < season_count):
existing = {number for number, _ in season_links}
for number in range(1, season_count + 1):
if number in existing:
continue
season_url = f"{base_series_url}/staffel-{number}"
_log_parsed_url(season_url)
season_links.append((number, season_url))
season_links.sort(key=lambda item: item[0])
seasons = [SeasonInfo(number=number, url=url, episodes=[]) for number, url in season_links]
seasons.sort(key=lambda s: s.number)
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Serienstream-Staffeln konnten nicht geladen werden: {exc}") from exc
self._season_links_cache[title] = list(seasons)
@@ -1288,6 +1586,41 @@ class SerienstreamPlugin(BasisPlugin):
return
self._remember_series_result(title, series_url)
def metadata_for(self, title: str) -> Tuple[Dict[str, str], Dict[str, str], Optional[List[Any]]]:
title = (title or "").strip()
if not title or not self._requests_available:
return {}, {}, None
cache_key = self._metadata_cache_key(title)
cached = self._series_metadata_cache.get(cache_key)
if cached is not None:
info, art = cached
return dict(info), dict(art), None
series = self._series_for_title(title)
if series is None or not series.url:
info = {"title": title}
self._series_metadata_cache[cache_key] = (dict(info), {})
return info, {}, None
info: Dict[str, str] = {"title": title}
art: Dict[str, str] = {}
if series.description:
info["plot"] = series.description
try:
soup = _get_soup(series.url, session=get_requests_session("serienstream", headers=HEADERS))
parsed_info, parsed_art = _extract_series_metadata(soup)
if parsed_info:
info.update(parsed_info)
if parsed_art:
art.update(parsed_art)
except Exception:
pass
self._series_metadata_cache[cache_key] = (dict(info), dict(art))
return info, art, None
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
@@ -1443,6 +1776,18 @@ class SerienstreamPlugin(BasisPlugin):
except Exception as exc: # pragma: no cover - defensive logging
raise RuntimeError(f"Stream-Link konnte nicht geladen werden: {exc}") from exc
def episode_url_for(self, title: str, season: str, episode: str) -> str:
cache_key = (title, season)
cached = self._episode_label_cache.get(cache_key)
if cached:
info = cached.get(episode)
if info and info.url:
return info.url
episode_info = self._lookup_episode(title, season, episode)
if episode_info and episode_info.url:
return episode_info.url
return ""
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
if not self._requests_available:
raise RuntimeError("SerienstreamPlugin kann ohne requests/bs4 keine Hoster laden.")

View File

@@ -19,7 +19,7 @@ import hashlib
import os
import re
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlencode, urljoin
try: # pragma: no cover - optional dependency
@@ -51,8 +51,8 @@ if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
RequestsSession = Any
BeautifulSoupT = Any
ADDON_ID = "plugin.video.viewit"

View File

@@ -31,22 +31,28 @@
</category>
<category label="TopStream">
<setting id="topstream_base_url" type="text" label="Domain (BASE_URL)" default="https://topstreamfilm.live" />
<setting id="topstreamfilm_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
<setting id="topstream_genre_max_pages" type="number" label="Genres: max. Seiten laden (Pagination)" default="20" />
</category>
<category label="SerienStream">
<setting id="serienstream_base_url" type="text" label="Domain (BASE_URL)" default="https://s.to" />
<setting id="serienstream_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="AniWorld">
<setting id="aniworld_base_url" type="text" label="Domain (BASE_URL)" default="https://aniworld.to" />
<setting id="aniworld_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Einschalten">
<setting id="einschalten_base_url" type="text" label="Domain (BASE_URL)" default="https://einschalten.in" />
<setting id="einschalten_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
<setting id="filmpalast_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
<setting id="doku_streams_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Auto|Quelle|TMDB|Mix" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />

View File

@@ -31,7 +31,28 @@
"Matrix Revolutions",
"Matrix Fighters"
],
"Einschalten::new_titles_page::1": [],
"Einschalten::new_titles_page::1": [
"Miracle: Das Eishockeywunder von 1980",
"No Escape - Grizzly Night",
"Kidnapped: Der Fall Elizabeth Smart",
"The Internship",
"The Rip",
"Die Toten vom Bodensee Schicksalsrad",
"People We Meet on Vacation",
"Anaconda",
"Even If This Love Disappears Tonight",
"Die Stunde der Mutigen",
"10DANCE",
"SpongeBob Schwammkopf: Piraten Ahoi!",
"Ella McCay",
"Merv",
"Elmo and Mark Rober's Merry Giftmas",
"Als mein Vater Weihnachten rettete 2",
"Die Fraggles: Der erste Schnee",
"Gregs Tagebuch 3: Jetzt reicht's!",
"Not Without Hope",
"Five Nights at Freddy's 2"
],
"Filmpalast::search_titles::trek": [
"Star Trek",
"Star Trek - Der Film",