1056 lines
37 KiB
Python
1056 lines
37 KiB
Python
"""Einschalten Plugin.
|
|
|
|
Optionales Debugging wie bei Serienstream:
|
|
- URL-Logging
|
|
- HTML-Dumps
|
|
- On-Screen URL-Info
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
from urllib.parse import urlencode, urljoin, urlsplit
|
|
|
|
try: # pragma: no cover - optional dependency (Kodi dependency)
|
|
import requests
|
|
except ImportError as exc: # pragma: no cover
|
|
requests = None
|
|
REQUESTS_AVAILABLE = False
|
|
REQUESTS_IMPORT_ERROR = exc
|
|
else:
|
|
REQUESTS_AVAILABLE = True
|
|
REQUESTS_IMPORT_ERROR = None
|
|
|
|
try: # pragma: no cover - optional Kodi helpers
|
|
import xbmcaddon # type: ignore[import-not-found]
|
|
except ImportError: # pragma: no cover - allow running outside Kodi
|
|
xbmcaddon = None
|
|
|
|
from plugin_interface import BasisPlugin
|
|
from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url
|
|
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
|
|
|
|
# Kodi add-on id; handed to xbmcaddon.Addon() and to every plugin_helpers call below.
ADDON_ID = "plugin.video.viewit"
# Setting id that EinschaltenPlugin._get_base_url() reads for the site base URL.
SETTING_BASE_URL = "einschalten_base_url"
# Setting ids passed as `enabled_setting_id` to the plugin_helpers debug functions
# (presumably the add-on-wide debug switches -- confirm in plugin_helpers).
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Setting ids passed as `plugin_setting_id` to the same functions
# (presumably the per-plugin switches -- confirm in plugin_helpers).
SETTING_LOG_URLS = "log_urls_einschalten"
SETTING_DUMP_HTML = "dump_html_einschalten"
SETTING_SHOW_URL_INFO = "show_url_info_einschalten"
SETTING_LOG_ERRORS = "log_errors_einschalten"

# Base URL used when the setting cannot be read (outside Kodi or on error).
DEFAULT_BASE_URL = "https://einschalten.in"
# Site paths that the EinschaltenPlugin URL builders join onto the base URL.
DEFAULT_INDEX_PATH = "/"
DEFAULT_NEW_TITLES_PATH = "/movies/new"
DEFAULT_SEARCH_PATH = "/search"
DEFAULT_GENRES_PATH = "/genres"
# `{id}` is filled with the numeric movie id by EinschaltenPlugin._watch_url().
DEFAULT_WATCH_PATH_TEMPLATE = "/api/movies/{id}/watch"

# Headers sent with every GET issued through EinschaltenPlugin._http_get_text().
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/json;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
# Optional progress callback taking (message, percent-or-None); see _emit_progress().
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
|
|
|
|
|
|
def _emit_progress(callback: ProgressCallback, message: str, percent: Optional[int] = None) -> None:
|
|
if not callable(callback):
|
|
return
|
|
try:
|
|
callback(str(message or ""), None if percent is None else int(percent))
|
|
except Exception:
|
|
return
|
|
|
|
|
|
@dataclass(frozen=True)
class MovieItem:
    """Immutable summary of one movie as listed in an ng-state movie or search block."""

    # Numeric movie id (the parsers build it with int() from the entry's "id").
    id: int
    # Display title (the parsers strip it and skip entries with an empty title).
    title: str
    # Raw "releaseDate" string of the entry; "" when absent.
    release_date: str = ""
    # Raw "posterPath" string of the entry; "" when absent.
    poster_path: str = ""
    # "voteAverage" as float; None when missing or not convertible.
    vote_average: float | None = None
    # "collectionId" as int; None when missing or not convertible.
    collection_id: int | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
class MovieDetail:
    """Immutable detail record of one movie, built by _parse_ng_state_movie_detail()."""

    # Numeric movie id.
    id: int
    # Display title (stripped, non-empty when built by the parser).
    title: str
    # "tagline" text; "" when absent.
    tagline: str = ""
    # "overview" text; metadata_for() exposes it as the plot.
    overview: str = ""
    # "releaseDate" string; metadata_for() reads the year from its first four characters.
    release_date: str = ""
    # "runtime" as int; metadata_for() multiplies it by 60 for the duration value.
    runtime_minutes: int | None = None
    # "posterPath" / "backdropPath" strings; joined onto the image API paths for artwork.
    poster_path: str = ""
    backdrop_path: str = ""
    # "voteAverage" as float and "voteCount" as int; None when missing or not convertible.
    vote_average: float | None = None
    vote_count: int | None = None
    # "homepage", "imdbId" and "wikidataId" strings; "" when absent.
    homepage: str = ""
    imdb_id: str = ""
    wikidata_id: str = ""
    # Genre names; None when the source block carried no "genres" list.
    genres: List[str] | None = None
|
|
|
|
|
|
def _normalize_search_text(value: str) -> str:
    """Delegate to the shared ``normalize_search_text`` helper from ``search_utils``."""
    normalized = _shared_normalize_search_text(value)
    return normalized
|
|
|
|
|
|
def _matches_query(query: str, *, title: str) -> bool:
    """Delegate the query/title comparison to ``search_utils.matches_query``."""
    verdict = _shared_matches_query(query, title=title)
    return verdict
|
|
|
|
|
|
def _filter_movies_by_title(query: str, movies: List[MovieItem]) -> List[MovieItem]:
|
|
query = (query or "").strip()
|
|
if not query:
|
|
return []
|
|
return [movie for movie in movies if _matches_query(query, title=movie.title)]
|
|
|
|
|
|
def _get_setting_text(setting_id: str, *, default: str = "") -> str:
    """Read a text setting of this add-on.

    Returns ``default`` when ``xbmcaddon`` is unavailable or reading fails.
    Otherwise returns the stripped setting value, which may be ``""``.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(ADDON_ID)
        # Use `getSettingString` when the add-on object provides it, else `getSetting`.
        read = getattr(addon, "getSettingString", None)
        if read is None:
            read = addon.getSetting
        raw = read(setting_id)
        return str(raw or "").strip()
    except Exception:
        return default
