"""Einschalten Plugin.
Optionales Debugging wie bei Serienstream:
- URL-Logging
- HTML-Dumps
- On-Screen URL-Info
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
from urllib.parse import urlencode, urljoin, urlsplit
try: # pragma: no cover - optional dependency (Kodi dependency)
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url
# Kodi add-on id handed to every settings, logging and notification helper call below.
ADDON_ID = "plugin.video.viewit"
# Setting id that holds the user-configurable base URL of the site.
SETTING_BASE_URL = "einschalten_base_url"
# Setting ids forwarded to the plugin_helpers functions as `enabled_setting_id`.
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Setting ids forwarded to the plugin_helpers functions as `plugin_setting_id`.
SETTING_LOG_URLS = "log_urls_einschalten"
SETTING_DUMP_HTML = "dump_html_einschalten"
SETTING_SHOW_URL_INFO = "show_url_info_einschalten"
SETTING_LOG_ERRORS = "log_errors_einschalten"
# Base URL used when no setting value can be read, and the site paths joined onto it.
DEFAULT_BASE_URL = "https://einschalten.in"
DEFAULT_INDEX_PATH = "/"
DEFAULT_NEW_TITLES_PATH = "/movies/new"
DEFAULT_SEARCH_PATH = "/search"
DEFAULT_GENRES_PATH = "/genres"
# `{id}` is filled with the numeric movie id via str.format().
DEFAULT_WATCH_PATH_TEMPLATE = "/api/movies/{id}/watch"
# Request headers sent with every HTTP GET issued by the plugin.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/json;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
# Optional progress hook; it is invoked as callback(message, percent_or_None).
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None:
if not callable(callback):
return
try:
callback(str(message or ""), None if percent is None else int(percent))
except Exception:
return
@dataclass(frozen=True)
class MovieItem:
    """Immutable list entry for one movie, as built by the ng-state list parsers."""

    # Numeric movie id (JSON key ``id``).
    id: int
    # Display title; the parsers in this module strip it and skip empty titles.
    title: str
    # Raw ``releaseDate`` string from the payload; empty when absent.
    release_date: str = ""
    # Raw ``posterPath`` string from the payload; empty when absent.
    poster_path: str = ""
    # ``voteAverage`` converted to float; None when missing or not convertible.
    vote_average: float | None = None
    # ``collectionId`` converted to int; None when missing or not convertible.
    collection_id: int | None = None
@dataclass(frozen=True)
class MovieDetail:
    """Immutable detail record for one movie, as built by ``_parse_ng_state_movie_detail``."""

    # Numeric movie id the detail was requested for.
    id: int
    # Stripped, non-empty title.
    title: str
    tagline: str = ""
    overview: str = ""
    # Raw ``releaseDate`` string from the payload.
    release_date: str = ""
    # Payload key ``runtime`` converted to int; None when missing or invalid.
    runtime_minutes: int | None = None
    # Raw ``posterPath`` / ``backdropPath`` strings from the payload.
    poster_path: str = ""
    backdrop_path: str = ""
    vote_average: float | None = None
    vote_count: int | None = None
    homepage: str = ""
    imdb_id: str = ""
    wikidata_id: str = ""
    # Genre names; None when the payload carried no ``genres`` list at all.
    genres: List[str] | None = None
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
    """Tell whether the normalised *query* occurs in *title* on word boundaries.

    Both strings go through ``_normalize_search_text``. The comparison pads
    both sides with spaces so that the query only matches whole words (or a
    sequence of whole words). An empty normalised query never matches.
    """
    needle = _normalize_search_text(query)
    if needle:
        padded_title = " " + _normalize_search_text(title) + " "
        return (" " + needle + " ") in padded_title
    return False
def _filter_movies_by_title(query: str, movies: List[MovieItem]) -> List[MovieItem]:
query = (query or "").strip()
if not query:
return []
return [movie for movie in movies if _matches_query(query, title=movie.title)]
def _get_setting_text(setting_id: str, *, default: str = "") -> str:
    """Read a string setting of this add-on.

    Args:
        setting_id: Id of the setting to read.
        default: Value returned when ``xbmcaddon`` is unavailable or the
            lookup raises.

    Returns:
        The stripped setting value. Note that a stored empty value is
        returned as ``""``; *default* is only used on the two failure paths.
    """
    if xbmcaddon is not None:
        try:
            addon = xbmcaddon.Addon(ADDON_ID)
            # Prefer getSettingString when the addon object offers it.
            reader = getattr(addon, "getSettingString", None)
            if reader is None:
                reader = addon.getSetting
            return str(reader(setting_id) or "").strip()
        except Exception:
            pass
    return default
def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Read a boolean setting of this add-on via ``plugin_helpers.get_setting_bool``."""
    flag = get_setting_bool(ADDON_ID, setting_id, default=default)
    return flag
