Compare commits

...

5 Commits

15 changed files with 941 additions and 68 deletions

BIN
.coverage Normal file

Binary file not shown.

7
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -20,5 +20,10 @@ ViewIT ist ein KodiAddon zum Durchsuchen und Abspielen von Inhalten der unter
- Plugins: `addon/plugins/*_plugin.py`
- Einstellungen: `addon/resources/settings.xml`
## Tests mit Abdeckung
- Dev-Abhängigkeiten installieren: `./.venv/bin/pip install -r requirements-dev.txt`
- Tests + Coverage starten: `./.venv/bin/pytest`
- Optional (XML-Report): `./.venv/bin/pytest --cov-report=xml`
## Dokumentation
Siehe `docs/`.

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.49" provider-name="ViewIt">
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.50" provider-name="ViewIt">
<requires>
<import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" />

View File

@@ -147,6 +147,45 @@ def _busy_dialog():
_busy_close()
@contextmanager
def _progress_dialog(heading: str, message: str = ""):
"""Zeigt einen Fortschrittsdialog in Kodi und liefert eine Update-Funktion."""
dialog = None
try: # pragma: no cover - Kodi runtime
if xbmcgui is not None and hasattr(xbmcgui, "DialogProgress"):
dialog = xbmcgui.DialogProgress()
dialog.create(heading, message)
except Exception:
dialog = None
def _update(percent: int, text: str = "") -> bool:
if dialog is None:
return False
percent = max(0, min(100, int(percent)))
try: # Kodi Matrix/Nexus
dialog.update(percent, text)
except TypeError:
try: # Kodi Leia fallback
dialog.update(percent, text, "", "")
except Exception:
pass
except Exception:
pass
try:
return bool(dialog.iscanceled())
except Exception:
return False
try:
yield _update
finally:
if dialog is not None:
try:
dialog.close()
except Exception:
pass
def _get_handle() -> int:
return int(sys.argv[1]) if len(sys.argv) > 1 else -1
@@ -919,41 +958,69 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
_set_content(handle, "movies" if plugin_name.casefold() == "einschalten" else "tvshows")
_log(f"Suche nach Titeln (Plugin={plugin_name}): {query}")
list_items: list[dict[str, object]] = []
canceled = False
try:
results = _run_async(plugin.search_titles(query))
with _progress_dialog("Suche läuft", f"{plugin_name} (1/1) starte…") as progress:
canceled = progress(5, f"{plugin_name} (1/1) Suche…")
results = _run_async(plugin.search_titles(query))
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results and not canceled:
canceled = progress(35, f"{plugin_name} (1/1) Metadaten…")
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
total_results = max(1, len(results))
for index, title in enumerate(results, start=1):
if canceled:
break
if index == 1 or index == total_results or (index % 10 == 0):
pct = 35 + int((index / float(total_results)) * 60)
canceled = progress(pct, f"{plugin_name} (1/1) aufbereiten {index}/{total_results}")
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
merged_info = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False))
extra_params = _series_url_params(plugin, title)
list_items.append(
{
"label": display_label,
"action": "play_movie" if direct_play else "seasons",
"params": {"plugin": plugin_name, "title": title, **extra_params},
"is_folder": (not direct_play),
"info_labels": merged_info,
"art": art,
"cast": cast,
}
)
except Exception as exc:
_log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Suche", "Suche fehlgeschlagen.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
if canceled and not list_items:
xbmcgui.Dialog().notification("Suche", "Suche abgebrochen.", xbmcgui.NOTIFICATION_INFO, 2500)
xbmcplugin.endOfDirectory(handle)
return
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
for title in results:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
merged_info = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False))
extra_params = _series_url_params(plugin, title)
for item in list_items:
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **extra_params},
is_folder=not direct_play,
info_labels=merged_info,
art=art,
cast=cast,
str(item["label"]),
str(item["action"]),
dict(item["params"]),
is_folder=bool(item["is_folder"]),
info_labels=item["info_labels"],
art=item["art"],
cast=item["cast"],
)
xbmcplugin.endOfDirectory(handle)
@@ -1058,42 +1125,87 @@ def _show_search_results(query: str) -> None:
xbmcgui.Dialog().notification("Suche", "Keine Plugins gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for plugin_name, plugin in plugins.items():
try:
results = _run_async(plugin.search_titles(query))
except Exception as exc:
_log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
continue
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results:
with _busy_dialog():
list_items: list[dict[str, object]] = []
canceled = False
plugin_entries = list(plugins.items())
total_plugins = max(1, len(plugin_entries))
with _progress_dialog("Suche läuft", "Suche gestartet…") as progress:
for plugin_index, (plugin_name, plugin) in enumerate(plugin_entries, start=1):
range_start = int(((plugin_index - 1) / float(total_plugins)) * 100)
range_end = int((plugin_index / float(total_plugins)) * 100)
canceled = progress(range_start, f"{plugin_name} ({plugin_index}/{total_plugins}) Suche…")
if canceled:
break
try:
results = _run_async(plugin.search_titles(query))
except Exception as exc:
_log(f"Suche fehlgeschlagen ({plugin_name}): {exc}", xbmc.LOGWARNING)
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results:
canceled = progress(
range_start + int((range_end - range_start) * 0.35),
f"{plugin_name} ({plugin_index}/{total_plugins}) Metadaten…",
)
if canceled:
break
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
for title in results:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
merged_info = _apply_playstate_to_info(dict(info_labels), playstate)
label = _label_with_duration(title, info_labels)
label = _label_with_playstate(label, playstate)
label = f"{label} [{plugin_name}]"
direct_play = bool(
plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)
)
extra_params = _series_url_params(plugin, title)
_add_directory_item(
handle,
label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **extra_params},
is_folder=not direct_play,
info_labels=merged_info,
art=art,
cast=cast,
)
total_results = max(1, len(results))
for title_index, title in enumerate(results, start=1):
if title_index == 1 or title_index == total_results or (title_index % 10 == 0):
canceled = progress(
range_start + int((range_end - range_start) * (0.35 + 0.65 * (title_index / float(total_results)))),
f"{plugin_name} ({plugin_index}/{total_plugins}) aufbereiten {title_index}/{total_results}",
)
if canceled:
break
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
merged_info = _apply_playstate_to_info(dict(info_labels), playstate)
label = _label_with_duration(title, info_labels)
label = _label_with_playstate(label, playstate)
label = f"{label} [{plugin_name}]"
direct_play = bool(
plugin_name.casefold() == "einschalten" and _get_setting_bool("einschalten_enable_playback", default=False)
)
extra_params = _series_url_params(plugin, title)
list_items.append(
{
"label": label,
"action": "play_movie" if direct_play else "seasons",
"params": {"plugin": plugin_name, "title": title, **extra_params},
"is_folder": (not direct_play),
"info_labels": merged_info,
"art": art,
"cast": cast,
}
)
if canceled:
break
if not canceled:
progress(100, "Suche abgeschlossen")
if canceled and not list_items:
xbmcgui.Dialog().notification("Suche", "Suche abgebrochen.", xbmcgui.NOTIFICATION_INFO, 2500)
xbmcplugin.endOfDirectory(handle)
return
for item in list_items:
_add_directory_item(
handle,
str(item["label"]),
str(item["action"]),
dict(item["params"]),
is_folder=bool(item["is_folder"]),
info_labels=item["info_labels"],
art=item["art"],
cast=item["cast"],
)
xbmcplugin.endOfDirectory(handle)
@@ -1123,11 +1235,14 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
info_labels: dict[str, object] = {"title": title, "mediatype": "movie"}
info_labels = _apply_playstate_to_info(info_labels, playstate)
display_label = _label_with_playstate(title, playstate)
movie_params = {"plugin": plugin_name, "title": title}
if series_url:
movie_params["series_url"] = series_url
_add_directory_item(
handle,
display_label,
"play_movie",
{"plugin": plugin_name, "title": title},
movie_params,
is_folder=False,
info_labels=info_labels,
)
@@ -1146,11 +1261,16 @@ def _show_seasons(plugin_name: str, title: str, series_url: str = "") -> None:
info_labels: dict[str, object] = {"title": title, "mediatype": "movie"}
info_labels = _apply_playstate_to_info(info_labels, playstate)
display_label = _label_with_playstate(title, playstate)
movie_params = {"plugin": plugin_name, "title": title}
if series_url:
movie_params["series_url"] = series_url
else:
movie_params.update(_series_url_params(plugin, title))
_add_directory_item(
handle,
display_label,
"play_movie",
{"plugin": plugin_name, "title": title},
movie_params,
is_folder=False,
info_labels=info_labels,
)
@@ -2499,6 +2619,15 @@ def run() -> None:
elif action == "play_movie":
plugin_name = params.get("plugin", "")
title = params.get("title", "")
series_url = params.get("series_url", "")
if series_url:
plugin = _discover_plugins().get(plugin_name)
remember_series_url = getattr(plugin, "remember_series_url", None) if plugin is not None else None
if callable(remember_series_url):
try:
remember_series_url(title, series_url)
except Exception:
pass
# Einschalten liefert Filme (keine Staffeln/Episoden). Für Playback nutzen wir:
# <Titel> -> Stream -> <Titel>.
if (plugin_name or "").casefold() == "einschalten":