|
|
|
|
|
|
def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    """Read a boolean setting of this add-on through ``plugin_helpers.get_setting_bool``."""
    flag = get_setting_bool(ADDON_ID, setting_id, default=default)
    return flag
|
|
|
|
|
|
def _ensure_requests() -> None:
    """Raise ``RuntimeError`` when the optional ``requests`` import failed."""
    if requests is not None:
        return
    raise RuntimeError(f"requests ist nicht verfuegbar: {REQUESTS_IMPORT_ERROR}")
|
|
|
|
|
|
def _extract_ng_state_payload(html: str) -> Dict[str, Any]:
|
|
"""Extrahiert JSON aus `<script id="ng-state" type="application/json">...</script>`."""
|
|
html = html or ""
|
|
# Regex ist hier ausreichend und vermeidet bs4-Abhängigkeit.
|
|
match = re.search(
|
|
r'<script[^>]*id=["\\\']ng-state["\\\'][^>]*>(.*?)</script>',
|
|
html,
|
|
flags=re.IGNORECASE | re.DOTALL,
|
|
)
|
|
if not match:
|
|
return {}
|
|
raw = (match.group(1) or "").strip()
|
|
if not raw:
|
|
return {}
|
|
try:
|
|
data = json.loads(raw)
|
|
except Exception:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def _notify_url(url: str) -> None:
    """Hand ``url`` to ``plugin_helpers.notify_url`` with this plugin's heading and setting ids."""
    options = {
        "heading": "Einschalten",
        "url": url,
        "enabled_setting_id": GLOBAL_SETTING_SHOW_URL_INFO,
        "plugin_setting_id": SETTING_SHOW_URL_INFO,
    }
    notify_url(ADDON_ID, **options)
|
|
|
|
|
|
def _log_url(url: str, *, kind: str = "VISIT") -> None:
    """Hand ``url`` to ``plugin_helpers.log_url`` for ``einschalten_urls.log``.

    ``kind`` is passed through unchanged as the entry label (default ``"VISIT"``).
    """
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_URLS,
        "plugin_setting_id": SETTING_LOG_URLS,
        "log_filename": "einschalten_urls.log",
        "url": url,
        "kind": kind,
    }
    log_url(ADDON_ID, **options)
