Core & Architektur: - Neues Verzeichnis addon/core/ mit router.py, trakt.py, metadata.py, gui.py, playstate.py, plugin_manager.py, updater.py - Tests-Verzeichnis hinzugefügt (24 Tests, pytest + Coverage) Trakt-Integration: - OAuth Device Flow, Scrobbling, Watchlist, History, Calendar - Upcoming Episodes, Weiterschauen (Continue Watching) - Watched-Status in Episodenlisten - _trakt_find_in_plugins() mit 5-Min-Cache Serienstream-Suche: - API-Ergebnisse werden immer mit Katalog-Cache ergänzt (serverseitiges 10-Treffer-Limit) - Katalog-Cache wird beim Addon-Start im Daemon-Thread vorgewärmt - Notification nach Cache-Load via xbmc.executebuiltin() (thread-sicher) Bugfixes (Code-Review): - Race Condition auf _TRAKT_WATCHED_CACHE: _TRAKT_WATCHED_CACHE_LOCK hinzugefügt - GUI-Dialog aus Daemon-Thread: xbmcgui -> xbmc.executebuiltin() - ValueError in Trakt-Watchlist-Routen abgesichert - Token expires_at==0 Check korrigiert - get_setting_bool() Kontrollfluss in gui.py bereinigt - topstreamfilm_plugin: try-finally um xbmcvfs.File.close() Cleanup: - default.py.bak und refactor_router.py entfernt - .gitignore: /tests/ Eintrag entfernt - Type-Hints vereinheitlicht (Dict/List/Tuple -> dict/list/tuple)
329 lines
10 KiB
Python
329 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""Shared helpers for ViewIt plugins.
|
||
|
||
Focus:
|
||
- Kodi addon settings access (string/bool)
|
||
- Optional URL notifications
|
||
- Optional URL logging
|
||
- Optional HTML response dumps
|
||
|
||
Designed to work both in Kodi and outside Kodi (for linting/tests).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
import hashlib
|
||
import os
|
||
import re
|
||
from typing import Optional
|
||
from urllib.parse import parse_qsl, urlencode
|
||
|
||
try: # pragma: no cover - Kodi runtime
|
||
import xbmcaddon # type: ignore[import-not-found]
|
||
import xbmcvfs # type: ignore[import-not-found]
|
||
import xbmcgui # type: ignore[import-not-found]
|
||
except ImportError: # pragma: no cover - allow importing outside Kodi
|
||
xbmcaddon = None
|
||
xbmcvfs = None
|
||
xbmcgui = None
|
||
|
||
|
||
def get_setting_string(addon_id: str, setting_id: str, *, default: str = "") -> str:
|
||
if xbmcaddon is None:
|
||
return default
|
||
try:
|
||
addon = xbmcaddon.Addon(addon_id)
|
||
getter = getattr(addon, "getSettingString", None)
|
||
if getter is not None:
|
||
return str(getter(setting_id) or "").strip()
|
||
return str(addon.getSetting(setting_id) or "").strip()
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def get_setting_bool(addon_id: str, setting_id: str, *, default: bool = False) -> bool:
|
||
if xbmcaddon is None:
|
||
return default
|
||
try:
|
||
addon = xbmcaddon.Addon(addon_id)
|
||
getter = getattr(addon, "getSettingBool", None)
|
||
if getter is not None:
|
||
return bool(getter(setting_id))
|
||
raw = addon.getSetting(setting_id)
|
||
return str(raw).strip().lower() in {"1", "true", "yes", "on"}
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def get_setting_int(addon_id: str, setting_id: str, *, default: int = 0) -> int:
|
||
if xbmcaddon is None:
|
||
return default
|
||
try:
|
||
addon = xbmcaddon.Addon(addon_id)
|
||
getter = getattr(addon, "getSettingInt", None)
|
||
if getter is not None:
|
||
return int(getter(setting_id))
|
||
raw = addon.getSetting(setting_id)
|
||
return int(str(raw).strip())
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def _is_logging_enabled(addon_id: str, *, global_setting_id: str, plugin_setting_id: Optional[str]) -> bool:
|
||
if not get_setting_bool(addon_id, global_setting_id, default=False):
|
||
return False
|
||
if plugin_setting_id:
|
||
return get_setting_bool(addon_id, plugin_setting_id, default=False)
|
||
return True
|
||
|
||
|
||
def notify_url(
|
||
addon_id: str,
|
||
*,
|
||
heading: str,
|
||
url: str,
|
||
enabled_setting_id: str,
|
||
plugin_setting_id: Optional[str] = None,
|
||
) -> None:
|
||
if xbmcgui is None:
|
||
return
|
||
if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
|
||
return
|
||
try:
|
||
xbmcgui.Dialog().notification(heading, url, xbmcgui.NOTIFICATION_INFO, 3000)
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def show_notification(
|
||
heading: str,
|
||
message: str,
|
||
*,
|
||
icon: int | None = None,
|
||
milliseconds: int = 3000,
|
||
) -> None:
|
||
"""Zeigt eine kurze Kodi-Notification an (falls `xbmcgui` verfuegbar ist)."""
|
||
|
||
if xbmcgui is None:
|
||
return
|
||
try:
|
||
icon_value = icon if icon is not None else xbmcgui.NOTIFICATION_INFO
|
||
xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), icon_value, int(milliseconds))
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def show_error(heading: str, message: str, *, milliseconds: int = 4000) -> None:
|
||
"""Zeigt eine einheitliche Fehlermeldung im Kodi-UI."""
|
||
|
||
if xbmcgui is None:
|
||
return
|
||
try:
|
||
xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), xbmcgui.NOTIFICATION_ERROR, int(milliseconds))
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def _profile_logs_dir(addon_id: str) -> Optional[str]:
|
||
if xbmcaddon is None or xbmcvfs is None:
|
||
return None
|
||
try:
|
||
addon = xbmcaddon.Addon(addon_id)
|
||
profile = xbmcvfs.translatePath(addon.getAddonInfo("profile"))
|
||
log_dir = os.path.join(profile, "logs")
|
||
if not xbmcvfs.exists(log_dir):
|
||
xbmcvfs.mkdirs(log_dir)
|
||
return log_dir
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _append_text_file(path: str, content: str) -> None:
|
||
try:
|
||
with open(path, "a", encoding="utf-8") as handle:
|
||
handle.write(content)
|
||
return
|
||
except Exception:
|
||
pass
|
||
if xbmcvfs is None:
|
||
return
|
||
try:
|
||
handle = xbmcvfs.File(path, "a")
|
||
handle.write(content)
|
||
handle.close()
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def _rotate_log_file(path: str, *, max_bytes: int, max_files: int) -> None:
|
||
if max_bytes <= 0 or max_files <= 0:
|
||
return
|
||
try:
|
||
if not os.path.exists(path) or os.path.getsize(path) <= max_bytes:
|
||
return
|
||
except Exception:
|
||
return
|
||
try:
|
||
for index in range(max_files - 1, 0, -1):
|
||
older = f"{path}.{index}"
|
||
newer = f"{path}.{index + 1}"
|
||
if os.path.exists(older):
|
||
if index + 1 > max_files:
|
||
os.remove(older)
|
||
else:
|
||
os.replace(older, newer)
|
||
os.replace(path, f"{path}.1")
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def _prune_dump_files(directory: str, *, prefix: str, max_files: int) -> None:
|
||
if not directory or max_files <= 0:
|
||
return
|
||
try:
|
||
entries = [
|
||
os.path.join(directory, name)
|
||
for name in os.listdir(directory)
|
||
if name.startswith(prefix) and name.endswith(".html")
|
||
]
|
||
if len(entries) <= max_files:
|
||
return
|
||
entries.sort(key=lambda path: os.path.getmtime(path))
|
||
for path in entries[: len(entries) - max_files]:
|
||
try:
|
||
os.remove(path)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
return
|
||
|
||
|
||
def log_url(
|
||
addon_id: str,
|
||
*,
|
||
enabled_setting_id: str,
|
||
log_filename: str,
|
||
url: str,
|
||
kind: str = "VISIT",
|
||
request_id: Optional[str] = None,
|
||
plugin_setting_id: Optional[str] = None,
|
||
max_mb_setting_id: str = "log_max_mb",
|
||
max_files_setting_id: str = "log_max_files",
|
||
) -> None:
|
||
if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
|
||
return
|
||
timestamp = datetime.utcnow().isoformat(timespec="seconds") + "Z"
|
||
request_part = f"\t{request_id}" if request_id else ""
|
||
line = f"{timestamp}\t{kind}{request_part}\t{url}\n"
|
||
log_dir = _profile_logs_dir(addon_id)
|
||
path = os.path.join(log_dir, log_filename) if log_dir else os.path.join(os.path.dirname(__file__), log_filename)
|
||
max_mb = get_setting_int(addon_id, max_mb_setting_id, default=5)
|
||
max_files = get_setting_int(addon_id, max_files_setting_id, default=3)
|
||
_rotate_log_file(path, max_bytes=max_mb * 1024 * 1024, max_files=max_files)
|
||
_append_text_file(path, line)
|
||
|
||
|
||
def log_error(
|
||
addon_id: str,
|
||
*,
|
||
enabled_setting_id: str,
|
||
log_filename: str,
|
||
message: str,
|
||
request_id: Optional[str] = None,
|
||
plugin_setting_id: Optional[str] = None,
|
||
) -> None:
|
||
log_url(
|
||
addon_id,
|
||
enabled_setting_id=enabled_setting_id,
|
||
plugin_setting_id=plugin_setting_id,
|
||
log_filename=log_filename,
|
||
url=message,
|
||
kind="ERROR",
|
||
request_id=request_id,
|
||
)
|
||
|
||
|
||
def dump_response_html(
|
||
addon_id: str,
|
||
*,
|
||
enabled_setting_id: str,
|
||
url: str,
|
||
body: str,
|
||
filename_prefix: str,
|
||
request_id: Optional[str] = None,
|
||
plugin_setting_id: Optional[str] = None,
|
||
max_files_setting_id: str = "dump_max_files",
|
||
) -> None:
|
||
if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
|
||
return
|
||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
|
||
digest = hashlib.md5(url.encode("utf-8")).hexdigest() # nosec - filename only
|
||
filename = f"{filename_prefix}_{timestamp}_{digest}.html"
|
||
log_dir = _profile_logs_dir(addon_id)
|
||
path = os.path.join(log_dir, filename) if log_dir else os.path.join(os.path.dirname(__file__), filename)
|
||
request_line = f" request_id={request_id}" if request_id else ""
|
||
content = f"<!-- {url}{request_line} -->\n{body or ''}"
|
||
if log_dir:
|
||
max_files = get_setting_int(addon_id, max_files_setting_id, default=200)
|
||
_prune_dump_files(log_dir, prefix=filename_prefix, max_files=max_files)
|
||
_append_text_file(path, content)
|
||
|
||
|
||
def resolve_via_resolveurl(link: str, *, fallback_to_link: bool = True) -> Optional[str]:
|
||
"""Versucht einen Hoster-Link mit resolveurl_backend aufzuloesen.
|
||
|
||
Gibt den aufgeloesten Link zurueck, oder – wenn resolveurl nicht verfuegbar
|
||
ist oder nichts liefert – den Original-Link (wenn fallback_to_link=True)
|
||
bzw. None (wenn fallback_to_link=False).
|
||
"""
|
||
link = (link or "").strip()
|
||
if not link:
|
||
return None
|
||
try:
|
||
from resolveurl_backend import resolve as _resolve_fn # type: ignore[import-not-found]
|
||
except Exception:
|
||
_resolve_fn = None
|
||
if callable(_resolve_fn):
|
||
resolved = _resolve_fn(link)
|
||
if resolved:
|
||
return resolved
|
||
return link if fallback_to_link else None
|
||
|
||
|
||
def normalize_resolved_stream_url(final_url: str, *, source_url: str = "") -> str:
|
||
"""Normalisiert hoster-spezifische Header im finalen Stream-Link.
|
||
|
||
`final_url` kann ein Kodi-Header-Suffix enthalten: `url|Key=Value&...`.
|
||
Die Funktion passt nur bekannte Problemfaelle an und laesst sonst alles unveraendert.
|
||
"""
|
||
|
||
url = (final_url or "").strip()
|
||
if not url:
|
||
return ""
|
||
normalized = _normalize_supervideo_serversicuro(url, source_url=source_url)
|
||
return normalized
|
||
|
||
|
||
def _normalize_supervideo_serversicuro(final_url: str, *, source_url: str = "") -> str:
|
||
if "serversicuro.cc/hls/" not in final_url.casefold() or "|" not in final_url:
|
||
return final_url
|
||
|
||
source = (source_url or "").strip()
|
||
code_match = re.search(
|
||
r"supervideo\.(?:tv|cc)/(?:e/)?([a-z0-9]+)(?:\\.html)?",
|
||
source,
|
||
flags=re.IGNORECASE,
|
||
)
|
||
if not code_match:
|
||
return final_url
|
||
|
||
code = (code_match.group(1) or "").strip()
|
||
if not code:
|
||
return final_url
|
||
|
||
media_url, header_suffix = final_url.split("|", 1)
|
||
headers = dict(parse_qsl(header_suffix, keep_blank_values=True))
|
||
headers["Referer"] = f"https://supervideo.cc/e/{code}"
|
||
return f"{media_url}|{urlencode(headers)}"
|