View File

@@ -0,0 +1,668 @@
"""Filmpalast Integration (movie-style provider).
Hinweis:
- Der Parser ist bewusst defensiv und arbeitet mit mehreren Fallback-Selektoren,
da Filmpalast-Layouts je Domain variieren koennen.
"""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "filmpalast_base_url"
DEFAULT_BASE_URL = "https://filmpalast.to"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
SERIES_HINT_PREFIX = "series://filmpalast/"
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_filmpalast"
SETTING_DUMP_HTML = "dump_html_filmpalast"
SETTING_SHOW_URL_INFO = "show_url_info_filmpalast"
SETTING_LOG_ERRORS = "log_errors_filmpalast"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
@dataclass(frozen=True)
class EpisodeEntry:
season: int
episode: int
suffix: str
url: str
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _is_probably_content_url(url: str) -> bool:
lower = (url or "").casefold()
if not lower:
return False
block_markers = (
"/genre/",
"/kategorie/",
"/category/",
"/tag/",
"/login",
"/register",
"/kontakt",
"/impressum",
"/datenschutz",
"/dmca",
"/agb",
"javascript:",
"#",
)
if any(marker in lower for marker in block_markers):
return False
allow_markers = ("/stream/", "/film/", "/movie/", "/serien/", "/serie/", "/title/")
return any(marker in lower for marker in allow_markers)
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="filmpalast_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Filmpalast",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="filmpalast_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="filmpalast_errors.log",
message=message,
)
def _is_series_hint_url(value: str) -> bool:
return (value or "").startswith(SERIES_HINT_PREFIX)
def _series_hint_value(title: str) -> str:
safe_title = quote((title or "").strip(), safe="")
return f"{SERIES_HINT_PREFIX}{safe_title}" if safe_title else SERIES_HINT_PREFIX
def _extract_number(value: str) -> Optional[int]:
match = re.search(r"(\d+)", value or "")
if not match:
return None
try:
return int(match.group(1))
except Exception:
return None
def _strip_series_alias(title: str) -> str:
return re.sub(r"\s*\(serie\)\s*$", "", title or "", flags=re.IGNORECASE).strip()
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("filmpalast", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class FilmpalastPlugin(BasisPlugin):
name = "Filmpalast"
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
self._hoster_cache: Dict[str, Dict[str, str]] = {}
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"FilmpalastPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
def _lookup_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._title_to_url.get(title)
if direct:
return direct
wanted = title.casefold()
for key, value in self._title_to_url.items():
if key.casefold() == wanted and value:
return value
return ""
def _series_key_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
if title in self._series_entries:
return title
wanted = title.casefold()
for key in self._series_entries.keys():
if key.casefold() == wanted:
return key
return ""
def _has_series_entries(self, title: str) -> bool:
return bool(self._series_key_for_title(title))
def _episode_entry_from_hit(self, hit: SearchHit) -> Optional[Tuple[str, EpisodeEntry]]:
title = (hit.title or "").strip()
if not title:
return None
marker = SEASON_EPISODE_RE.search(title)
if not marker:
return None
try:
season_number = int(marker.group(1))
episode_number = int(marker.group(2))
except Exception:
return None
series_title = re.sub(r"\s+", " ", title[: marker.start()] or "").strip(" -|:;,_")
if not series_title:
return None
suffix = re.sub(r"\s+", " ", title[marker.end() :] or "").strip(" -|:;,_")
entry = EpisodeEntry(season=season_number, episode=episode_number, suffix=suffix, url=hit.url)
return (series_title, entry)
def _add_series_entry(self, series_title: str, entry: EpisodeEntry) -> None:
if not series_title or not entry.url:
return
seasons = self._series_entries.setdefault(series_title, {})
episodes = seasons.setdefault(entry.season, {})
if entry.episode not in episodes:
episodes[entry.episode] = entry
def _ensure_series_entries_for_title(self, title: str) -> str:
series_key = self._series_key_for_title(title)
if series_key:
return series_key
original_title = (title or "").strip()
lookup_title = _strip_series_alias(original_title)
if not lookup_title:
return ""
if not self._requests_available:
return ""
wanted = _normalize_search_text(lookup_title)
hits = self._search_hits(lookup_title)
for hit in hits:
parsed = self._episode_entry_from_hit(hit)
if not parsed:
continue
series_title, entry = parsed
if wanted and _normalize_search_text(series_title) != wanted:
continue
self._add_series_entry(series_title, entry)
self._title_to_url.setdefault(series_title, _series_hint_value(series_title))
resolved = self._series_key_for_title(original_title) or self._series_key_for_title(lookup_title)
if resolved and original_title and original_title != resolved:
self._series_entries[original_title] = self._series_entries[resolved]
self._title_to_url.setdefault(original_title, _series_hint_value(resolved))
return original_title
return resolved
def _detail_url_for_selection(self, title: str, season: str, episode: str) -> str:
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
season_number = _extract_number(season)
episode_number = _extract_number(episode)
if season_number is None or episode_number is None:
return ""
entry = self._series_entries.get(series_key, {}).get(season_number, {}).get(episode_number)
return entry.url if entry else ""
return self._ensure_title_url(title)
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query:
return []
if not self._requests_available or requests is None:
return []
session = get_requests_session("filmpalast", headers=HEADERS)
search_requests = [(_absolute_url(f"/search/title/{quote(query)}"), None)]
hits: List[SearchHit] = []
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for base_url, params in search_requests:
try:
request_url = base_url if not params else f"{base_url}?{urlencode(params)}"
_log_url_event(request_url, kind="GET")
_log_visit(request_url)
response = session.get(base_url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
if response.url and response.url != request_url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(request_url, response.text)
soup = BeautifulSoup(response.text, "html.parser")
except Exception as exc:
_log_error_message(f"search request failed ({base_url}): {exc}")
continue
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
title = (title or "").strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
if hits:
break
return hits
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
self._title_to_url = {}
self._series_entries = {}
self._hoster_cache.clear()
movie_titles: List[str] = []
series_titles_seen: set[str] = set()
for hit in hits:
parsed = self._episode_entry_from_hit(hit)
if parsed:
series_title, entry = parsed
self._add_series_entry(series_title, entry)
if series_title.casefold() not in series_titles_seen:
self._title_to_url[series_title] = _series_hint_value(series_title)
series_titles_seen.add(series_title.casefold())
continue
title = (hit.title or "").strip()
if not title:
continue
movie_titles.append(title)
self._title_to_url[title] = hit.url
titles: List[str] = list(movie_titles)
movie_keys = {entry.casefold() for entry in movie_titles}
for series_title in sorted(self._series_entries.keys(), key=lambda value: value.casefold()):
if series_title.casefold() in movie_keys:
alias = f"{series_title} (Serie)"
self._title_to_url[alias] = self._title_to_url.get(series_title, _series_hint_value(series_title))
self._series_entries[alias] = self._series_entries[series_title]
titles.append(alias)
else:
titles.append(series_title)
titles.sort(key=lambda value: value.casefold())
return titles
def _ensure_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._lookup_title_url(title)
if direct and _is_series_hint_url(direct):
return ""
if direct:
self._title_to_url[title] = direct
return direct
if self._has_series_entries(title) or self._ensure_series_entries_for_title(title):
self._title_to_url[title] = _series_hint_value(title)
return ""
wanted = title.casefold()
hits = self._search_hits(title)
for hit in hits:
if self._episode_entry_from_hit(hit):
continue
if hit.title.casefold() == wanted and hit.url:
self._title_to_url[title] = hit.url
return hit.url
return ""
def remember_series_url(self, title: str, series_url: str) -> None:
title = (title or "").strip()
series_url = (series_url or "").strip()
if not title or not series_url:
return
self._title_to_url[title] = series_url
self._hoster_cache.clear()
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._lookup_title_url(title)
if direct:
return direct
series_key = self._series_key_for_title(title)
if series_key:
return _series_hint_value(series_key)
return ""
def is_movie(self, title: str) -> bool:
title = (title or "").strip()
if not title:
return False
direct = self._lookup_title_url(title)
if direct:
return not _is_series_hint_url(direct)
if SEASON_EPISODE_RE.search(title):
return False
if self._has_series_entries(title):
return False
if self._ensure_series_entries_for_title(title):
return False
return True
@staticmethod
def _normalize_hoster_name(name: str) -> str:
name = (name or "").strip()
if not name:
return ""
name = re.sub(r"\s+", " ", name)
return name
def _extract_hoster_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
hosters: Dict[str, str] = {}
if not soup:
return hosters
# Primäres Layout: jeder Hoster in eigener UL mit hostName + Play-Link.
for block in soup.select("ul.currentStreamLinks"):
host_name_node = block.select_one("li.hostBg .hostName")
host_name = self._normalize_hoster_name(host_name_node.get_text(" ", strip=True) if host_name_node else "")
play_anchor = block.select_one("li.streamPlayBtn a[href], a.button.iconPlay[href]")
href = (play_anchor.get("href") if play_anchor else "") or ""
play_url = _absolute_url(href).strip()
if not play_url:
continue
if not host_name:
host_name = self._normalize_hoster_name(play_anchor.get_text(" ", strip=True) if play_anchor else "")
if not host_name:
host_name = "Unbekannt"
if host_name not in hosters:
hosters[host_name] = play_url
# Fallback: direkte Play-Buttons im Stream-Bereich.
if not hosters:
for anchor in soup.select("#grap-stream-list a.button.iconPlay[href], .streamLinksWrapper a.button.iconPlay[href]"):
href = (anchor.get("href") or "").strip()
play_url = _absolute_url(href).strip()
if not play_url:
continue
text_name = self._normalize_hoster_name(anchor.get_text(" ", strip=True))
host_name = text_name if text_name and text_name.casefold() not in {"play", "details play"} else "Unbekannt"
if host_name in hosters:
host_name = f"{host_name} #{len(hosters) + 1}"
hosters[host_name] = play_url
return hosters
def _hosters_for_detail_url(self, detail_url: str) -> Dict[str, str]:
detail_url = (detail_url or "").strip()
if not detail_url:
return {}
cached = self._hoster_cache.get(detail_url)
if cached is not None:
return dict(cached)
if not self._requests_available:
return {}
try:
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return {}
hosters = self._extract_hoster_links(soup)
for url in hosters.values():
_log_url_event(url, kind="PARSE")
self._hoster_cache[detail_url] = dict(hosters)
return dict(hosters)
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
seasons = sorted(self._series_entries.get(series_key, {}).keys())
return [f"Staffel {number}" for number in seasons]
detail_url = self._ensure_title_url(title)
return ["Film"] if detail_url else []
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
season_number = _extract_number(season)
if season_number is None:
return []
episodes = self._series_entries.get(series_key, {}).get(season_number, {})
labels: List[str] = []
for episode_number in sorted(episodes.keys()):
entry = episodes[episode_number]
label = f"Episode {episode_number}"
if entry.suffix:
label = f"{label} - {entry.suffix}"
labels.append(label)
return labels
return ["Stream"] if self._ensure_title_url(title) else []
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
if not detail_url:
return None
hosters = self._hosters_for_detail_url(detail_url)
if hosters:
for preferred in self._preferred_hosters:
preferred_key = (preferred or "").strip().casefold()
if not preferred_key:
continue
for host_name, host_url in hosters.items():
if preferred_key in host_name.casefold() or preferred_key in host_url.casefold():
_log_url_event(host_url, kind="FOUND")
return host_url
first = next(iter(hosters.values()))
_log_url_event(first, kind="FOUND")
return first
if not self._requests_available:
return detail_url
try:
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return detail_url
candidates: List[str] = []
for iframe in soup.select("iframe[src]"):
src = (iframe.get("src") or "").strip()
if src:
candidates.append(_absolute_url(src))
for anchor in soup.select("a[href]"):
href = (anchor.get("href") or "").strip()
if not href:
continue
lower = href.casefold()
if "watch" in lower or "stream" in lower or "player" in lower:
candidates.append(_absolute_url(href))
deduped: List[str] = []
seen: set[str] = set()
for candidate in candidates:
key = candidate.casefold()
if key in seen:
continue
seen.add(key)
deduped.append(candidate)
if deduped:
_log_url_event(deduped[0], kind="FOUND")
return deduped[0]
return detail_url
def set_preferred_hosters(self, hosters: List[str]) -> None:
normalized = [str(hoster).strip().lower() for hoster in hosters if str(hoster).strip()]
if normalized:
self._preferred_hosters = normalized
def reset_preferred_hosters(self) -> None:
self._preferred_hosters = list(self._default_preferred_hosters)
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None
resolved = link
if self._requests_available:
try:
session = get_requests_session("filmpalast", headers=HEADERS)
response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
resolved = (response.url or link).strip() or link
except Exception:
resolved = link
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(resolved)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
if resolved:
_log_url_event(resolved, kind="FINAL")
return resolved
return None

View File

@@ -24,6 +24,10 @@
<setting id="dump_html_einschalten" type="bool" label="Einschalten: HTML-Dumps" default="false" />
<setting id="show_url_info_einschalten" type="bool" label="Einschalten: URL-Info anzeigen" default="false" />
<setting id="log_errors_einschalten" type="bool" label="Einschalten: Fehler loggen" default="false" />
<setting id="log_urls_filmpalast" type="bool" label="Filmpalast: URL-Logging" default="false" />
<setting id="dump_html_filmpalast" type="bool" label="Filmpalast: HTML-Dumps" default="false" />
<setting id="show_url_info_filmpalast" type="bool" label="Filmpalast: URL-Info anzeigen" default="false" />
<setting id="log_errors_filmpalast" type="bool" label="Filmpalast: Fehler loggen" default="false" />
</category>
<category label="TopStream">
<setting id="topstream_base_url" type="text" label="Domain (BASE_URL)" default="https://topstreamfilm.live" />
@@ -38,6 +42,9 @@
<category label="Einschalten">
<setting id="einschalten_base_url" type="text" label="Domain (BASE_URL)" default="https://einschalten.in" />
</category>
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />

View File

@@ -13,6 +13,23 @@ Jedes Plugin muss diese Methoden implementieren:
- `seasons_for(title: str) -> list[str]`
- `episodes_for(title: str, season: str) -> list[str]`
## Vertrag Plugin ↔ Hauptlogik (`default.py`)
Die Hauptlogik ruft Plugin-Methoden auf und verarbeitet ausschließlich deren Rückgaben.
Wesentliche Rückgaben an die Hauptlogik:
- `search_titles(...)` → Liste von Titel-Strings für die Trefferliste
- `seasons_for(...)` → Liste von Staffel-Labels
- `episodes_for(...)` → Liste von Episoden-Labels
- `stream_link_for(...)` → Hoster-/Player-Link (nicht zwingend finale Media-URL)
- `resolve_stream_link(...)` → finale/spielbare URL nach Redirect/Resolver
- Optional `available_hosters_for(...)` → auswählbare Hoster-Namen im Dialog
- Optional `series_url_for_title(...)` → stabile Detail-URL pro Titel für Folgeaufrufe
- Optional `remember_series_url(...)` → Übernahme einer bereits bekannten Detail-URL
Standard für Film-Provider (ohne echte Staffeln):
- `seasons_for(title)` gibt `["Film"]` zurück
- `episodes_for(title, "Film")` gibt `["Stream"]` zurück
## Optionale Features (Capabilities)
Über `capabilities()` kann das Plugin zusätzliche Funktionen anbieten:
- `popular_series``popular_series()`
@@ -27,7 +44,8 @@ Jedes Plugin muss diese Methoden implementieren:
## Suche (aktuelle Policy)
- **Nur TitelMatches**
- **SubstringMatch** nach Normalisierung (Lowercase + NichtAlnum → Leerzeichen)
- **Wortbasierter Match** nach Normalisierung (Lowercase + NichtAlnum → Leerzeichen)
- Keine Teilwort-Treffer innerhalb eines Wortes (Beispiel: `hund` matcht nicht `thunder`)
- Keine Beschreibung/Plot/Meta für Matches
## Namensgebung
@@ -41,10 +59,23 @@ Standard: `*_base_url` (Domain / BASE_URL)
- `aniworld_base_url`
- `einschalten_base_url`
- `topstream_base_url`
- `filmpalast_base_url`
## Playback
- Wenn möglich `stream_link_for(...)` implementieren.
- Optional `available_hosters_for(...)`/`resolve_stream_link(...)` für HosterAuflösung.
- `stream_link_for(...)` implementieren (liefert bevorzugten Hoster-Link).
- `available_hosters_for(...)` bereitstellen, wenn die Seite mehrere Hoster anbietet.
- `resolve_stream_link(...)` nach einheitlichem Flow umsetzen:
1. Redirects auflösen (falls vorhanden)
2. ResolveURL (`resolveurl_backend.resolve`) versuchen
3. Bei Fehlschlag auf den besten verfügbaren Link zurückfallen
- Optional `set_preferred_hosters(...)` unterstützen, damit die Hoster-Auswahl aus der Hauptlogik direkt greift.
## StandardFlow (empfohlen)
1. **Suche**: nur Titel liefern und Titel→Detail-URL mappen.
2. **Navigation**: `series_url_for_title`/`remember_series_url` unterstützen, damit URLs zwischen Aufrufen stabil bleiben.
3. **Auswahl Hoster**: Hoster-Namen aus der Detailseite extrahieren und anbieten.
4. **Playback**: Hoster-Link liefern, danach konsistent über `resolve_stream_link` finalisieren.
5. **Fallbacks**: bei Layout-Unterschieden defensiv parsen und Logging aktivierbar halten.
## Debugging
Global gesteuert über Settings:
@@ -67,7 +98,10 @@ Plugins sollten die Helper aus `addon/plugin_helpers.py` nutzen:
## BeispielCheckliste
- [ ] `name` korrekt gesetzt
- [ ] `*_base_url` in Settings vorhanden
- [ ] Suche matcht nur Titel
- [ ] Suche matcht nur Titel und wortbasiert
- [ ] `stream_link_for` + `resolve_stream_link` folgen dem Standard-Flow
- [ ] Optional: `available_hosters_for` + `set_preferred_hosters` vorhanden
- [ ] Optional: `series_url_for_title` + `remember_series_url` vorhanden
- [ ] Fehlerbehandlung und Timeouts vorhanden
- [ ] Optional: Caches für Performance

View File

@@ -16,6 +16,7 @@ Weitere Details:
- `topstreamfilm_plugin.py` Topstreamfilm
- `einschalten_plugin.py` Einschalten
- `aniworld_plugin.py` Aniworld
- `filmpalast_plugin.py` Filmpalast
- `_template_plugin.py` Vorlage für neue Plugins
### Plugin-Discovery (Ladeprozess)

20
pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[tool.pytest.ini_options]
addopts = "-q --cov=addon --cov-report=term-missing"
python_files = ["test_*.py"]
norecursedirs = ["scripts"]
markers = [
"live: real HTTP requests (set LIVE_TESTS=1 to run)",
"perf: performance benchmarks",
]
[tool.coverage.run]
source = ["addon"]
branch = true
omit = [
"*/__pycache__/*",
"addon/resources/*",
]
[tool.coverage.report]
show_missing = true
skip_empty = true

2
requirements-dev.txt Normal file
View File

@@ -0,0 +1,2 @@
pytest>=9,<10
pytest-cov>=5,<8