def _ensure_requests() -> None:
    """Raise ``RuntimeError`` when the optional ``requests`` import failed.

    The message includes the import error captured at module load time.
    """
    if requests is not None:
        return
    raise RuntimeError(f"requests ist nicht verfuegbar: {REQUESTS_IMPORT_ERROR}")
def _extract_ng_state_payload(html: str) -> Dict[str, Any]:
"""Extrahiert JSON aus ``."""
html = html or ""
# Regex ist hier ausreichend und vermeidet bs4-Abhängigkeit.
match = re.search(
r'',
html,
flags=re.IGNORECASE | re.DOTALL,
)
if not match:
return {}
raw = (match.group(1) or "").strip()
if not raw:
return {}
try:
data = json.loads(raw)
except Exception:
return {}
return data if isinstance(data, dict) else {}
def _notify_url(url: str) -> None:
    """Forward *url* to ``plugin_helpers.notify_url`` under the heading "Einschalten".

    The global and the plugin-specific "show URL info" setting ids are passed
    along; presumably the helper only notifies when one of them is enabled
    (confirm in plugin_helpers).
    """
    options = {
        "heading": "Einschalten",
        "url": url,
        "enabled_setting_id": GLOBAL_SETTING_SHOW_URL_INFO,
        "plugin_setting_id": SETTING_SHOW_URL_INFO,
    }
    notify_url(ADDON_ID, **options)
def _log_url(url: str, *, kind: str = "VISIT") -> None:
    """Forward *url* to ``plugin_helpers.log_url`` targeting ``einschalten_urls.log``.

    Args:
        url: The URL to record.
        kind: Tag passed through to the helper (callers here use "GET"/"OK").
    """
    setting_ids = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_URLS,
        "plugin_setting_id": SETTING_LOG_URLS,
    }
    log_url(ADDON_ID, log_filename="einschalten_urls.log", url=url, kind=kind, **setting_ids)
def _log_debug_line(message: str) -> None:
    """Send a free-form debug *message* to ``einschalten_debug.log`` (kind "DEBUG").

    The URL-logging helper and its setting ids are reused; the message travels
    in the ``url`` argument. Any exception from the helper is swallowed.
    """
    arguments = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_URLS,
        "plugin_setting_id": SETTING_LOG_URLS,
        "log_filename": "einschalten_debug.log",
        "url": message,
        "kind": "DEBUG",
    }
    try:
        log_url(ADDON_ID, **arguments)
    except Exception:
        # Debug logging must never interrupt the caller.
        pass
def _log_titles(items: list[MovieItem], *, context: str) -> None:
if not items:
return
try:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="einschalten_titles.log",
url=f"{context}:count={len(items)}",
kind="TITLE",
)
for item in items:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="einschalten_titles.log",
url=f"{context}:id={item.id} title={item.title}",
kind="TITLE",
)
except Exception:
pass
def _log_response_html(url: str, body: str) -> None:
    """Forward a response *body* for *url* to ``plugin_helpers.dump_response_html``.

    The dump uses the filename prefix ``einschalten_response`` and the global
    plus plugin-specific HTML-dump setting ids.
    """
    dump_options = {
        "enabled_setting_id": GLOBAL_SETTING_DUMP_HTML,
        "plugin_setting_id": SETTING_DUMP_HTML,
        "filename_prefix": "einschalten_response",
    }
    dump_response_html(ADDON_ID, url=url, body=body, **dump_options)
def _log_error(message: str) -> None:
    """Forward *message* to ``plugin_helpers.log_error`` targeting ``einschalten_errors.log``."""
    error_options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_ERRORS,
        "plugin_setting_id": SETTING_LOG_ERRORS,
        "log_filename": "einschalten_errors.log",
    }
    log_error(ADDON_ID, message=message, **error_options)