|
|
|
|
|
|
def _log_debug_line(message: str) -> None:
|
|
try:
|
|
log_url(
|
|
ADDON_ID,
|
|
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
|
|
plugin_setting_id=SETTING_LOG_URLS,
|
|
log_filename="einschalten_debug.log",
|
|
url=message,
|
|
kind="DEBUG",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _log_titles(items: list[MovieItem], *, context: str) -> None:
|
|
if not items:
|
|
return
|
|
try:
|
|
log_url(
|
|
ADDON_ID,
|
|
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
|
|
plugin_setting_id=SETTING_LOG_URLS,
|
|
log_filename="einschalten_titles.log",
|
|
url=f"{context}:count={len(items)}",
|
|
kind="TITLE",
|
|
)
|
|
for item in items:
|
|
log_url(
|
|
ADDON_ID,
|
|
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
|
|
plugin_setting_id=SETTING_LOG_URLS,
|
|
log_filename="einschalten_titles.log",
|
|
url=f"{context}:id={item.id} title={item.title}",
|
|
kind="TITLE",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _log_response_html(url: str, body: str) -> None:
    """Hand a response body to ``plugin_helpers.dump_response_html`` with this plugin's setting ids."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_DUMP_HTML,
        "plugin_setting_id": SETTING_DUMP_HTML,
        "url": url,
        "body": body,
        "filename_prefix": "einschalten_response",
    }
    dump_response_html(ADDON_ID, **options)
|
|
|
|
|
|
def _log_error(message: str) -> None:
    """Hand ``message`` to ``plugin_helpers.log_error`` for ``einschalten_errors.log``."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_ERRORS,
        "plugin_setting_id": SETTING_LOG_ERRORS,
        "log_filename": "einschalten_errors.log",
        "message": message,
    }
    log_error(ADDON_ID, **options)
|
|
|
|
def _u_matches(value: Any, expected_path: str) -> bool:
|
|
raw = (value or "").strip()
|
|
if not raw:
|
|
return False
|
|
if raw == expected_path:
|
|
return True
|
|
try:
|
|
if "://" in raw:
|
|
path = urlsplit(raw).path or ""
|
|
else:
|
|
path = raw.split("?", 1)[0].split("#", 1)[0]
|
|
if path == expected_path:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return raw.endswith(expected_path)
|
|
|
|
|
|
def _parse_ng_state_movies(payload: Dict[str, Any]) -> List[MovieItem]:
|
|
movies: List[MovieItem] = []
|
|
for value in (payload or {}).values():
|
|
if not isinstance(value, dict):
|
|
continue
|
|
# In ng-state payload, `u` (URL) is a sibling of `b` (body), not nested inside `b`.
|
|
if not _u_matches(value.get("u"), "/api/movies"):
|
|
continue
|
|
block = value.get("b")
|
|
if not isinstance(block, dict):
|
|
continue
|
|
data = block.get("data")
|
|
if not isinstance(data, list):
|
|
continue
|
|
for item in data:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
try:
|
|
movie_id = int(item.get("id"))
|
|
except Exception:
|
|
continue
|
|
title = str(item.get("title") or "").strip()
|
|
if not title:
|
|
continue
|
|
vote_average = item.get("voteAverage")
|
|
try:
|
|
vote_average_f = float(vote_average) if vote_average is not None else None
|
|
except Exception:
|
|
vote_average_f = None
|
|
collection_id = item.get("collectionId")
|
|
try:
|
|
collection_id_i = int(collection_id) if collection_id is not None else None
|
|
except Exception:
|
|
collection_id_i = None
|
|
movies.append(
|
|
MovieItem(
|
|
id=movie_id,
|
|
title=title,
|
|
release_date=str(item.get("releaseDate") or ""),
|
|
poster_path=str(item.get("posterPath") or ""),
|
|
vote_average=vote_average_f,
|
|
collection_id=collection_id_i,
|
|
)
|
|
)
|
|
return movies
|
|
|
|
|
|
def _parse_ng_state_movies_with_pagination(payload: Dict[str, Any]) -> tuple[List[MovieItem], bool | None, int | None]:
|
|
"""Parses ng-state for `u: "/api/movies"` where `b` contains `{data:[...], pagination:{...}}`.
|
|
|
|
Returns: (movies, has_more, current_page)
|
|
"""
|
|
|
|
movies: List[MovieItem] = []
|
|
has_more: bool | None = None
|
|
current_page: int | None = None
|
|
|
|
for value in (payload or {}).values():
|
|
if not isinstance(value, dict):
|
|
continue
|
|
if not _u_matches(value.get("u"), "/api/movies"):
|
|
continue
|
|
block = value.get("b")
|
|
if not isinstance(block, dict):
|
|
continue
|
|
|
|
pagination = block.get("pagination")
|
|
if isinstance(pagination, dict):
|
|
if "hasMore" in pagination:
|
|
has_more = bool(pagination.get("hasMore") is True)
|
|
try:
|
|
current_page = int(pagination.get("currentPage")) if pagination.get("currentPage") is not None else None
|
|
except Exception:
|
|
current_page = None
|
|
|
|
data = block.get("data")
|
|
if not isinstance(data, list):
|
|
continue
|
|
|
|
for item in data:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
try:
|
|
movie_id = int(item.get("id"))
|
|
except Exception:
|
|
continue
|
|
title = str(item.get("title") or "").strip()
|
|
if not title:
|
|
continue
|
|
vote_average = item.get("voteAverage")
|
|
try:
|
|
vote_average_f = float(vote_average) if vote_average is not None else None
|
|
except Exception:
|
|
vote_average_f = None
|
|
collection_id = item.get("collectionId")
|
|
try:
|
|
collection_id_i = int(collection_id) if collection_id is not None else None
|
|
except Exception:
|
|
collection_id_i = None
|
|
movies.append(
|
|
MovieItem(
|
|
id=movie_id,
|
|
title=title,
|
|
release_date=str(item.get("releaseDate") or ""),
|
|
poster_path=str(item.get("posterPath") or ""),
|
|
vote_average=vote_average_f,
|
|
collection_id=collection_id_i,
|
|
)
|
|
)
|
|
|
|
# Stop after first matching block (genre pages should only have one).
|
|
break
|
|
|
|
return movies, has_more, current_page
|
|
|
|
|
|
def _parse_ng_state_search_results(payload: Dict[str, Any]) -> List[MovieItem]:
|
|
movies: List[MovieItem] = []
|
|
for value in (payload or {}).values():
|
|
if not isinstance(value, dict):
|
|
continue
|
|
if not _u_matches(value.get("u"), "/api/search"):
|
|
continue
|
|
block = value.get("b")
|
|
if not isinstance(block, dict):
|
|
continue
|
|
data = block.get("data")
|
|
if not isinstance(data, list):
|
|
continue
|
|
for item in data:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
try:
|
|
movie_id = int(item.get("id"))
|
|
except Exception:
|
|
continue
|
|
title = str(item.get("title") or "").strip()
|
|
if not title:
|
|
continue
|
|
vote_average = item.get("voteAverage")
|
|
try:
|
|
vote_average_f = float(vote_average) if vote_average is not None else None
|
|
except Exception:
|
|
vote_average_f = None
|
|
collection_id = item.get("collectionId")
|
|
try:
|
|
collection_id_i = int(collection_id) if collection_id is not None else None
|
|
except Exception:
|
|
collection_id_i = None
|
|
movies.append(
|
|
MovieItem(
|
|
id=movie_id,
|
|
title=title,
|
|
release_date=str(item.get("releaseDate") or ""),
|
|
poster_path=str(item.get("posterPath") or ""),
|
|
vote_average=vote_average_f,
|
|
collection_id=collection_id_i,
|
|
)
|
|
)
|
|
return movies
|
|
|
|
|
|
def _parse_ng_state_movie_detail(payload: Dict[str, Any], *, movie_id: int) -> MovieDetail | None:
|
|
movie_id = int(movie_id or 0)
|
|
if movie_id <= 0:
|
|
return None
|
|
expected_u = f"/api/movies/{movie_id}"
|
|
for value in (payload or {}).values():
|
|
if not isinstance(value, dict):
|
|
continue
|
|
if not _u_matches(value.get("u"), expected_u):
|
|
continue
|
|
block = value.get("b")
|
|
if not isinstance(block, dict):
|
|
continue
|
|
try:
|
|
parsed_id = int(block.get("id"))
|
|
except Exception:
|
|
continue
|
|
if parsed_id != movie_id:
|
|
continue
|
|
title = str(block.get("title") or "").strip()
|
|
if not title:
|
|
continue
|
|
runtime = block.get("runtime")
|
|
try:
|
|
runtime_i = int(runtime) if runtime is not None else None
|
|
except Exception:
|
|
runtime_i = None
|
|
vote_average = block.get("voteAverage")
|
|
try:
|
|
vote_average_f = float(vote_average) if vote_average is not None else None
|
|
except Exception:
|
|
vote_average_f = None
|
|
vote_count = block.get("voteCount")
|
|
try:
|
|
vote_count_i = int(vote_count) if vote_count is not None else None
|
|
except Exception:
|
|
vote_count_i = None
|
|
genres_raw = block.get("genres")
|
|
genres: List[str] | None = None
|
|
if isinstance(genres_raw, list):
|
|
names: List[str] = []
|
|
for g in genres_raw:
|
|
if isinstance(g, dict):
|
|
name = str(g.get("name") or "").strip()
|
|
if name:
|
|
names.append(name)
|
|
genres = names
|
|
return MovieDetail(
|
|
id=movie_id,
|
|
title=title,
|
|
tagline=str(block.get("tagline") or "").strip(),
|
|
overview=str(block.get("overview") or "").strip(),
|
|
release_date=str(block.get("releaseDate") or "").strip(),
|
|
runtime_minutes=runtime_i,
|
|
poster_path=str(block.get("posterPath") or "").strip(),
|
|
backdrop_path=str(block.get("backdropPath") or "").strip(),
|
|
vote_average=vote_average_f,
|
|
vote_count=vote_count_i,
|
|
homepage=str(block.get("homepage") or "").strip(),
|
|
imdb_id=str(block.get("imdbId") or "").strip(),
|
|
wikidata_id=str(block.get("wikidataId") or "").strip(),
|
|
genres=genres,
|
|
)
|
|
return None
|
|
|
|
|
|
def _parse_ng_state_genres(payload: Dict[str, Any]) -> Dict[str, int]:
|
|
"""Parses ng-state for `u: "/api/genres"` where `b` is a list of {id,name}."""
|
|
genres: Dict[str, int] = {}
|
|
for value in (payload or {}).values():
|
|
if not isinstance(value, dict):
|
|
continue
|
|
if not _u_matches(value.get("u"), "/api/genres"):
|
|
continue
|
|
block = value.get("b")
|
|
if not isinstance(block, list):
|
|
continue
|
|
for item in block:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
name = str(item.get("name") or "").strip()
|
|
if not name:
|
|
continue
|
|
try:
|
|
gid = int(item.get("id"))
|
|
except Exception:
|
|
continue
|
|
if gid > 0:
|
|
genres[name] = gid
|
|
return genres
|
|
|
|
|
|
class EinschaltenPlugin(BasisPlugin):
    """Metadata plugin for an authorized source.

    Movie lists and details are read from the JSON that the site embeds in
    its server-rendered pages (``<script id="ng-state">``); genres and watch
    data come from JSON endpoints. All caches live on the instance.
    """

    name = "Einschalten"
    version = "1.0.0"

    def __init__(self) -> None:
        """Record whether ``requests`` is available and create the empty caches."""
        self.is_available = REQUESTS_AVAILABLE
        self.unavailable_reason = None if REQUESTS_AVAILABLE else f"requests fehlt: {REQUESTS_IMPORT_ERROR}"
        self._session = None
        self._id_by_title: Dict[str, int] = {}
        self._detail_html_by_id: Dict[int, str] = {}
        self._detail_by_id: Dict[int, MovieDetail] = {}
        self._genre_id_by_name: Dict[str, int] = {}
        self._genre_has_more_by_id_page: Dict[tuple[int, int], bool] = {}
        self._new_titles_has_more_by_page: Dict[int, bool] = {}

    # ------------------------------------------------------------------ HTTP

    def _get_session(self):
        """Return the lazily created ``requests.Session``; raise RuntimeError without requests."""
        _ensure_requests()
        if self._session is None:
            self._session = requests.Session()
        return self._session

    @staticmethod
    def _report_failure(url: str, exc: Exception) -> None:
        """Write a failed request to the error log; logging problems are ignored."""
        try:
            _log_error(f"GET {url} failed: {exc}")
        except Exception:
            # Diagnostics must never turn a handled failure into a crash.
            pass

    def _http_get_text(self, url: str, *, timeout: int = 20) -> tuple[str, str]:
        """GET ``url`` and return ``(final_url, body_text)``.

        Raises whatever ``requests`` raises, including the HTTP error from
        ``raise_for_status()``. The response is always closed.
        """
        _log_url(url, kind="GET")
        _notify_url(url)
        session = self._get_session()
        response = session.get(url, headers=HEADERS, timeout=timeout)
        try:
            response.raise_for_status()
            final_url = response.url or url
            body = response.text or ""
            _log_url(final_url, kind="OK")
            _log_response_html(final_url, body)
            return final_url, body
        finally:
            try:
                response.close()
            except Exception:
                pass

    def _http_get_json(self, url: str, *, timeout: int = 20) -> tuple[str, Any]:
        """GET ``url`` and return ``(final_url, decoded_json)``; invalid JSON decodes to ``{}``."""
        final_url, body = self._http_get_text(url, timeout=timeout)
        try:
            payload = json.loads(body or "{}")
        except ValueError:
            payload = {}
        return final_url, payload

    # ------------------------------------------------------------------ URLs

    def _get_base_url(self) -> str:
        """Return the configured base URL without a trailing slash (may be ``""``)."""
        base = _get_setting_text(SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
        return base.rstrip("/")

    def _build_url(self, path: str) -> str:
        """Join ``path`` onto the base URL; return ``""`` when no base URL is configured."""
        base = self._get_base_url()
        if not base:
            return ""
        return urljoin(base + "/", path.lstrip("/"))

    def _index_url(self) -> str:
        """URL of the index page."""
        return self._build_url(DEFAULT_INDEX_PATH)

    def _new_titles_url(self) -> str:
        """URL of the first "new titles" page."""
        return self._build_url(DEFAULT_NEW_TITLES_PATH)

    def _genres_url(self) -> str:
        """URL of the HTML genre overview page."""
        return self._build_url(DEFAULT_GENRES_PATH)

    def _api_genres_url(self) -> str:
        """URL of the JSON genre endpoint."""
        return self._build_url("api/genres")

    def _search_url(self, query: str) -> str:
        """URL of the search page for ``query``."""
        url = self._build_url(DEFAULT_SEARCH_PATH)
        if not url:
            return ""
        return f"{url}?{urlencode({'query': query})}"

    def _genre_page_url(self, *, genre_id: int, page: int = 1) -> str:
        """URL of a server-rendered genre page.

        Genre title pages embed the movie list in ng-state, for example
        ``/genres/<id>`` contains ``u: "/api/movies"`` with ``b.data`` and
        ``b.pagination``. The ``page`` parameter is only added from page 2 on.
        """
        genre_root = self._genres_url().rstrip("/")
        if not genre_root:
            return ""
        page = max(1, int(page or 1))
        url = urljoin(genre_root + "/", str(int(genre_id)))
        if page > 1:
            url = f"{url}?{urlencode({'page': str(page)})}"
        return url

    def _movie_detail_url(self, movie_id: int) -> str:
        """URL of the HTML detail page of a movie."""
        return self._build_url(f"movies/{int(movie_id)}")

    def _watch_url(self, movie_id: int) -> str:
        """URL of the JSON watch endpoint of a movie."""
        return self._build_url(DEFAULT_WATCH_PATH_TEMPLATE.format(id=int(movie_id)))

    # --------------------------------------------------------------- caching

    def _remember_titles(self, movies: List[MovieItem]) -> List[str]:
        """Return the distinct titles of ``movies`` in order and cache their ids.

        Only positive ids are cached, because ``_ensure_title_id`` ignores
        any other cached value.
        """
        titles: List[str] = []
        seen: set[str] = set()
        for movie in movies:
            title = (movie.title or "").strip()
            if not title or title in seen:
                continue
            seen.add(title)
            if movie.id > 0:
                self._id_by_title[title] = int(movie.id)
            titles.append(title)
        return titles

    def _ensure_title_id(self, title: str) -> int | None:
        """Resolve ``title`` to a movie id via cache, index page, then search page."""
        title = (title or "").strip()
        if not title:
            return None
        cached = self._id_by_title.get(title)
        if isinstance(cached, int) and cached > 0:
            return cached
        # Fallback: scan the index ng-state again to rebuild the mapping.
        for movie in self._load_movies():
            if movie.title == title:
                self._id_by_title[title] = movie.id
                return movie.id
        # Kodi restarts the plugin on every navigation, so the in-memory cache is lost.
        # For titles that are not on the index page (e.g. /movies/new) the id is resolved
        # through the search page, which also delivers `id` + `title` in ng-state.
        try:
            normalized = title.casefold().strip()
            for movie in self._fetch_search_movies(title):
                if (movie.title or "").casefold().strip() == normalized:
                    self._id_by_title[title] = movie.id
                    return movie.id
        except Exception:
            pass
        return None

    # -------------------------------------------------------------- fetchers

    def _fetch_movie_detail(self, movie_id: int) -> str:
        """Return the detail page HTML for ``movie_id`` (cached); ``""`` on failure."""
        movie_id = int(movie_id or 0)
        if movie_id <= 0:
            return ""
        cached = self._detail_html_by_id.get(movie_id)
        if isinstance(cached, str) and cached:
            return cached
        url = self._movie_detail_url(movie_id)
        if not url:
            return ""
        try:
            _, body = self._http_get_text(url, timeout=20)
        except Exception as exc:
            self._report_failure(url, exc)
            return ""
        self._detail_html_by_id[movie_id] = body
        return body

    def _fetch_watch_payload(self, movie_id: int) -> dict[str, object]:
        """Return the decoded watch JSON for ``movie_id``; ``{}`` on failure or a non-object reply."""
        movie_id = int(movie_id or 0)
        if movie_id <= 0:
            return {}
        url = self._watch_url(movie_id)
        if not url:
            return {}
        try:
            _, data = self._http_get_json(url, timeout=20)
        except Exception as exc:
            self._report_failure(url, exc)
            return {}
        # The endpoint may answer with a JSON list or scalar; callers rely on a dict.
        return data if isinstance(data, dict) else {}

    def _watch_stream_url(self, movie_id: int) -> str:
        """Return the stripped ``streamUrl`` of the watch payload, or ``""``."""
        stream_url = self._fetch_watch_payload(movie_id).get("streamUrl")
        if isinstance(stream_url, str) and stream_url.strip():
            return stream_url.strip()
        return ""

    def _fetch_index_movies(self) -> List[MovieItem]:
        """Return the movies embedded in the index page; ``[]`` on failure."""
        url = self._index_url()
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            return _parse_ng_state_movies(_extract_ng_state_payload(body))
        except Exception as exc:
            self._report_failure(url, exc)
            return []

    def _fetch_new_titles_movies_page(self, page: int) -> List[MovieItem]:
        """Return the movies of "new titles" page ``page`` and record whether more pages follow."""
        page = max(1, int(page or 1))
        url = self._new_titles_url()
        if not url:
            return []
        if page > 1:
            url = f"{url}?{urlencode({'page': str(page)})}"
        try:
            _, body = self._http_get_text(url, timeout=20)
            payload = _extract_ng_state_payload(body)
            movies, has_more, current_page = _parse_ng_state_movies_with_pagination(payload)
            _log_debug_line(f"parse_ng_state_movies_page:page={page} count={len(movies)}")
            if has_more is not None:
                self._new_titles_has_more_by_page[page] = bool(has_more)
            elif current_page is not None and int(current_page) != page:
                # The page parameter was not honored; do not offer a next page.
                self._new_titles_has_more_by_page[page] = False
            if movies:
                _log_titles(movies, context=f"new_titles_page={page}")
                return movies
            # An empty page never has a successor.
            self._new_titles_has_more_by_page[page] = False
            return []
        except Exception as exc:
            self._report_failure(url, exc)
            return []

    def _fetch_search_movies(self, query: str) -> List[MovieItem]:
        """Return the search page results whose title matches ``query``; ``[]`` on failure."""
        query = (query or "").strip()
        if not query:
            return []
        # Parse ng-state from the /search page HTML.
        url = self._search_url(query)
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            results = _parse_ng_state_search_results(_extract_ng_state_payload(body))
            return _filter_movies_by_title(query, results)
        except Exception as exc:
            self._report_failure(url, exc)
            return []

    def _load_movies(self) -> List[MovieItem]:
        """Return the movies of the index page."""
        return self._fetch_index_movies()

    @staticmethod
    def _genres_from_api_payload(payload: Any) -> Dict[str, int]:
        """Map genre name to positive id from a ``[{"id": .., "name": ..}, ...]`` list."""
        parsed: Dict[str, int] = {}
        if not isinstance(payload, list):
            return parsed
        for item in payload:
            if not isinstance(item, dict):
                continue
            name = str(item.get("name") or "").strip()
            if not name:
                continue
            try:
                gid = int(item.get("id"))
            except (TypeError, ValueError, OverflowError):
                continue
            if gid > 0:
                parsed[name] = gid
        return parsed

    def _ensure_genre_index(self) -> None:
        """Fill the genre name -> id cache once, from the JSON API or the HTML page."""
        if self._genre_id_by_name:
            return
        # Prefer the direct JSON API (simpler): GET /api/genres -> [{"id":..,"name":..}, ...]
        api_url = self._api_genres_url()
        if api_url:
            try:
                _, payload = self._http_get_json(api_url, timeout=20)
            except Exception as exc:
                self._report_failure(api_url, exc)
                payload = None
            parsed = self._genres_from_api_payload(payload)
            if parsed:
                self._genre_id_by_name.update(parsed)
                return

        # Fallback: parse ng-state from the HTML /genres page.
        url = self._genres_url()
        if not url:
            return
        try:
            _, body = self._http_get_text(url, timeout=20)
            parsed = _parse_ng_state_genres(_extract_ng_state_payload(body))
        except Exception as exc:
            self._report_failure(url, exc)
            return
        if parsed:
            self._genre_id_by_name.update(parsed)

    # ------------------------------------------------------------ public API

    def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
        """Optional hook for the UI layer (default.py) to attach metadata/art without TMDB.

        Returns:
            ``(info, art, None)``. Both dicts are empty when the title cannot
            be resolved to an id; ``info`` holds only media type and title
            when no detail data is available.
        """
        title = (title or "").strip()
        movie_id = self._ensure_title_id(title)
        if movie_id is None:
            return {}, {}, None

        detail = self._detail_by_id.get(movie_id)
        if detail is None:
            html = self._fetch_movie_detail(movie_id)
            detail = _parse_ng_state_movie_detail(_extract_ng_state_payload(html), movie_id=movie_id)
            if detail is not None:
                self._detail_by_id[movie_id] = detail

        info: dict[str, str] = {"mediatype": "movie", "title": title}
        art: dict[str, str] = {}
        if detail is None:
            return info, art, None

        if detail.overview:
            info["plot"] = detail.overview
        if detail.tagline:
            info["tagline"] = detail.tagline
        if detail.release_date:
            info["premiered"] = detail.release_date
            if len(detail.release_date) >= 4 and detail.release_date[:4].isdigit():
                info["year"] = detail.release_date[:4]
        if detail.runtime_minutes is not None and detail.runtime_minutes > 0:
            # The runtime is kept in minutes; "duration" is emitted as minutes * 60.
            info["duration"] = str(int(detail.runtime_minutes) * 60)
        if detail.vote_average is not None:
            info["rating"] = str(detail.vote_average)
        if detail.vote_count is not None:
            info["votes"] = str(detail.vote_count)
        if detail.genres:
            info["genre"] = " / ".join(detail.genres)

        base = self._get_base_url()
        if base:
            if detail.poster_path:
                poster = urljoin(base + "/", f"api/image/poster/{detail.poster_path.lstrip('/')}")
                art.update({"thumb": poster, "poster": poster})
            if detail.backdrop_path:
                backdrop = urljoin(base + "/", f"api/image/backdrop/{detail.backdrop_path.lstrip('/')}")
                art.setdefault("fanart", backdrop)
                art.setdefault("landscape", backdrop)

        return info, art, None

    def new_titles_page(self, page: int) -> List[str]:
        """Paged variant: returns titles for ``/movies/new?page=<page>``."""
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        page = max(1, int(page or 1))
        return self._remember_titles(self._fetch_new_titles_movies_page(page))

    def new_titles_has_more(self, page: int) -> bool:
        """Tell the UI whether ``/movies/new`` has a next page after ``page``."""
        page = max(1, int(page or 1))
        cached = self._new_titles_has_more_by_page.get(page)
        if cached is not None:
            return bool(cached)
        # Load the page to fill the cache.
        self._fetch_new_titles_movies_page(page)
        return bool(self._new_titles_has_more_by_page.get(page, False))

    async def search_titles(self, query: str, progress_callback: ProgressCallback = None) -> List[str]:
        """Return the distinct matching titles for ``query``, sorted case-insensitively.

        Uses the search page first and falls back to filtering the index
        page when the search yields nothing.
        """
        if not REQUESTS_AVAILABLE:
            return []
        query = (query or "").strip()
        if not query:
            return []
        if not self._get_base_url():
            return []

        _emit_progress(progress_callback, "Einschalten Suche", 15)
        movies = self._fetch_search_movies(query)
        if not movies:
            _emit_progress(progress_callback, "Fallback: Index filtern", 45)
            movies = _filter_movies_by_title(query, self._load_movies())
        _emit_progress(progress_callback, f"Treffer verarbeiten ({len(movies)})", 75)
        titles = self._remember_titles(movies)
        titles.sort(key=lambda value: value.casefold())
        _emit_progress(progress_callback, f"Fertig: {len(titles)} Treffer", 95)
        return titles

    def genres(self) -> List[str]:
        """Return all known genre names, sorted case-insensitively."""
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        self._ensure_genre_index()
        return sorted(self._genre_id_by_name.keys(), key=lambda value: value.casefold())

    def titles_for_genre(self, genre: str) -> List[str]:
        """Return the sorted titles of the first genre page.

        Backwards compatible (first page only); paging is handled via
        ``titles_for_genre_page()``.
        """
        titles = self.titles_for_genre_page(genre, 1)
        titles.sort(key=lambda value: value.casefold())
        return titles

    def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
        """Return the titles of genre page ``page`` and record whether more pages follow."""
        if not REQUESTS_AVAILABLE:
            return []
        genre = (genre or "").strip()
        if not genre:
            return []
        if not self._get_base_url():
            return []
        self._ensure_genre_index()
        genre_id = self._genre_id_by_name.get(genre)
        if not genre_id:
            return []
        genre_id = int(genre_id)
        page = max(1, int(page or 1))
        # Do NOT use `/api/movies?withGenres=...` directly: on some deployments it returns
        # a mismatched/unfiltered dataset. Instead parse the server-rendered genre page
        # `/genres/<id>`, which embeds the correct data in ng-state.
        url = self._genre_page_url(genre_id=genre_id, page=page)
        if not url:
            return []
        try:
            _, body = self._http_get_text(url, timeout=20)
            payload = _extract_ng_state_payload(body)
        except Exception as exc:
            self._report_failure(url, exc)
            return []

        movies, has_more, current_page = _parse_ng_state_movies_with_pagination(payload)
        if has_more is not None:
            self._genre_has_more_by_id_page[(genre_id, page)] = bool(has_more)
        elif current_page is not None and int(current_page) != page:
            # Defensive: if the page parameter was not honored, avoid showing "next".
            self._genre_has_more_by_id_page[(genre_id, page)] = False

        return self._remember_titles(movies)

    def genre_has_more(self, genre: str, page: int) -> bool:
        """Optional: tell the UI whether a genre has more pages after ``page``."""
        genre = (genre or "").strip()
        if not genre:
            return False
        self._ensure_genre_index()
        genre_id = self._genre_id_by_name.get(genre)
        if not genre_id:
            return False
        page = max(1, int(page or 1))
        key = (int(genre_id), page)
        cached = self._genre_has_more_by_id_page.get(key)
        if cached is not None:
            return bool(cached)
        # If the page was not loaded yet, load it (fills the cache) and then report.
        self.titles_for_genre_page(genre, page)
        return bool(self._genre_has_more_by_id_page.get(key, False))

    def seasons_for(self, title: str) -> List[str]:
        """Return the single pseudo-season ``"Stream"`` for a non-empty title.

        Opening a title also fetches its detail page by id (HTML) so that it
        is cached for later metadata lookups.
        """
        title = (title or "").strip()
        if not title:
            return []
        movie_id = self._ensure_title_id(title)
        if movie_id is not None:
            self._fetch_movie_detail(movie_id)
        # Playback: expose a single "Stream" folder (inside: 1 playable item = movie title).
        return ["Stream"]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return ``[title]`` for the ``"Stream"`` season, otherwise ``[]``."""
        season = (season or "").strip()
        if season.casefold() != "stream":
            return []
        title = (title or "").strip()
        return [title] if title else []

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return the stream URL for a title, or ``None`` when it cannot be determined.

        Two season/episode layouts are accepted for backwards compatibility:
        the old ``Film / Stream`` and the new ``Stream / <movie title>``.
        """
        title = (title or "").strip()
        season = (season or "").strip()
        episode = (episode or "").strip()
        if not title:
            return None
        season_key = season.casefold()
        episode_is_stream = episode.casefold() == "stream"
        old_layout = season_key == "film" and episode_is_stream
        new_layout = season_key == "stream" and (episode == title or episode_is_stream)
        if not (old_layout or new_layout):
            return None
        movie_id = self._ensure_title_id(title)
        if movie_id is None:
            return None
        return self._watch_stream_url(movie_id) or None

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Resolve ``link`` through ``plugin_helpers.resolve_via_resolveurl`` with link fallback."""
        from plugin_helpers import resolve_via_resolveurl

        return resolve_via_resolveurl(link, fallback_to_link=True)

    def capabilities(self) -> Set[str]:
        """Return the capability names advertised by this plugin."""
        return {"new_titles", "genres", "popular_series"}

    def popular_series(self) -> List[str]:
        """Return the best rated movies of the index page (sorted by voteAverage).

        Rated movies come first in descending order, unrated ones follow in
        page order; the first 50 entries are considered.
        """
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        movies = self._load_movies()
        with_rating = [m for m in movies if m.vote_average is not None]
        without_rating = [m for m in movies if m.vote_average is None]
        ranked = sorted(with_rating, key=lambda m: (m.vote_average or 0.0), reverse=True)
        ordered = ranked + without_rating
        return self._remember_titles(ordered[:50])

    def new_titles(self) -> List[str]:
        """Return the titles of the first "new titles" page.

        Backwards compatible: first page only. The UI pages via ``new_titles_page``.
        """
        if not REQUESTS_AVAILABLE:
            return []
        if not self._get_base_url():
            return []
        return self.new_titles_page(1)


# Alias for automatic plugin discovery.
Plugin = EinschaltenPlugin
|