Core & Architektur: - Neues Verzeichnis addon/core/ mit router.py, trakt.py, metadata.py, gui.py, playstate.py, plugin_manager.py, updater.py - Tests-Verzeichnis hinzugefügt (24 Tests, pytest + Coverage) Trakt-Integration: - OAuth Device Flow, Scrobbling, Watchlist, History, Calendar - Upcoming Episodes, Weiterschauen (Continue Watching) - Watched-Status in Episodenlisten - _trakt_find_in_plugins() mit 5-Min-Cache Serienstream-Suche: - API-Ergebnisse werden immer mit Katalog-Cache ergänzt (serverseitiges 10-Treffer-Limit) - Katalog-Cache wird beim Addon-Start im Daemon-Thread vorgewärmt - Notification nach Cache-Load via xbmc.executebuiltin() (thread-sicher) Bugfixes (Code-Review): - Race Condition auf _TRAKT_WATCHED_CACHE: _TRAKT_WATCHED_CACHE_LOCK hinzugefügt - GUI-Dialog aus Daemon-Thread: xbmcgui -> xbmc.executebuiltin() - ValueError in Trakt-Watchlist-Routen abgesichert - Token expires_at==0 Check korrigiert - get_setting_bool() Kontrollfluss in gui.py bereinigt - topstreamfilm_plugin: try-finally um xbmcvfs.File.close() Cleanup: - default.py.bak und refactor_router.py entfernt - .gitignore: /tests/ Eintrag entfernt - Type-Hints vereinheitlicht (Dict/List/Tuple -> dict/list/tuple)
342 lines
10 KiB
Python
342 lines
10 KiB
Python
from __future__ import annotations
|
||
import sys
|
||
import re
|
||
import contextlib
|
||
from urllib.parse import urlencode
|
||
from typing import Any, Generator, Optional, Callable
|
||
from contextlib import contextmanager
|
||
|
||
try:
|
||
import xbmc
|
||
import xbmcaddon
|
||
import xbmcgui
|
||
import xbmcplugin
|
||
except ImportError:
|
||
xbmc = None
|
||
xbmcaddon = None
|
||
xbmcgui = None
|
||
xbmcplugin = None
|
||
|
||
_ADDON_INSTANCE = None
|
||
|
||
def get_addon():
|
||
global _ADDON_INSTANCE
|
||
if xbmcaddon is None:
|
||
return None
|
||
if _ADDON_INSTANCE is None:
|
||
_ADDON_INSTANCE = xbmcaddon.Addon()
|
||
return _ADDON_INSTANCE
|
||
|
||
def get_handle() -> int:
|
||
return int(sys.argv[1]) if len(sys.argv) > 1 else -1
|
||
|
||
def get_setting_string(setting_id: str) -> str:
|
||
addon = get_addon()
|
||
if addon is None:
|
||
return ""
|
||
getter = getattr(addon, "getSettingString", None)
|
||
if callable(getter):
|
||
try:
|
||
return str(getter(setting_id) or "")
|
||
except Exception:
|
||
pass
|
||
getter = getattr(addon, "getSetting", None)
|
||
if callable(getter):
|
||
try:
|
||
return str(getter(setting_id) or "")
|
||
except Exception:
|
||
pass
|
||
return ""
|
||
|
||
def get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
|
||
addon = get_addon()
|
||
if addon is None:
|
||
return default
|
||
# Schritt 1: Prüfe ob das Setting überhaupt gesetzt ist (leerer Rohwert = default)
|
||
raw_getter = getattr(addon, "getSetting", None)
|
||
if callable(raw_getter):
|
||
try:
|
||
raw = str(raw_getter(setting_id) or "").strip()
|
||
if not raw:
|
||
return default
|
||
except Exception:
|
||
return default
|
||
# Schritt 2: Bevorzuge getSettingBool für korrekte Typ-Konvertierung
|
||
getter = getattr(addon, "getSettingBool", None)
|
||
if callable(getter):
|
||
try:
|
||
return bool(getter(setting_id))
|
||
except Exception:
|
||
pass
|
||
# Schritt 3: Fallback – Rohwert manuell parsen
|
||
if callable(raw_getter):
|
||
try:
|
||
raw = str(raw_getter(setting_id) or "").strip().lower()
|
||
return raw == "true"
|
||
except Exception:
|
||
pass
|
||
return default
|
||
|
||
def get_setting_int(setting_id: str, *, default: int = 0) -> int:
|
||
addon = get_addon()
|
||
if addon is None:
|
||
return default
|
||
getter = getattr(addon, "getSettingInt", None)
|
||
if callable(getter):
|
||
try:
|
||
raw_getter = getattr(addon, "getSetting", None)
|
||
if callable(raw_getter):
|
||
raw = str(raw_getter(setting_id) or "").strip()
|
||
if not raw:
|
||
return default
|
||
return int(getter(setting_id))
|
||
except Exception:
|
||
pass
|
||
getter = getattr(addon, "getSetting", None)
|
||
if callable(getter):
|
||
try:
|
||
raw = str(getter(setting_id) or "").strip()
|
||
return int(raw) if raw else default
|
||
except Exception:
|
||
pass
|
||
return default
|
||
|
||
def set_setting_string(setting_id: str, value: str) -> None:
|
||
addon = get_addon()
|
||
if addon is None:
|
||
return
|
||
setter = getattr(addon, "setSettingString", None)
|
||
if callable(setter):
|
||
try:
|
||
setter(setting_id, str(value))
|
||
return
|
||
except Exception:
|
||
pass
|
||
setter = getattr(addon, "setSetting", None)
|
||
if callable(setter):
|
||
try:
|
||
setter(setting_id, str(value))
|
||
except Exception:
|
||
pass
|
||
|
||
@contextmanager
|
||
def progress_dialog(heading: str, message: str = ""):
|
||
"""Zeigt einen Fortschrittsdialog in Kodi und liefert eine Update-Funktion."""
|
||
dialog = None
|
||
try:
|
||
if xbmcgui is not None and hasattr(xbmcgui, "DialogProgress"):
|
||
dialog = xbmcgui.DialogProgress()
|
||
dialog.create(heading, message)
|
||
except Exception:
|
||
dialog = None
|
||
|
||
def _update_fn(percent: int, msg: str = "") -> bool:
|
||
if dialog:
|
||
try:
|
||
dialog.update(percent, msg or message)
|
||
return dialog.iscanceled()
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
try:
|
||
yield _update_fn
|
||
finally:
|
||
if dialog:
|
||
try:
|
||
dialog.close()
|
||
except Exception:
|
||
pass
|
||
|
||
@contextmanager
|
||
def busy_dialog(message: str = "Bitte warten...", *, heading: str = "Bitte warten"):
|
||
"""Progress-Dialog statt Spinner, mit kurzem Status-Text."""
|
||
with progress_dialog(heading, message) as progress:
|
||
progress(10, message)
|
||
def _update(step_message: str, percent: int | None = None) -> bool:
|
||
pct = 50 if percent is None else max(5, min(95, int(percent)))
|
||
return progress(pct, step_message or message)
|
||
try:
|
||
yield _update
|
||
finally:
|
||
progress(100, "Fertig")
|
||
|
||
def run_with_progress(heading: str, message: str, loader: Callable[[], Any]) -> Any:
|
||
"""Fuehrt eine Ladefunktion mit sichtbarem Fortschrittsdialog aus."""
|
||
with progress_dialog(heading, message) as progress:
|
||
progress(10, message)
|
||
result = loader()
|
||
progress(100, "Fertig")
|
||
return result
|
||
|
||
def set_content(handle: int, content: str) -> None:
|
||
"""Hint Kodi about the content type so skins can show watched/resume overlays."""
|
||
content = (content or "").strip()
|
||
if not content:
|
||
return
|
||
try:
|
||
setter = getattr(xbmcplugin, "setContent", None)
|
||
if callable(setter):
|
||
setter(handle, content)
|
||
except Exception:
|
||
pass
|
||
|
||
def add_directory_item(
|
||
handle: int,
|
||
label: str,
|
||
action: str,
|
||
params: dict[str, str] | None = None,
|
||
*,
|
||
is_folder: bool = True,
|
||
info_labels: dict[str, Any] | None = None,
|
||
art: dict[str, str] | None = None,
|
||
cast: Any = None,
|
||
base_url: str = "",
|
||
) -> None:
|
||
"""Fuegt einen Eintrag in die Kodi-Liste ein."""
|
||
query: dict[str, str] = {"action": action}
|
||
if params:
|
||
query.update(params)
|
||
url = f"{base_url}?{urlencode(query)}"
|
||
item = xbmcgui.ListItem(label=label)
|
||
if not is_folder:
|
||
try:
|
||
item.setProperty("IsPlayable", "true")
|
||
except Exception:
|
||
pass
|
||
apply_video_info(item, info_labels, cast)
|
||
if art:
|
||
setter = getattr(item, "setArt", None)
|
||
if callable(setter):
|
||
try:
|
||
setter(art)
|
||
except Exception:
|
||
pass
|
||
xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
|
||
|
||
def apply_video_info(item, info_labels: dict[str, Any] | None, cast: Any = None) -> None:
|
||
"""Setzt Metadaten via InfoTagVideo (Kodi v20+), mit Fallback."""
|
||
if not info_labels and not cast:
|
||
return
|
||
info_labels = dict(info_labels or {})
|
||
get_tag = getattr(item, "getVideoInfoTag", None)
|
||
tag = None
|
||
if callable(get_tag):
|
||
try:
|
||
tag = get_tag()
|
||
except Exception:
|
||
tag = None
|
||
|
||
if tag is not None:
|
||
try:
|
||
_apply_tag_info(tag, info_labels)
|
||
if cast:
|
||
_apply_tag_cast(tag, cast)
|
||
except Exception:
|
||
pass
|
||
else:
|
||
# Fallback für ältere Kodi-Versionen
|
||
setter = getattr(item, "setInfo", None)
|
||
if callable(setter):
|
||
try:
|
||
setter("video", info_labels)
|
||
except Exception:
|
||
pass
|
||
if cast:
|
||
setter = getattr(item, "setCast", None)
|
||
if callable(setter):
|
||
try:
|
||
setter(cast)
|
||
except Exception:
|
||
pass
|
||
|
||
def _apply_tag_info(tag, info: dict[str, Any]) -> None:
|
||
for key, method in [
|
||
("title", "setTitle"),
|
||
("plot", "setPlot"),
|
||
("mediatype", "setMediaType"),
|
||
("tvshowtitle", "setTvShowTitle"),
|
||
]:
|
||
val = info.get(key)
|
||
if val:
|
||
setter = getattr(tag, method, None)
|
||
if callable(setter): setter(str(val))
|
||
|
||
for key, method in [("season", "setSeason"), ("episode", "setEpisode")]:
|
||
val = info.get(key)
|
||
if val not in (None, "", 0, "0"):
|
||
setter = getattr(tag, method, None)
|
||
if callable(setter): setter(int(val))
|
||
|
||
rating = info.get("rating")
|
||
if rating not in (None, "", 0, "0"):
|
||
set_rating = getattr(tag, "setRating", None)
|
||
if callable(set_rating):
|
||
try: set_rating(float(rating))
|
||
except Exception: pass
|
||
|
||
def _apply_tag_cast(tag, cast) -> None:
|
||
setter = getattr(tag, "setCast", None)
|
||
if not callable(setter):
|
||
return
|
||
try:
|
||
formatted_cast = []
|
||
for c in cast:
|
||
# Erwarte TmdbCastMember oder ähnliches Objekt/Dict
|
||
name = getattr(c, "name", "") or c.get("name", "") if hasattr(c, "get") else ""
|
||
role = getattr(c, "role", "") or c.get("role", "") if hasattr(c, "get") else ""
|
||
thumb = getattr(c, "thumbnail", "") or c.get("thumbnail", "") if hasattr(c, "get") else ""
|
||
if name:
|
||
formatted_cast.append(xbmcgui.Actor(name=name, role=role, thumbnail=thumb))
|
||
if formatted_cast:
|
||
setter(formatted_cast)
|
||
except Exception:
|
||
pass
|
||
|
||
def label_with_duration(label: str, info_labels: dict[str, Any]) -> str:
|
||
duration = info_labels.get("duration")
|
||
if not duration:
|
||
return label
|
||
try:
|
||
minutes = int(duration) // 60
|
||
if minutes > 0:
|
||
return f"{label} ({minutes} Min.)"
|
||
except Exception:
|
||
pass
|
||
return label
|
||
|
||
|
||
def extract_first_int(value: str | int | None) -> Optional[int]:
|
||
if value is None:
|
||
return None
|
||
if isinstance(value, int):
|
||
return value
|
||
match = re.search(r"\d+", str(value))
|
||
return int(match.group()) if match else None
|
||
|
||
|
||
def looks_like_unresolved_hoster_link(url: str) -> bool:
|
||
url = (url or "").strip()
|
||
return any(p in url.casefold() for p in ["hoster", "link", "resolve"])
|
||
|
||
|
||
def is_resolveurl_missing_error(err: str | None) -> bool:
|
||
err = str(err or "").strip().lower()
|
||
return "resolveurl" in err and ("missing" in err or "not found" in err)
|
||
|
||
|
||
def is_cloudflare_challenge_error(err: str | None) -> bool:
|
||
err = str(err or "").strip().lower()
|
||
return "cloudflare" in err or "challenge" in err
|
||
|
||
|
||
def resolveurl_last_error() -> str:
|
||
try:
|
||
from resolveurl_backend import get_last_error # type: ignore
|
||
except Exception:
|
||
return ""
|
||
try:
|
||
return str(get_last_error() or "")
|
||
except Exception:
|
||
return ""
|