Files
ViewIT/addon/plugin_helpers.py
itdrui.de 7b60b00c8b dev: umfangreiches Refactoring, Trakt-Integration und Code-Review-Fixes (0.1.69-dev)
Core & Architektur:
- Neues Verzeichnis addon/core/ mit router.py, trakt.py, metadata.py,
  gui.py, playstate.py, plugin_manager.py, updater.py
- Tests-Verzeichnis hinzugefügt (24 Tests, pytest + Coverage)

Trakt-Integration:
- OAuth Device Flow, Scrobbling, Watchlist, History, Calendar
- Upcoming Episodes, Weiterschauen (Continue Watching)
- Watched-Status in Episodenlisten
- _trakt_find_in_plugins() mit 5-Min-Cache

Serienstream-Suche:
- API-Ergebnisse werden immer mit Katalog-Cache ergänzt (serverseitiges 10-Treffer-Limit)
- Katalog-Cache wird beim Addon-Start im Daemon-Thread vorgewärmt
- Notification nach Cache-Load via xbmc.executebuiltin() (thread-sicher)

Bugfixes (Code-Review):
- Race Condition auf _TRAKT_WATCHED_CACHE: _TRAKT_WATCHED_CACHE_LOCK hinzugefügt
- GUI-Dialog aus Daemon-Thread: xbmcgui -> xbmc.executebuiltin()
- ValueError in Trakt-Watchlist-Routen abgesichert
- Token expires_at==0 Check korrigiert
- get_setting_bool() Kontrollfluss in gui.py bereinigt
- topstreamfilm_plugin: try-finally um xbmcvfs.File.close()

Cleanup:
- default.py.bak und refactor_router.py entfernt
- .gitignore: /tests/ Eintrag entfernt
- Type-Hints vereinheitlicht (Dict/List/Tuple -> dict/list/tuple)
2026-03-01 18:39:05 +01:00

329 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Shared helpers for ViewIt plugins.
Focus:
- Kodi addon settings access (string/bool)
- Optional URL notifications
- Optional URL logging
- Optional HTML response dumps
Designed to work both in Kodi and outside Kodi (for linting/tests).
"""
from __future__ import annotations
from datetime import datetime
import hashlib
import os
import re
from typing import Optional
from urllib.parse import parse_qsl, urlencode
try: # pragma: no cover - Kodi runtime
import xbmcaddon # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow importing outside Kodi
xbmcaddon = None
xbmcvfs = None
xbmcgui = None
def get_setting_string(addon_id: str, setting_id: str, *, default: str = "") -> str:
    """Read a Kodi addon setting as a stripped string.

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Value returned when ``xbmcaddon`` is unavailable or when
            reading the setting raises.

    Returns:
        The setting value converted with ``str`` and stripped; a falsy raw
        value yields ``""`` (not ``default``).
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        # Use the typed accessor when the addon object offers one, otherwise
        # fall back to the generic ``getSetting``.
        read_setting = getattr(addon, "getSettingString", None)
        if read_setting is None:
            read_setting = addon.getSetting
        raw_value = read_setting(setting_id)
        return str(raw_value or "").strip()
    except Exception:
        return default
def get_setting_bool(addon_id: str, setting_id: str, *, default: bool = False) -> bool:
    """Read a Kodi addon setting as a boolean.

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Value returned when ``xbmcaddon`` is unavailable or when
            reading the setting raises.

    Returns:
        ``bool(getSettingBool(...))`` when the addon object has that method;
        otherwise whether the lower-cased, stripped ``getSetting`` text is one
        of ``1``, ``true``, ``yes`` or ``on``.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_getter = getattr(addon, "getSettingBool", None)
        if typed_getter is None:
            # No typed accessor: interpret the textual value.
            text = str(addon.getSetting(setting_id)).strip().lower()
            return text in {"1", "true", "yes", "on"}
        return bool(typed_getter(setting_id))
    except Exception:
        return default
def get_setting_int(addon_id: str, setting_id: str, *, default: int = 0) -> int:
    """Read a Kodi addon setting as an integer.

    Args:
        addon_id: Id passed to ``xbmcaddon.Addon``.
        setting_id: Id of the setting to read.
        default: Value returned when ``xbmcaddon`` is unavailable, when
            reading the setting raises, or when the value cannot be converted
            with ``int``.

    Returns:
        ``int(getSettingInt(...))`` when the addon object has that method;
        otherwise ``int`` of the stripped ``getSetting`` text.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_getter = getattr(addon, "getSettingInt", None)
        if typed_getter is None:
            # No typed accessor: parse the textual value.
            text = str(addon.getSetting(setting_id)).strip()
            return int(text)
        return int(typed_getter(setting_id))
    except Exception:
        return default