def _u_matches(value: Any, expected_path: str) -> bool:
raw = (value or "").strip()
if not raw:
return False
if raw == expected_path:
return True
try:
if "://" in raw:
path = urlsplit(raw).path or ""
else:
path = raw.split("?", 1)[0].split("#", 1)[0]
if path == expected_path:
return True
except Exception:
pass
return raw.endswith(expected_path)
def _parse_ng_state_movies(payload: Dict[str, Any]) -> List[MovieItem]:
movies: List[MovieItem] = []
for value in (payload or {}).values():
if not isinstance(value, dict):
continue
# In ng-state payload, `u` (URL) is a sibling of `b` (body), not nested inside `b`.
if not _u_matches(value.get("u"), "/api/movies"):
continue
block = value.get("b")
if not isinstance(block, dict):
continue
data = block.get("data")
if not isinstance(data, list):
continue
for item in data:
if not isinstance(item, dict):
continue
try:
movie_id = int(item.get("id"))
except Exception:
continue
title = str(item.get("title") or "").strip()
if not title:
continue
vote_average = item.get("voteAverage")
try:
vote_average_f = float(vote_average) if vote_average is not None else None
except Exception:
vote_average_f = None
collection_id = item.get("collectionId")
try:
collection_id_i = int(collection_id) if collection_id is not None else None
except Exception:
collection_id_i = None
movies.append(
MovieItem(
id=movie_id,
title=title,
release_date=str(item.get("releaseDate") or ""),
poster_path=str(item.get("posterPath") or ""),
vote_average=vote_average_f,
collection_id=collection_id_i,
)
)
return movies
def _parse_ng_state_movies_with_pagination(payload: Dict[str, Any]) -> tuple[List[MovieItem], bool | None, int | None]:
"""Parses ng-state for `u: "/api/movies"` where `b` contains `{data:[...], pagination:{...}}`.
Returns: (movies, has_more, current_page)
"""
movies: List[MovieItem] = []
has_more: bool | None = None
current_page: int | None = None
for value in (payload or {}).values():
if not isinstance(value, dict):
continue
if not _u_matches(value.get("u"), "/api/movies"):
continue
block = value.get("b")
if not isinstance(block, dict):
continue
pagination = block.get("pagination")
if isinstance(pagination, dict):
if "hasMore" in pagination:
has_more = bool(pagination.get("hasMore") is True)
try:
current_page = int(pagination.get("currentPage")) if pagination.get("currentPage") is not None else None
except Exception:
current_page = None
data = block.get("data")
if not isinstance(data, list):
continue
for item in data:
if not isinstance(item, dict):
continue
try:
movie_id = int(item.get("id"))
except Exception:
continue
title = str(item.get("title") or "").strip()
if not title:
continue
vote_average = item.get("voteAverage")
try:
vote_average_f = float(vote_average) if vote_average is not None else None
except Exception:
vote_average_f = None
collection_id = item.get("collectionId")
try:
collection_id_i = int(collection_id) if collection_id is not None else None
except Exception:
collection_id_i = None
movies.append(
MovieItem(
id=movie_id,
title=title,
release_date=str(item.get("releaseDate") or ""),
poster_path=str(item.get("posterPath") or ""),
vote_average=vote_average_f,
collection_id=collection_id_i,
)
)
# Stop after first matching block (genre pages should only have one).
break
return movies, has_more, current_page
def _parse_ng_state_search_results(payload: Dict[str, Any]) -> List[MovieItem]:
movies: List[MovieItem] = []
for value in (payload or {}).values():
if not isinstance(value, dict):
continue
if not _u_matches(value.get("u"), "/api/search"):
continue
block = value.get("b")
if not isinstance(block, dict):
continue
data = block.get("data")
if not isinstance(data, list):
continue
for item in data:
if not isinstance(item, dict):
continue
try:
movie_id = int(item.get("id"))
except Exception:
continue
title = str(item.get("title") or "").strip()
if not title:
continue
vote_average = item.get("voteAverage")
try:
vote_average_f = float(vote_average) if vote_average is not None else None
except Exception:
vote_average_f = None
collection_id = item.get("collectionId")
try:
collection_id_i = int(collection_id) if collection_id is not None else None
except Exception:
collection_id_i = None
movies.append(
MovieItem(
id=movie_id,
title=title,
release_date=str(item.get("releaseDate") or ""),
poster_path=str(item.get("posterPath") or ""),
vote_average=vote_average_f,
collection_id=collection_id_i,
)
)
return movies
def _parse_ng_state_movie_detail(payload: Dict[str, Any], *, movie_id: int) -> MovieDetail | None:
movie_id = int(movie_id or 0)
if movie_id <= 0:
return None
expected_u = f"/api/movies/{movie_id}"
for value in (payload or {}).values():
if not isinstance(value, dict):
continue
if not _u_matches(value.get("u"), expected_u):
continue
block = value.get("b")
if not isinstance(block, dict):
continue
try:
parsed_id = int(block.get("id"))
except Exception:
continue
if parsed_id != movie_id:
continue
title = str(block.get("title") or "").strip()
if not title:
continue
runtime = block.get("runtime")
try:
runtime_i = int(runtime) if runtime is not None else None
except Exception:
runtime_i = None
vote_average = block.get("voteAverage")
try:
vote_average_f = float(vote_average) if vote_average is not None else None
except Exception:
vote_average_f = None
vote_count = block.get("voteCount")
try:
vote_count_i = int(vote_count) if vote_count is not None else None
except Exception:
vote_count_i = None
genres_raw = block.get("genres")
genres: List[str] | None = None
if isinstance(genres_raw, list):
names: List[str] = []
for g in genres_raw:
if isinstance(g, dict):
name = str(g.get("name") or "").strip()
if name:
names.append(name)
genres = names
return MovieDetail(
id=movie_id,
title=title,
tagline=str(block.get("tagline") or "").strip(),
overview=str(block.get("overview") or "").strip(),
release_date=str(block.get("releaseDate") or "").strip(),
runtime_minutes=runtime_i,
poster_path=str(block.get("posterPath") or "").strip(),
backdrop_path=str(block.get("backdropPath") or "").strip(),
vote_average=vote_average_f,
vote_count=vote_count_i,
homepage=str(block.get("homepage") or "").strip(),
imdb_id=str(block.get("imdbId") or "").strip(),
wikidata_id=str(block.get("wikidataId") or "").strip(),
genres=genres,
)
return None
def _parse_ng_state_genres(payload: Dict[str, Any]) -> Dict[str, int]:
"""Parses ng-state for `u: "/api/genres"` where `b` is a list of {id,name}."""
genres: Dict[str, int] = {}
for value in (payload or {}).values():
if not isinstance(value, dict):
continue
if not _u_matches(value.get("u"), "/api/genres"):
continue
block = value.get("b")
if not isinstance(block, list):
continue
for item in block:
if not isinstance(item, dict):
continue
name = str(item.get("name") or "").strip()
if not name:
continue
try:
gid = int(item.get("id"))
except Exception:
continue
if gid > 0:
genres[name] = gid
return genres
class EinschaltenPlugin(BasisPlugin):
    """Metadata plugin for an authorised source.

    Listings are read from the JSON state ("ng-state") embedded in the site's
    server-rendered HTML pages; the genre index and the watch endpoint are
    additionally fetched as plain JSON. Lookups are cached in memory on the
    instance. All network helpers are best-effort: failures are logged through
    ``_log_error`` and reported to the caller as an empty result.
    """

    name = "Einschalten"
    version = "1.0.0"

    def __init__(self) -> None:
        """Set the availability flags and create the empty in-memory caches."""
        self.is_available = REQUESTS_AVAILABLE
        self.unavailable_reason = None if REQUESTS_AVAILABLE else f"requests fehlt: {REQUESTS_IMPORT_ERROR}"
        self._session = None  # created lazily by _get_session()
        self._id_by_title: Dict[str, int] = {}
        self._detail_html_by_id: Dict[int, str] = {}
        self._detail_by_id: Dict[int, MovieDetail] = {}
        self._genre_id_by_name: Dict[str, int] = {}
        # Keyed by (genre_id, page) and by page respectively.
        self._genre_has_more_by_id_page: Dict[tuple[int, int], bool] = {}
        self._new_titles_has_more_by_page: Dict[int, bool] = {}

    # ------------------------------------------------------------------ HTTP

    def _get_session(self):
        """Return the shared ``requests.Session``, creating it on first use.

        Raises:
            RuntimeError: if ``requests`` could not be imported.
        """
        _ensure_requests()
        if self._session is None:
            self._session = requests.Session()
        return self._session

    def _http_get_text(self, url: str, *, timeout: int = 20) -> tuple[str, str]:
        """GET *url* and return ``(final_url, body_text)``.

        The request and the successful response are passed to the URL/HTML
        debug helpers. HTTP error statuses raise via ``raise_for_status``;
        the response is always closed.
        """
        _log_url(url, kind="GET")
        _notify_url(url)
        response = self._get_session().get(url, headers=HEADERS, timeout=timeout)
        try:
            response.raise_for_status()
            final_url = response.url or url
            body = response.text or ""
            _log_url(final_url, kind="OK")
            _log_response_html(final_url, body)
            return final_url, body
        finally:
            try:
                response.close()
            except Exception:
                # Closing is cleanup only; never mask the real outcome.
                pass

    def _http_get_json(self, url: str, *, timeout: int = 20) -> tuple[str, Any]:
        """GET *url* and return ``(final_url, decoded_json)``.

        A body that is not valid JSON yields ``{}``. The decoded value may be
        any JSON type (dict, list, scalar); callers must check the shape.
        """
        final_url, body = self._http_get_text(url, timeout=timeout)
        try:
            payload = json.loads(body or "{}")
        except ValueError:
            payload = {}
        return final_url, payload

    # ------------------------------------------------------------------ URLs

    def _get_base_url(self) -> str:
        """Return the configured base URL without a trailing slash (may be empty)."""
        base = _get_setting_text(SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
        return base.rstrip("/")

    def _build_url(self, path: str) -> str:
        """Join *path* onto the base URL; return "" when no base URL is configured."""
        base = self._get_base_url()
        if not base:
            return ""
        return urljoin(base + "/", path.lstrip("/"))

    def _index_url(self) -> str:
        """URL of the index page."""
        return self._build_url(DEFAULT_INDEX_PATH)

    def _new_titles_url(self) -> str:
        """URL of the first "new titles" page."""
        return self._build_url(DEFAULT_NEW_TITLES_PATH)

    def _genres_url(self) -> str:
        """URL of the HTML genre overview page."""
        return self._build_url(DEFAULT_GENRES_PATH)

    def _api_genres_url(self) -> str:
        """URL of the JSON genre list endpoint."""
        return self._build_url("api/genres")

    def _search_url(self, query: str) -> str:
        """URL of the search page for *query* (sent as the ``query`` parameter)."""
        url = self._build_url(DEFAULT_SEARCH_PATH)
        if not url:
            return ""
        return f"{url}?{urlencode({'query': query})}"

    def _genre_page_url(self, *, genre_id: int, page: int = 1) -> str:
        """URL of the server-rendered title page of one genre.

        Example:
        - `/genres/<genre_id>` contains ng-state with `u: "/api/movies"` and
          `b.data` + `b.pagination`. Pages after the first add `?page=<n>`.
        """
        genre_root = self._genres_url().rstrip("/")
        if not genre_root:
            return ""
        page = max(1, int(page or 1))
        url = urljoin(genre_root + "/", str(int(genre_id)))
        if page > 1:
            url = f"{url}?{urlencode({'page': str(page)})}"
        return url

    def _movie_detail_url(self, movie_id: int) -> str:
        """URL of the HTML detail page of one movie."""
        return self._build_url(f"movies/{int(movie_id)}")

    def _watch_url(self, movie_id: int) -> str:
        """URL of the JSON watch endpoint of one movie."""
        return self._build_url(DEFAULT_WATCH_PATH_TEMPLATE.format(id=int(movie_id)))

    # --------------------------------------------------------------- lookups

    def _remember_titles(self, movies: List[MovieItem]) -> List[str]:
        """Cache title -> id for *movies* and return their unique titles in input order."""
        titles: List[str] = []
        seen: Set[str] = set()
        for movie in movies:
            title = (movie.title or "").strip()
            if not title or title in seen:
                continue
            seen.add(title)
            if movie.id > 0:
                self._id_by_title[title] = int(movie.id)
            titles.append(title)
        return titles

    def _ensure_title_id(self, title: str) -> int | None:
        """Resolve *title* to its movie id, or ``None`` when it cannot be found.

        Order of attempts: in-memory cache, exact match on the index page,
        case-insensitive match on the search page.
        """
        title = (title or "").strip()
        if not title:
            return None
        cached = self._id_by_title.get(title)
        if isinstance(cached, int) and cached > 0:
            return cached
        # Fallback: scan the index ng-state again to rebuild the mapping.
        for movie in self._load_movies():
            if movie.title == title:
                self._id_by_title[title] = movie.id
                return movie.id
        # Kodi restarts the plugin on every navigation step, so the in-memory
        # cache is lost. For titles that are not on the index page (e.g. from
        # /movies/new) the id is resolved through the search page, which also
        # carries `id` + `title` in its ng-state.
        try:
            normalized = title.casefold().strip()
            for movie in self._fetch_search_movies(title):
                if (movie.title or "").casefold().strip() == normalized:
                    self._id_by_title[title] = movie.id
                    return movie.id
        except Exception as exc:
            _log_error(f"Resolving title id via search failed: {exc}")
        return None

    def _fetch_movie_detail(self, movie_id: int) -> str:
        """Return the (cached) HTML of a movie's detail page, or "" on failure."""
        movie_id = int(movie_id or 0)
        if movie_id <= 0:
            return ""
        cached = self._detail_html_by_id.get(movie_id)
        if isinstance(cached, str) and cached:
            return cached
        url = self._movie_detail_url(movie_id)
        if not url:
            return ""
        try:
            _, body = self._http_get_text(url, timeout=20)
        except Exception as exc:
            _log_error(f"GET {url} failed: {exc}")
            return ""
        self._detail_html_by_id[movie_id] = body
        return body

    def _fetch_watch_payload(self, movie_id: int) -> dict[str, object]:
        """Return the JSON object of the watch endpoint, or ``{}`` on any failure.

        A response that decodes to something other than a JSON object (list,
        string, number) is also reported as ``{}`` so that callers can rely on
        dict access.
        """
        movie_id = int(movie_id or 0)
        if movie_id <= 0:
            return {}
        url = self._watch_url(movie_id)
        if not url:
            return {}
        try:
            _, data = self._http_get_json(url, timeout=20)
        except Exception as exc:
            _log_error(f"GET {url} failed: {exc}")
            return {}
        return data if isinstance(data, dict) else {}

    def _watch_stream_url(self, movie_id: int) -> str:
        """Return the stripped ``streamUrl`` of the watch payload, or ""."""
        stream_url = self._fetch_watch_payload(movie_id).get("streamUrl")
        if isinstance(stream_url, str) and stream_url.strip():
            return stream_url.strip()
        return ""

    def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
        """Optional hook for the UI layer (default.py) to attach metadata/art without TMDB.

        Returns:
            ``(info, art, None)``. Both dicts are empty when the title cannot
            be resolved; *info* only holds media type and title when the
            detail page could not be parsed.
        """
        title = (title or "").strip()
        movie_id = self._ensure_title_id(title)
        if movie_id is None:
            return {}, {}, None
        detail = self._detail_by_id.get(movie_id)
        if detail is None:
            html = self._fetch_movie_detail(movie_id)
            detail = _parse_ng_state_movie_detail(_extract_ng_state_payload(html), movie_id=movie_id)
            if detail is not None:
                self._detail_by_id[movie_id] = detail
        info: dict[str, str] = {"mediatype": "movie", "title": title}
        art: dict[str, str] = {}
        if detail is None:
            return info, art, None
        if detail.overview:
            info["plot"] = detail.overview
        if detail.tagline:
            info["tagline"] = detail.tagline
        if detail.release_date:
            info["premiered"] = detail.release_date
            if len(detail.release_date) >= 4 and detail.release_date[:4].isdigit():
                info["year"] = detail.release_date[:4]
        if detail.runtime_minutes is not None and detail.runtime_minutes > 0:
            # The runtime is stored in minutes; the info label is emitted in seconds.
            info["duration"] = str(int(detail.runtime_minutes) * 60)
        if detail.vote_average is not None:
            info["rating"] = str(detail.vote_average)
        if detail.vote_count is not None:
            info["votes"] = str(detail.vote_count)
        if detail.genres:
            info["genre"] = " / ".join(detail.genres)
        base = self._get_base_url()
        if base:
            if detail.poster_path:
                poster = urljoin(base + "/", f"api/image/poster/{detail.poster_path.lstrip('/')}")
                art.update({"thumb": poster, "poster": poster})
            if detail.backdrop_path:
                backdrop = urljoin(base + "/", f"api/image/backdrop/{detail.backdrop_path.lstrip('/')}")
                art.setdefault("fanart", backdrop)
                art.setdefault("landscape", backdrop)
        return info, art, None

    # -------------------------------------------------------------- fetchers

    def _fetch_index_movies(self) -> List[MovieItem]:
        """Return the movies embedded in the index page, or ``[]`` on failure."""
        url = self._index_url()
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            return _parse_ng_state_movies(_extract_ng_state_payload(body))
        except Exception as exc:
            _log_error(f"Loading {url} failed: {exc}")
            return []

    def _fetch_new_titles_movies_page(self, page: int) -> List[MovieItem]:
        """Return the movies of one "new titles" page and record its has-more flag.

        On a fetch/parse failure ``[]`` is returned and nothing is cached.
        """
        page = max(1, int(page or 1))
        url = self._new_titles_url()
        if not url:
            return []
        if page > 1:
            url = f"{url}?{urlencode({'page': str(page)})}"
        try:
            _, body = self._http_get_text(url, timeout=20)
            movies, has_more, current_page = _parse_ng_state_movies_with_pagination(
                _extract_ng_state_payload(body)
            )
        except Exception as exc:
            _log_error(f"Loading {url} failed: {exc}")
            return []
        _log_debug_line(f"parse_ng_state_movies_page:page={page} count={len(movies)}")
        if has_more is not None:
            self._new_titles_has_more_by_page[page] = bool(has_more)
        elif current_page is not None and int(current_page) != page:
            # The page parameter was not honoured: do not offer a next page.
            self._new_titles_has_more_by_page[page] = False
        if not movies:
            # An empty page can never have a successor.
            self._new_titles_has_more_by_page[page] = False
            return []
        _log_titles(movies, context=f"new_titles_page={page}")
        return movies

    def new_titles_page(self, page: int) -> List[str]:
        """Paged variant: returns the unique titles of `/movies/new?page=<page>`."""
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        page = max(1, int(page or 1))
        return self._remember_titles(self._fetch_new_titles_movies_page(page))

    def new_titles_has_more(self, page: int) -> bool:
        """Tells the UI whether `/movies/new` has a next page after `page`."""
        page = max(1, int(page or 1))
        cached = self._new_titles_has_more_by_page.get(page)
        if cached is not None:
            return bool(cached)
        # Load the page to fill the cache.
        self._fetch_new_titles_movies_page(page)
        return bool(self._new_titles_has_more_by_page.get(page, False))

    def _fetch_search_movies(self, query: str) -> List[MovieItem]:
        """Return search-page results whose title matches *query*, or ``[]`` on failure."""
        query = (query or "").strip()
        if not query:
            return []
        # The results are parsed from the ng-state of the /search HTML page.
        url = self._search_url(query)
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            results = _parse_ng_state_search_results(_extract_ng_state_payload(body))
            return _filter_movies_by_title(query, results)
        except Exception as exc:
            _log_error(f"Loading {url} failed: {exc}")
            return []

    def _load_movies(self) -> List[MovieItem]:
        """Return the movies of the index page."""
        return self._fetch_index_movies()

    def _ensure_genre_index(self) -> None:
        """Fill the genre name -> id cache once; a no-op when it is already filled."""
        if self._genre_id_by_name:
            return
        # Prefer the direct JSON API (simpler): GET /api/genres -> [{"id":..,"name":..}, ...]
        api_url = self._api_genres_url()
        if api_url:
            try:
                _, payload = self._http_get_json(api_url, timeout=20)
                if isinstance(payload, list):
                    parsed: Dict[str, int] = {}
                    for item in payload:
                        if not isinstance(item, dict):
                            continue
                        name = str(item.get("name") or "").strip()
                        if not name:
                            continue
                        try:
                            gid = int(item.get("id"))
                        except (TypeError, ValueError):
                            continue
                        if gid > 0:
                            parsed[name] = gid
                    if parsed:
                        self._genre_id_by_name.clear()
                        self._genre_id_by_name.update(parsed)
                        return
            except Exception as exc:
                _log_error(f"GET {api_url} failed: {exc}")
        # Fallback: parse the ng-state of the HTML /genres page.
        url = self._genres_url()
        if not url:
            return
        try:
            _, body = self._http_get_text(url, timeout=20)
            parsed = _parse_ng_state_genres(_extract_ng_state_payload(body))
        except Exception as exc:
            _log_error(f"Loading {url} failed: {exc}")
            return
        if parsed:
            self._genre_id_by_name.clear()
            self._genre_id_by_name.update(parsed)

    # ------------------------------------------------------------ public API

    async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]:
        """Return the sorted, unique titles matching *query*.

        The search page is tried first; when it yields nothing, the index
        page is filtered instead. Note that the HTTP requests inside this
        coroutine are synchronous.
        """
        if not REQUESTS_AVAILABLE:
            return []
        query = (query or "").strip()
        if not query:
            return []
        if not self._get_base_url():
            return []
        _emit_progress(progress_callback, "Einschalten Suche", 15)
        movies = self._fetch_search_movies(query)
        if not movies:
            _emit_progress(progress_callback, "Fallback: Index filtern", 45)
            movies = _filter_movies_by_title(query, self._load_movies())
        _emit_progress(progress_callback, f"Treffer verarbeiten ({len(movies)})", 75)
        titles = self._remember_titles(movies)
        titles.sort(key=lambda value: value.casefold())
        _emit_progress(progress_callback, f"Fertig: {len(titles)} Treffer", 95)
        return titles

    def genres(self) -> List[str]:
        """Return all known genre names, sorted case-insensitively."""
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        self._ensure_genre_index()
        return sorted(self._genre_id_by_name.keys(), key=lambda value: value.casefold())

    def titles_for_genre(self, genre: str) -> List[str]:
        """Return the sorted titles of the first page of *genre*.

        Backwards compatible (first page only); paging is handled via
        ``titles_for_genre_page()``.
        """
        return sorted(self.titles_for_genre_page(genre, 1), key=lambda value: value.casefold())

    def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
        """Return the unique titles of one page of *genre* in page order.

        Also records whether the genre has a further page; ``[]`` is returned
        for unknown genres and on any fetch/parse failure.
        """
        if not REQUESTS_AVAILABLE:
            return []
        genre = (genre or "").strip()
        if not genre:
            return []
        if not self._get_base_url():
            return []
        self._ensure_genre_index()
        genre_id = self._genre_id_by_name.get(genre)
        if not genre_id:
            return []
        genre_id = int(genre_id)
        page = max(1, int(page or 1))
        # Do NOT use `/api/movies?withGenres=...` directly: on some deployments it returns
        # a mismatched/unfiltered dataset. Instead parse the server-rendered genre page
        # `/genres/<genre_id>`, which embeds the correct data in ng-state.
        url = self._genre_page_url(genre_id=genre_id, page=page)
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            movies, has_more, current_page = _parse_ng_state_movies_with_pagination(
                _extract_ng_state_payload(body)
            )
        except Exception as exc:
            _log_error(f"Loading {url} failed: {exc}")
            return []
        if has_more is not None:
            self._genre_has_more_by_id_page[(genre_id, page)] = bool(has_more)
        elif current_page is not None and int(current_page) != page:
            # Defensive: if the page param wasn't honored, avoid showing "next".
            self._genre_has_more_by_id_page[(genre_id, page)] = False
        return self._remember_titles(movies)

    def genre_has_more(self, genre: str, page: int) -> bool:
        """Optional: tells the UI whether a genre has more pages after `page`."""
        genre = (genre or "").strip()
        if not genre:
            return False
        self._ensure_genre_index()
        genre_id = self._genre_id_by_name.get(genre)
        if not genre_id:
            return False
        page = max(1, int(page or 1))
        key = (int(genre_id), page)
        cached = self._genre_has_more_by_id_page.get(key)
        if cached is not None:
            return bool(cached)
        # If the page wasn't loaded yet, load it (fills the cache) and then report.
        self.titles_for_genre_page(genre, page)
        return bool(self._genre_has_more_by_id_page.get(key, False))

    def seasons_for(self, title: str) -> List[str]:
        """Return the single pseudo season ``["Stream"]`` for a non-empty *title*.

        Opening a title also fetches its detail page by id (HTML) so that it
        is cached for later metadata lookups.
        """
        title = (title or "").strip()
        if not title:
            return []
        movie_id = self._ensure_title_id(title)
        if movie_id is not None:
            self._fetch_movie_detail(movie_id)
        # Playback: expose a single "Stream" folder (inside: 1 playable item = movie title).
        return ["Stream"]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return ``[title]`` for the "Stream" season, otherwise ``[]``."""
        season = (season or "").strip()
        if season.casefold() != "stream":
            return []
        title = (title or "").strip()
        return [title] if title else []

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return the stream URL for *title*, or ``None`` when it cannot be resolved.

        Two season/episode layouts are accepted for backwards compatibility:
        - old: season "Film" / episode "Stream"
        - new: season "Stream" / episode equal to the title (or "Stream")
        """
        title = (title or "").strip()
        season = (season or "").strip()
        episode = (episode or "").strip()
        if not title:
            return None
        season_key = season.casefold()
        episode_key = episode.casefold()
        legacy_layout = season_key == "film" and episode_key == "stream"
        current_layout = season_key == "stream" and (episode == title or episode_key == "stream")
        if not (legacy_layout or current_layout):
            return None
        movie_id = self._ensure_title_id(title)
        if movie_id is None:
            return None
        return self._watch_stream_url(movie_id) or None

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Resolve *link* through ``resolveurl_backend`` when that module is importable.

        Falls back to returning *link* unchanged when the backend is missing
        or returns a falsy result.
        """
        try:
            from resolveurl_backend import resolve as resolve_with_resolveurl
        except Exception:
            return link
        if callable(resolve_with_resolveurl):
            return resolve_with_resolveurl(link) or link
        return link

    def capabilities(self) -> Set[str]:
        """Return the optional feature names this plugin supports."""
        return {"new_titles", "genres"}

    def new_titles(self) -> List[str]:
        """Return the titles of the first "new titles" page.

        Backwards compatible: first page only. The UI pages via ``new_titles_page``.
        """
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        return self.new_titles_page(1)
# Alias used by the automatic plugin discovery.
Plugin = EinschaltenPlugin