def _is_logging_enabled(addon_id: str, *, global_setting_id: str, plugin_setting_id: Optional[str]) -> bool:
    """Tell whether a logging feature is switched on.

    The global setting must be true. If ``plugin_setting_id`` is given (and
    non-empty), that setting must be true as well; both default to ``False``.
    """
    if get_setting_bool(addon_id, global_setting_id, default=False):
        if plugin_setting_id:
            # The per-plugin switch has the final say once the global one is on.
            return get_setting_bool(addon_id, plugin_setting_id, default=False)
        return True
    return False
def notify_url(
    addon_id: str,
    *,
    heading: str,
    url: str,
    enabled_setting_id: str,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Show ``url`` as a 3-second info notification when enabled.

    Does nothing when ``xbmcgui`` is unavailable or when the settings checked
    by ``_is_logging_enabled`` are off. Errors raised while showing the
    notification are ignored.
    """
    if xbmcgui is None:
        return
    notifications_on = _is_logging_enabled(
        addon_id,
        global_setting_id=enabled_setting_id,
        plugin_setting_id=plugin_setting_id,
    )
    if notifications_on:
        try:
            dialog = xbmcgui.Dialog()
            dialog.notification(heading, url, xbmcgui.NOTIFICATION_INFO, 3000)
        except Exception:
            # Best effort: a failing notification must not break the caller.
            pass
def show_notification(
    heading: str,
    message: str,
    *,
    icon: int | None = None,
    milliseconds: int = 3000,
) -> None:
    """Show a short Kodi notification (if ``xbmcgui`` is available).

    Args:
        heading: Notification title; falsy values become ``""``.
        message: Notification text; falsy values become ``""``.
        icon: Icon passed to the dialog; ``xbmcgui.NOTIFICATION_INFO`` is used
            when ``None``.
        milliseconds: Display time, converted with ``int``.

    Errors raised while showing the notification are ignored.
    """
    if xbmcgui is None:
        return
    try:
        if icon is None:
            icon = xbmcgui.NOTIFICATION_INFO
        dialog = xbmcgui.Dialog()
        title = str(heading or "")
        text = str(message or "")
        dialog.notification(title, text, icon, int(milliseconds))
    except Exception:
        # Best effort: a failing notification must not break the caller.
        pass
def show_error(heading: str, message: str, *, milliseconds: int = 4000) -> None:
    """Show a uniform error notification in the Kodi UI.

    Uses ``xbmcgui.NOTIFICATION_ERROR`` as icon. Falsy ``heading``/``message``
    values become ``""``. Does nothing when ``xbmcgui`` is unavailable; errors
    raised while showing the notification are ignored.
    """
    if xbmcgui is None:
        return
    try:
        dialog = xbmcgui.Dialog()
        title = str(heading or "")
        text = str(message or "")
        dialog.notification(title, text, xbmcgui.NOTIFICATION_ERROR, int(milliseconds))
    except Exception:
        # Best effort: a failing notification must not break the caller.
        pass
def _profile_logs_dir(addon_id: str) -> Optional[str]:
    """Return the ``logs`` directory inside the addon profile, creating it if needed.

    Returns:
        The translated profile path joined with ``"logs"``, or ``None`` when
        ``xbmcaddon``/``xbmcvfs`` are unavailable or any call raises.
    """
    if not (xbmcaddon is not None and xbmcvfs is not None):
        return None
    try:
        profile_uri = xbmcaddon.Addon(addon_id).getAddonInfo("profile")
        logs_path = os.path.join(xbmcvfs.translatePath(profile_uri), "logs")
        if not xbmcvfs.exists(logs_path):
            xbmcvfs.mkdirs(logs_path)
    except Exception:
        return None
    return logs_path
def _append_text_file(path: str, content: str) -> None:
    """Append ``content`` to the file at ``path`` on a best-effort basis.

    The built-in ``open`` (UTF-8, append mode) is tried first. If that raises
    and ``xbmcvfs`` is available, ``xbmcvfs.File`` is used as a fallback.
    All errors are swallowed so that logging never breaks the caller.

    Args:
        path: Target file path.
        content: Text to append.
    """
    try:
        with open(path, "a", encoding="utf-8") as handle:
            handle.write(content)
        return
    except Exception:
        # Fall through to the xbmcvfs-based fallback below.
        pass
    if xbmcvfs is None:
        return
    try:
        handle = xbmcvfs.File(path, "a")
        try:
            handle.write(content)
        finally:
            # Always release the handle, even when write() raises.
            handle.close()
    except Exception:
        return
def _rotate_log_file(path: str, *, max_bytes: int, max_files: int) -> None:
    """Rotate ``path`` to numbered backups once it grows beyond ``max_bytes``.

    Existing backups ``path.1`` .. ``path.(max_files - 1)`` are shifted up by
    one (the highest one overwrites ``path.max_files`` if present), then
    ``path`` itself becomes ``path.1``. Nothing happens when either limit is
    not positive, when ``path`` does not exist, or when its size is at most
    ``max_bytes``. All errors are swallowed (best effort).

    Args:
        path: Log file to rotate.
        max_bytes: Size threshold in bytes; rotation happens only above it.
        max_files: Highest backup index that is kept.
    """
    if max_bytes <= 0 or max_files <= 0:
        return
    try:
        if not os.path.exists(path) or os.path.getsize(path) <= max_bytes:
            return
    except Exception:
        return
    try:
        # Walk from the oldest backup down so no file is overwritten before it
        # has been moved. ``index + 1`` never exceeds ``max_files`` here, so
        # no separate removal step is needed: os.replace overwrites the target.
        for index in range(max_files - 1, 0, -1):
            older = f"{path}.{index}"
            if os.path.exists(older):
                os.replace(older, f"{path}.{index + 1}")
        os.replace(path, f"{path}.1")
    except Exception:
        return
def _prune_dump_files(directory: str, *, prefix: str, max_files: int) -> None:
    """Delete the oldest ``.html`` dumps so that at most ``max_files`` remain.

    Only files in ``directory`` whose name starts with ``prefix`` and ends
    with ``.html`` are considered; age is judged by modification time.
    Nothing happens for an empty ``directory`` or a non-positive
    ``max_files``. All errors are swallowed (best effort).
    """
    if not directory or max_files <= 0:
        return
    try:
        candidates = []
        for name in os.listdir(directory):
            if name.startswith(prefix) and name.endswith(".html"):
                candidates.append(os.path.join(directory, name))
        surplus = len(candidates) - max_files
        if surplus <= 0:
            return
        # Oldest first, so the leading ``surplus`` entries are the ones to drop.
        candidates.sort(key=os.path.getmtime)
        for stale in candidates[:surplus]:
            try:
                os.remove(stale)
            except Exception:
                # A file that cannot be removed must not stop the cleanup.
                pass
    except Exception:
        return
def log_url(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    url: str,
    kind: str = "VISIT",
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_mb_setting_id: str = "log_max_mb",
    max_files_setting_id: str = "log_max_files",
) -> None:
    """Append one tab-separated line for ``url`` to a size-rotated log file.

    The line has the form ``<UTC timestamp>Z\\t<kind>[\\t<request_id>]\\t<url>``.
    The file lives in the addon profile's ``logs`` directory, or next to this
    module when that directory cannot be determined.

    Args:
        addon_id: Addon whose settings and profile directory are used.
        enabled_setting_id: Global boolean setting that enables logging.
        log_filename: File name of the log inside the log directory.
        url: Text written as the last column.
        kind: Label written as the second column.
        request_id: Optional id written as an extra column before ``url``.
        plugin_setting_id: Optional additional boolean setting that must be on.
        max_mb_setting_id: Integer setting with the rotation size in MiB
            (defaults to 5 when unreadable).
        max_files_setting_id: Integer setting with the number of backups
            (defaults to 3 when unreadable).
    """
    # Function-scope import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so isoformat() emits no "+00:00" before the "Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    timestamp = now_utc.isoformat(timespec="seconds") + "Z"
    request_part = f"\t{request_id}" if request_id else ""
    line = f"{timestamp}\t{kind}{request_part}\t{url}\n"
    log_dir = _profile_logs_dir(addon_id)
    path = os.path.join(log_dir, log_filename) if log_dir else os.path.join(os.path.dirname(__file__), log_filename)
    max_mb = get_setting_int(addon_id, max_mb_setting_id, default=5)
    max_files = get_setting_int(addon_id, max_files_setting_id, default=3)
    # Rotate first so the new line always lands in the current file.
    _rotate_log_file(path, max_bytes=max_mb * 1024 * 1024, max_files=max_files)
    _append_text_file(path, line)
def log_error(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    message: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Write ``message`` to the log via ``log_url`` with kind ``"ERROR"``.

    All other arguments are forwarded unchanged; ``message`` takes the place
    of the URL column.
    """
    forwarded = {
        "enabled_setting_id": enabled_setting_id,
        "plugin_setting_id": plugin_setting_id,
        "log_filename": log_filename,
        "url": message,
        "kind": "ERROR",
        "request_id": request_id,
    }
    log_url(addon_id, **forwarded)
def dump_response_html(
    addon_id: str,
    *,
    enabled_setting_id: str,
    url: str,
    body: str,
    filename_prefix: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_files_setting_id: str = "dump_max_files",
) -> None:
    """Write an HTML response body to a uniquely named dump file.

    The file name is ``<filename_prefix>_<UTC timestamp>_<md5 of url>.html``
    and the content starts with an HTML comment holding ``url`` (and
    ``request_id`` when given). The file is placed in the addon profile's
    ``logs`` directory, or next to this module when that directory cannot be
    determined.

    Args:
        addon_id: Addon whose settings and profile directory are used.
        enabled_setting_id: Global boolean setting that enables dumping.
        url: URL the body was fetched from.
        body: Response text; falsy values are written as an empty body.
        filename_prefix: Prefix of the dump file name, also used for pruning.
        request_id: Optional id added to the header comment.
        plugin_setting_id: Optional additional boolean setting that must be on.
        max_files_setting_id: Integer setting with the number of dumps to
            keep (defaults to 200 when unreadable).
    """
    # Function-scope import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return
    # ``datetime.utcnow()`` is deprecated since Python 3.12; the format string
    # contains no offset directive, so the resulting text is unchanged.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()  # nosec - filename only
    filename = f"{filename_prefix}_{timestamp}_{digest}.html"
    log_dir = _profile_logs_dir(addon_id)
    path = os.path.join(log_dir, filename) if log_dir else os.path.join(os.path.dirname(__file__), filename)
    request_line = f" request_id={request_id}" if request_id else ""
    content = f"<!-- {url}{request_line} -->\n{body or ''}"
    if log_dir:
        # Pruning runs before the new dump is written, so the directory can
        # hold one file more than the limit until the next call.
        max_files = get_setting_int(addon_id, max_files_setting_id, default=200)
        _prune_dump_files(log_dir, prefix=filename_prefix, max_files=max_files)
    _append_text_file(path, content)
def resolve_via_resolveurl(link: str, *, fallback_to_link: bool = True) -> Optional[str]:
    """Try to resolve a hoster link through ``resolveurl_backend``.

    Args:
        link: Hoster link; surrounding whitespace is stripped.
        fallback_to_link: Whether to return the stripped original link when
            the backend cannot be imported or yields nothing.

    Returns:
        The resolved link when the backend returns a truthy value; otherwise
        the stripped original link (``fallback_to_link=True``) or ``None``
        (``fallback_to_link=False``). An empty link always yields ``None``.
    """
    candidate = (link or "").strip()
    if not candidate:
        return None
    try:
        from resolveurl_backend import resolve as backend_resolve  # type: ignore[import-not-found]
    except Exception:
        backend_resolve = None
    resolved = backend_resolve(candidate) if callable(backend_resolve) else None
    if resolved:
        return resolved
    if fallback_to_link:
        return candidate
    return None
def normalize_resolved_stream_url(final_url: str, *, source_url: str = "") -> str:
    """Normalize hoster-specific headers in the final stream link.

    ``final_url`` may carry a Kodi header suffix: ``url|Key=Value&...``.
    Only known problem cases are adjusted; everything else is returned
    unchanged (apart from stripping surrounding whitespace). An empty or
    whitespace-only ``final_url`` yields ``""``.
    """
    cleaned = (final_url or "").strip()
    return _normalize_supervideo_serversicuro(cleaned, source_url=source_url) if cleaned else ""
def _normalize_supervideo_serversicuro(final_url: str, *, source_url: str = "") -> str:
    """Set the ``Referer`` header for serversicuro.cc HLS links from SuperVideo.

    The rewrite only happens when ``final_url`` contains
    ``serversicuro.cc/hls/`` (case-insensitive) together with a ``|`` header
    suffix, and a SuperVideo media code can be extracted from ``source_url``.
    In every other case ``final_url`` is returned unchanged.

    Args:
        final_url: Stream link in the form ``url|Key=Value&...``.
        source_url: Hoster page URL the stream was resolved from.

    Returns:
        ``final_url`` with its header suffix re-encoded and ``Referer`` set to
        ``https://supervideo.cc/e/<code>``, or the unchanged ``final_url``.
    """
    if "serversicuro.cc/hls/" not in final_url.casefold() or "|" not in final_url:
        return final_url
    source = (source_url or "").strip()
    # ``\.html`` (single backslash) is a literal ".html" suffix; the previous
    # ``\\.html`` in a raw string meant backslash + any character + "html".
    code_match = re.search(
        r"supervideo\.(?:tv|cc)/(?:e/)?([a-z0-9]+)(?:\.html)?",
        source,
        flags=re.IGNORECASE,
    )
    if not code_match:
        return final_url
    code = (code_match.group(1) or "").strip()
    if not code:
        return final_url
    media_url, header_suffix = final_url.split("|", 1)
    headers = dict(parse_qsl(header_suffix, keep_blank_values=True))
    # Header names are case-insensitive: drop differently-cased variants so
    # the suffix never carries two conflicting referers. An exact "Referer"
    # key keeps its position and is overwritten below.
    for key in [name for name in headers if name != "Referer" and name.casefold() == "referer"]:
        del headers[key]
    headers["Referer"] = f"https://supervideo.cc/e/{code}"
    return f"{media_url}|{urlencode(headers)}"