Compare commits

...

13 Commits

32 changed files with 2423 additions and 80 deletions

BIN
.coverage

Binary file not shown.

9
.gitignore vendored
View File

@@ -9,5 +9,14 @@
# Local tests (not committed)
/tests/
/TESTING/
/.pytest_cache/
/pytest.ini
# Python artifacts
__pycache__/
*.pyc
.coverage
# Plugin runtime caches
/addon/plugins/*_cache.json

View File

@@ -14,6 +14,19 @@ ViewIT ist ein KodiAddon zum Durchsuchen und Abspielen von Inhalten der unter
- AddonOrdner bauen: `./scripts/build_install_addon.sh``dist/<addon_id>/`
- KodiZIP bauen: `./scripts/build_kodi_zip.sh``dist/<addon_id>-<version>.zip`
- AddonVersion in `addon/addon.xml`
- Reproduzierbare ZIPs: optional `SOURCE_DATE_EPOCH` setzen
## Lokales Kodi-Repository
- Repository bauen (inkl. ZIPs + `addons.xml` + `addons.xml.md5`): `./scripts/build_local_kodi_repo.sh`
- Lokal bereitstellen: `./scripts/serve_local_kodi_repo.sh`
- Standard-URL: `http://127.0.0.1:8080/repo/addons.xml`
- Optional eigene URL beim Build setzen: `REPO_BASE_URL=http://<host>:<port>/repo ./scripts/build_local_kodi_repo.sh`
## Gitea Release-Asset Upload
- ZIP bauen: `./scripts/build_kodi_zip.sh`
- Token setzen: `export GITEA_TOKEN=<token>`
- Asset an Tag hochladen (erstellt Release bei Bedarf): `./scripts/publish_gitea_release.sh`
- Optional: `--tag v0.1.50 --asset dist/plugin.video.viewit-0.1.50.zip`
## Entwicklung (kurz)
- Hauptlogik: `addon/default.py`

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.50" provider-name="ViewIt">
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.52" provider-name="ViewIt">
<requires>
<import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" />

View File

@@ -16,6 +16,7 @@ import json
import os
import re
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from types import ModuleType
from urllib.parse import parse_qs, urlencode
@@ -401,6 +402,27 @@ def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
return default
def _set_setting_string(setting_id: str, value: str) -> None:
if xbmcaddon is None:
return
addon = _get_addon()
if addon is None:
return
setter = getattr(addon, "setSettingString", None)
if callable(setter):
try:
setter(setting_id, str(value))
return
except TypeError:
return
setter = getattr(addon, "setSetting", None)
if callable(setter):
try:
setter(setting_id, str(value))
except TypeError:
return
def _apply_video_info(item, info_labels: dict[str, object] | None, cast: list[TmdbCastMember] | None) -> None:
"""Setzt Metadaten bevorzugt via InfoTagVideo (Kodi v20+), mit Fallback auf deprecated APIs."""
@@ -883,6 +905,136 @@ def _add_directory_item(
xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
def _plugin_version(plugin: BasisPlugin) -> str:
raw = getattr(plugin, "version", "0.0.0")
text = str(raw or "").strip()
return text or "0.0.0"
def _normalize_update_info_url(raw: str) -> str:
value = str(raw or "").strip()
default = "http://127.0.0.1:8080/repo/addons.xml"
if not value:
return default
if value.endswith("/addons.xml"):
return value
return value.rstrip("/") + "/addons.xml"
def _repo_addon_xml_path() -> str:
if xbmcvfs is None:
return ""
try:
return xbmcvfs.translatePath("special://home/addons/repository.viewit/addon.xml")
except Exception:
return ""
def _update_repository_source(info_url: str) -> bool:
path = _repo_addon_xml_path()
if not path:
return False
if not os.path.exists(path):
return False
try:
tree = ET.parse(path)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
return False
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
return False
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
info.text = info_url
checksum.text = f"{base}/addons.xml.md5"
datadir.text = f"{base}/"
tree.write(path, encoding="utf-8", xml_declaration=True)
return True
except Exception as exc:
_log(f"Repository-URL konnte nicht gesetzt werden: {exc}", xbmc.LOGWARNING)
return False
def _settings_key_for_plugin(name: str) -> str:
safe = re.sub(r"[^a-z0-9]+", "_", (name or "").strip().casefold()).strip("_")
return f"update_version_{safe}" if safe else "update_version_unknown"
def _collect_plugin_metadata(plugin: BasisPlugin, titles: list[str]) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]]:
getter = getattr(plugin, "metadata_for", None)
if not callable(getter):
return {}
collected: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]] = {}
for title in titles:
try:
labels, art, cast = getter(title)
except Exception:
continue
if isinstance(labels, dict) or isinstance(art, dict) or cast:
label_map = {str(k): str(v) for k, v in dict(labels or {}).items() if v}
art_map = {str(k): str(v) for k, v in dict(art or {}).items() if v}
collected[title] = (label_map, art_map, cast if isinstance(cast, list) else None)
return collected
def _needs_tmdb(labels: dict[str, str], art: dict[str, str], *, want_plot: bool, want_art: bool) -> bool:
if want_plot and not labels.get("plot"):
return True
if want_art and not (art.get("thumb") or art.get("poster") or art.get("fanart") or art.get("landscape")):
return True
return False
def _merge_metadata(
title: str,
tmdb_labels: dict[str, str] | None,
tmdb_art: dict[str, str] | None,
tmdb_cast: list[TmdbCastMember] | None,
plugin_meta: tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None] | None,
) -> tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]:
labels = dict(tmdb_labels or {})
art = dict(tmdb_art or {})
cast = tmdb_cast
if plugin_meta is not None:
meta_labels, meta_art, meta_cast = plugin_meta
labels.update({k: str(v) for k, v in dict(meta_labels or {}).items() if v})
art.update({k: str(v) for k, v in dict(meta_art or {}).items() if v})
if meta_cast is not None:
cast = meta_cast
if "title" not in labels:
labels["title"] = title
return labels, art, cast
def _sync_update_version_settings() -> None:
addon = _get_addon()
addon_version = "0.0.0"
if addon is not None:
try:
addon_version = str(addon.getAddonInfo("version") or "0.0.0")
except Exception:
addon_version = "0.0.0"
_set_setting_string("update_version_addon", addon_version)
versions = {
"update_version_serienstream": "-",
"update_version_aniworld": "-",
"update_version_einschalten": "-",
"update_version_topstreamfilm": "-",
"update_version_filmpalast": "-",
"update_version_doku_streams": "-",
}
for plugin in _discover_plugins().values():
key = _settings_key_for_plugin(str(plugin.name))
if key in versions:
versions[key] = _plugin_version(plugin)
for key, value in versions.items():
_set_setting_string(key, value)
def _show_root_menu() -> None:
handle = _get_handle()
_log("Root-Menue wird angezeigt.")
@@ -890,8 +1042,7 @@ def _show_root_menu() -> None:
plugins = _discover_plugins()
for plugin_name in sorted(plugins.keys(), key=lambda value: value.casefold()):
display = f"{plugin_name}"
_add_directory_item(handle, display, "plugin_menu", {"plugin": plugin_name}, is_folder=True)
_add_directory_item(handle, plugin_name, "plugin_menu", {"plugin": plugin_name}, is_folder=True)
_add_directory_item(handle, "Einstellungen", "settings")
xbmcplugin.endOfDirectory(handle)
@@ -919,6 +1070,12 @@ def _show_plugin_menu(plugin_name: str) -> None:
if _plugin_has_capability(plugin, "genres"):
_add_directory_item(handle, "Genres", "genres", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "alpha"):
_add_directory_item(handle, "A-Z", "alpha_index", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "series_catalog"):
_add_directory_item(handle, "Serien", "series_catalog", {"plugin": plugin_name, "page": "1"}, is_folder=True)
if _plugin_has_capability(plugin, "popular_series"):
_add_directory_item(handle, "Meist gesehen", "popular", {"plugin": plugin_name, "page": "1"}, is_folder=True)
@@ -967,10 +1124,24 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results and not canceled:
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles and not canceled:
canceled = progress(35, f"{plugin_name} (1/1) Metadaten…")
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for index, title in enumerate(results, start=1):
@@ -979,8 +1150,9 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
if index == 1 or index == total_results or (index % 10 == 0):
pct = 35 + int((index / float(total_results)) * 60)
canceled = progress(pct, f"{plugin_name} (1/1) aufbereiten {index}/{total_results}")
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1058,11 +1230,16 @@ def _discover_plugins() -> dict[str, BasisPlugin]:
except Exception as exc:
xbmc.log(f"Plugin-Datei {file_path.name} konnte nicht geladen werden: {exc}", xbmc.LOGWARNING)
continue
plugin_classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
plugin_classes = [preferred]
else:
plugin_classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
plugin_classes.sort(key=lambda cls: cls.__name__.casefold())
for cls in plugin_classes:
try:
instance = cls()
@@ -1073,7 +1250,21 @@ def _discover_plugins() -> dict[str, BasisPlugin]:
reason = getattr(instance, "unavailable_reason", "Nicht verfuegbar.")
xbmc.log(f"Plugin {cls.__name__} deaktiviert: {reason}", xbmc.LOGWARNING)
continue
plugins[instance.name] = instance
plugin_name = str(getattr(instance, "name", "") or "").strip()
if not plugin_name:
xbmc.log(
f"Plugin {cls.__name__} wurde ohne Name registriert und wird uebersprungen.",
xbmc.LOGWARNING,
)
continue
if plugin_name in plugins:
xbmc.log(
f"Plugin-Name doppelt ({plugin_name}), {cls.__name__} wird uebersprungen.",
xbmc.LOGWARNING,
)
continue
plugins[plugin_name] = instance
plugins = dict(sorted(plugins.items(), key=lambda item: item[0].casefold()))
_PLUGIN_CACHE = plugins
return plugins
@@ -1143,15 +1334,29 @@ def _show_search_results(query: str) -> None:
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results:
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
canceled = progress(
range_start + int((range_end - range_start) * 0.35),
f"{plugin_name} ({plugin_index}/{total_plugins}) Metadaten…",
)
if canceled:
break
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for title_index, title in enumerate(results, start=1):
if title_index == 1 or title_index == total_results or (title_index % 10 == 0):
@@ -1161,8 +1366,9 @@ def _show_search_results(query: str) -> None:
)
if canceled:
break
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1514,6 +1720,176 @@ def _show_genres(plugin_name: str) -> None:
xbmcplugin.endOfDirectory(handle)
def _show_categories(plugin_name: str) -> None:
handle = _get_handle()
_log(f"Kategorien laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "categories", None)
if not callable(getter):
xbmcgui.Dialog().notification("Kategorien", "Kategorien nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
categories = list(getter() or [])
except Exception as exc:
_log(f"Kategorien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Kategorien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for category in categories:
category = str(category).strip()
if not category:
continue
_add_directory_item(
handle,
category,
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_genre_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Kategorien", "Paging nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "genre_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(category) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{category} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{category} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(category, page) or [])
except Exception as exc:
_log(f"Kategorie-Seite konnte nicht geladen werden ({plugin_name}/{category} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
if show_tmdb:
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, {}, {}, None, meta)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info(info_labels, playstate),
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(category, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
@@ -1563,6 +1939,157 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(genre, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"genre_titles_page",
{"plugin": plugin_name, "genre": genre, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_index(plugin_name: str) -> None:
handle = _get_handle()
_log(f"A-Z laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "alpha_index", None)
if not callable(getter):
xbmcgui.Dialog().notification("A-Z", "A-Z nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
letters = list(getter() or [])
except Exception as exc:
_log(f"A-Z konnte nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "A-Z konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for letter in letters:
letter = str(letter).strip()
if not letter:
continue
_add_directory_item(
handle,
letter,
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_alpha_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("A-Z", "Paging nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "alpha_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(letter) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{letter} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{letter} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(letter, page) or [])
except Exception as exc:
_log(f"A-Z Seite konnte nicht geladen werden ({plugin_name}/{letter} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
@@ -1611,11 +2138,113 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
show_next = False
if total_pages is not None:
show_next = page < total_pages
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_series_catalog(plugin_name: str, page: int = 1) -> None:
handle = _get_handle()
plugin_name = (plugin_name or "").strip()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Serien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "series_catalog_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Serien", "Serien nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "series_catalog_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(page) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"Serien ({page}/{total_pages})")
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
xbmcplugin.setPluginCategory(handle, f"Serien ({page})")
_set_content(handle, "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"series_catalog",
{"plugin": plugin_name, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(page) or [])
except Exception as exc:
_log(f"Serien konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Serien", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "series_catalog_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(genre, page))
show_next = bool(has_more_getter(page))
except Exception:
show_next = False
@@ -1623,8 +2252,8 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
_add_directory_item(
handle,
"Nächste Seite",
"genre_titles_page",
{"plugin": plugin_name, "genre": genre, "page": str(page + 1)},
"series_catalog",
{"plugin": plugin_name, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
@@ -1802,40 +2431,45 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
if total_pages > 1 and page < total_pages:
_add_directory_item(
@@ -2245,10 +2879,34 @@ def _open_settings() -> None:
"""Oeffnet das Kodi-Addon-Settings-Dialog."""
if xbmcaddon is None: # pragma: no cover - outside Kodi
raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).")
_sync_update_version_settings()
addon = xbmcaddon.Addon()
addon.openSettings()
def _run_update_check() -> None:
"""Stoesst Kodi-Repo- und Addon-Updates an und informiert den Benutzer."""
if xbmc is None: # pragma: no cover - outside Kodi
return
try:
info_url = _normalize_update_info_url(_get_setting_string("update_repo_url"))
_set_setting_string("update_repo_url", info_url)
_sync_update_version_settings()
_update_repository_source(info_url)
builtin = getattr(xbmc, "executebuiltin", None)
if callable(builtin):
builtin("UpdateAddonRepos")
builtin("UpdateLocalAddons")
builtin("ActivateWindow(addonbrowser,addons://updates/)")
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung gestartet.", xbmcgui.NOTIFICATION_INFO, 4000)
except Exception as exc:
_log(f"Update-Pruefung fehlgeschlagen: {exc}", xbmc.LOGWARNING)
try:
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 4000)
except Exception:
pass
def _extract_first_int(value: str) -> int | None:
match = re.search(r"(\d+)", value or "")
if not match:
@@ -2564,6 +3222,8 @@ def run() -> None:
_show_genre_sources()
elif action == "genres":
_show_genres(params.get("plugin", ""))
elif action == "categories":
_show_categories(params.get("plugin", ""))
elif action == "new_titles":
_show_new_titles(
params.get("plugin", ""),
@@ -2585,6 +3245,25 @@ def run() -> None:
params.get("genre", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "category_titles_page":
_show_category_titles_page(
params.get("plugin", ""),
params.get("category", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "alpha_index":
_show_alpha_index(params.get("plugin", ""))
elif action == "alpha_titles_page":
_show_alpha_titles_page(
params.get("plugin", ""),
params.get("letter", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "series_catalog":
_show_series_catalog(
params.get("plugin", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "genre_series_group":
_show_genre_series_group(
params.get("plugin", ""),
@@ -2599,6 +3278,8 @@ def run() -> None:
)
elif action == "settings":
_open_settings()
elif action == "check_updates":
_run_update_check()
elif action == "seasons":
_show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", ""))
elif action == "episodes":

View File

@@ -4,13 +4,15 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple
class BasisPlugin(ABC):
"""Abstrakte Basisklasse fuer alle Integrationen."""
name: str
version: str = "0.0.0"
prefer_source_metadata: bool = False
@abstractmethod
async def search_titles(self, query: str) -> List[str]:
@@ -28,6 +30,10 @@ class BasisPlugin(ABC):
"""Optional: Liefert den Stream-Link fuer eine konkrete Folge."""
return None
def metadata_for(self, title: str) -> Tuple[Dict[str, str], Dict[str, str], Optional[List[Any]]]:
"""Optional: Liefert Info-Labels, Art und Cast fuer einen Titel."""
return {}, {}, None
def resolve_stream_link(self, link: str) -> Optional[str]:
"""Optional: Folgt einem Stream-Link und liefert die finale URL."""
return None

View File

@@ -691,6 +691,7 @@ def search_animes(query: str) -> List[SeriesResult]:
class AniworldPlugin(BasisPlugin):
name = "Aniworld"
version = "1.0.0"
def __init__(self) -> None:
self._anime_results: Dict[str, SeriesResult] = {}

View File

@@ -0,0 +1,476 @@
"""Doku-Streams (doku-streams.com) Integration."""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "doku_streams_base_url"
DEFAULT_BASE_URL = "https://doku-streams.com"
MOST_VIEWED_PATH = "/meistgesehene/"
DEFAULT_TIMEOUT = 20
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_dokustreams"
SETTING_DUMP_HTML = "dump_html_dokustreams"
SETTING_SHOW_URL_INFO = "show_url_info_dokustreams"
SETTING_LOG_ERRORS = "log_errors_dokustreams"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
plot: str = ""
poster: str = ""
def _extract_last_page(soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("nav.navigation a[href], nav.pagination a[href], a.page-numbers[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"/page/(\\d+)/", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
for value in re.findall(r"(\\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def _extract_summary_and_poster(article: BeautifulSoupT) -> tuple[str, str]:
summary = ""
if article:
summary_box = article.select_one("div.entry-summary")
if summary_box is not None:
for p in summary_box.find_all("p"):
text = (p.get_text(" ", strip=True) or "").strip()
if text:
summary = text
break
poster = ""
if article:
img = article.select_one("div.entry-thumb img")
if img is not None:
poster = (img.get("data-src") or "").strip() or (img.get("src") or "").strip()
if "lazy_placeholder" in poster and img.get("data-src"):
poster = (img.get("data-src") or "").strip()
poster = _absolute_url(poster)
return summary, poster
def _parse_listing_hits(soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for article in soup.select("article[id^='post-']"):
anchor = article.select_one("h2.entry-title a[href]")
if anchor is None:
continue
href = (anchor.get("href") or "").strip()
title = (anchor.get_text(" ", strip=True) or "").strip()
if not href or not title:
continue
if query and not _matches_query(query, title=title):
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
summary, poster = _extract_summary_and_poster(article)
hits.append(SearchHit(title=title, url=url, plot=summary, poster=poster))
return hits
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="dokustreams_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Doku-Streams",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="dokustreams_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="dokustreams_errors.log",
message=message,
)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("dokustreams", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class DokuStreamsPlugin(BasisPlugin):
name = "Doku-Streams"
version = "1.0.0"
prefer_source_metadata = True
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._category_to_url: Dict[str, str] = {}
self._category_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SearchHit]] = None
self._title_meta: Dict[str, tuple[str, str]] = {}
self._requests_available = REQUESTS_AVAILABLE
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"DokuStreamsPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
self._title_to_url = {hit.title: hit.url for hit in hits if hit.title and hit.url}
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query or not self._requests_available:
return []
search_url = _absolute_url(f"/?s={quote(query)}")
session = get_requests_session("dokustreams", headers=HEADERS)
try:
soup = _get_soup(search_url, session=session)
except Exception:
return []
return _parse_listing_hits(soup, query=query)
def capabilities(self) -> set[str]:
return {"genres", "popular_series"}
def _categories_url(self) -> str:
return _absolute_url("/kategorien/")
def _parse_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
def clean_name(value: str) -> str:
value = (value or "").strip()
return re.sub(r"\\s*\\(\\d+\\)\\s*$", "", value).strip()
def walk(ul, parents: List[str]) -> None:
for li in ul.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = clean_name(anchor.get_text(" ", strip=True) or "")
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
child_ul = li.find("ul", class_="nested-category-list")
if child_ul is not None:
walk(child_ul, parents + [name])
else:
if parents:
label = " \u2192 ".join(parents + [name])
categories[label] = _absolute_url(href)
walk(root, [])
return categories
def _parse_top_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
for li in root.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
categories[name] = _absolute_url(href)
return categories
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._category_to_url:
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_categories(soup)
if parsed:
self._category_to_url = dict(parsed)
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
def categories(self) -> List[str]:
if not self._requests_available:
return []
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_top_categories(soup)
if parsed:
for key, value in parsed.items():
self._category_to_url.setdefault(key, value)
return list(parsed.keys())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._category_page_count_cache:
return max(1, int(self._category_page_count_cache.get(genre, 1)))
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return 1
pages = _extract_last_page(soup)
self._category_page_count_cache[genre] = max(1, pages)
return self._category_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else f"{base_url.rstrip('/')}/page/{page}/"
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
return titles
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _most_viewed_url(self) -> str:
return _absolute_url(MOST_VIEWED_PATH)
def popular_series(self) -> List[str]:
if not self._requests_available:
return []
if self._popular_cache is not None:
titles = [hit.title for hit in self._popular_cache if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
try:
soup = _get_soup(self._most_viewed_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
self._popular_cache = list(hits)
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
plot, poster = self._title_meta.get(title, ("", ""))
info: dict[str, str] = {"title": title}
if plot:
info["plot"] = plot
art: dict[str, str] = {}
if poster:
art = {"thumb": poster, "poster": poster}
return info, art, None
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return ["Stream"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return [title]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
if not title:
return None
url = self._title_to_url.get(title)
if not url:
return None
if not self._requests_available:
return None
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return None
iframe = soup.select_one("div.fluid-width-video-wrapper iframe[src]")
if iframe is None:
iframe = soup.select_one("iframe[src*='youtube'], iframe[src*='vimeo'], iframe[src]")
if iframe is None:
return None
src = (iframe.get("src") or "").strip()
if not src:
return None
return _absolute_url(src)
# Alias für die automatische Plugin-Erkennung.
Plugin = DokuStreamsPlugin

View File

@@ -43,7 +43,7 @@ SETTING_DUMP_HTML = "dump_html_einschalten"
SETTING_SHOW_URL_INFO = "show_url_info_einschalten"
SETTING_LOG_ERRORS = "log_errors_einschalten"
DEFAULT_BASE_URL = ""
DEFAULT_BASE_URL = "https://einschalten.in"
DEFAULT_INDEX_PATH = "/"
DEFAULT_NEW_TITLES_PATH = "/movies/new"
DEFAULT_SEARCH_PATH = "/search"
@@ -507,6 +507,7 @@ class EinschaltenPlugin(BasisPlugin):
"""Metadata-Plugin für eine autorisierte Quelle."""
name = "Einschalten"
version = "1.0.0"
def __init__(self) -> None:
self.is_available = REQUESTS_AVAILABLE
@@ -1078,3 +1079,7 @@ class EinschaltenPlugin(BasisPlugin):
return []
# Backwards compatible: first page only. UI uses paging via `new_titles_page`.
return self.new_titles_page(1)
# Alias für die automatische Plugin-Erkennung.
Plugin = EinschaltenPlugin

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
@@ -42,6 +43,7 @@ DEFAULT_BASE_URL = "https://filmpalast.to"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
SERIES_HINT_PREFIX = "series://filmpalast/"
SERIES_VIEW_PATH = "/serien/view"
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -218,11 +220,17 @@ def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> Beautif
class FilmpalastPlugin(BasisPlugin):
name = "Filmpalast"
version = "1.0.0"
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
self._hoster_cache: Dict[str, Dict[str, str]] = {}
self._genre_to_url: Dict[str, str] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._alpha_to_url: Dict[str, str] = {}
self._alpha_page_count_cache: Dict[str, int] = {}
self._series_page_count_cache: Dict[int, int] = {}
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
@@ -391,8 +399,41 @@ class FilmpalastPlugin(BasisPlugin):
return hits
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if query and not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
return hits
def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
self._title_to_url = {}
self._series_entries = {}
self._hoster_cache.clear()
@@ -425,6 +466,208 @@ class FilmpalastPlugin(BasisPlugin):
titles.sort(key=lambda value: value.casefold())
return titles
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
return self._apply_hits_to_title_index(hits)
def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
genres: Dict[str, str] = {}
if not soup:
return genres
for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/genre/" not in href:
continue
genres[name] = _absolute_url(href)
return genres
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"(\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def capabilities(self) -> set[str]:
return {"genres", "alpha", "series_catalog"}
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
alpha: Dict[str, str] = {}
if not soup:
return alpha
for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/alpha/" not in href:
continue
if name in alpha:
continue
alpha[name] = _absolute_url(href)
return alpha
def alpha_index(self) -> List[str]:
if not self._requests_available:
return []
if self._alpha_to_url:
return list(self._alpha_to_url.keys())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_alpha_links(soup)
if parsed:
self._alpha_to_url = dict(parsed)
return list(self._alpha_to_url.keys())
def alpha_page_count(self, letter: str) -> int:
letter = (letter or "").strip()
if not letter:
return 1
if letter in self._alpha_page_count_cache:
return max(1, int(self._alpha_page_count_cache.get(letter, 1)))
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._alpha_page_count_cache[letter] = max(1, pages)
return self._alpha_page_count_cache[letter]
def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
letter = (letter or "").strip()
if not letter or not self._requests_available:
return []
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_alpha(self, letter: str) -> List[str]:
titles = self.titles_for_alpha_page(letter, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _series_view_url(self) -> str:
return _absolute_url(SERIES_VIEW_PATH)
def series_catalog_page_count(self, page: int = 1) -> int:
if not self._requests_available:
return 1
cache_key = int(page or 1)
if cache_key in self._series_page_count_cache:
return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
base_url = self._series_view_url()
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._series_page_count_cache[cache_key] = max(1, pages)
return self._series_page_count_cache[cache_key]
def series_catalog_page(self, page: int) -> List[str]:
if not self._requests_available:
return []
base_url = self._series_view_url()
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def series_catalog_has_more(self, page: int) -> bool:
total = self.series_catalog_page_count(page)
return page < total
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._genre_to_url:
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_genres(soup)
if parsed:
self._genre_to_url = dict(parsed)
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._genre_page_count_cache:
return max(1, int(self._genre_page_count_cache.get(genre, 1)))
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._genre_page_count_cache[genre] = max(1, pages)
return self._genre_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _ensure_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
@@ -643,26 +886,43 @@ class FilmpalastPlugin(BasisPlugin):
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None
resolved = link
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
# 1) Immer zuerst den ursprünglichen Hoster-Link an ResolveURL geben.
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(link)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
redirected = link
if self._requests_available:
try:
session = get_requests_session("filmpalast", headers=HEADERS)
response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
resolved = (response.url or link).strip() or link
redirected = (response.url or link).strip() or link
except Exception:
resolved = link
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(resolved)
redirected = link
# 2) Danach optional die Redirect-URL nochmals auflösen.
if callable(resolve_with_resolveurl) and redirected and redirected != link:
resolved_by_resolveurl = resolve_with_resolveurl(redirected)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
if resolved:
_log_url_event(resolved, kind="FINAL")
return resolved
# 3) Fallback bleibt wie bisher: direkte URL zurückgeben.
if redirected:
_log_url_event(redirected, kind="FINAL")
return redirected
return None
# Alias für die automatische Plugin-Erkennung.
Plugin = FilmpalastPlugin

View File

@@ -784,6 +784,7 @@ class SerienstreamPlugin(BasisPlugin):
"""Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt."""
name = "Serienstream"
version = "1.0.0"
POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"
def __init__(self) -> None:

View File

@@ -57,7 +57,7 @@ else: # pragma: no cover
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "topstream_base_url"
DEFAULT_BASE_URL = "https://www.meineseite"
DEFAULT_BASE_URL = "https://topstreamfilm.live"
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
@@ -123,6 +123,7 @@ class TopstreamfilmPlugin(BasisPlugin):
"""Integration fuer eine HTML-basierte Suchseite."""
name = "Topstreamfilm"
version = "1.0.0"
def __init__(self) -> None:
self._session: RequestsSession | None = None

View File

@@ -45,6 +45,9 @@
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
@@ -61,4 +64,16 @@
<setting id="tmdb_log_requests" type="bool" label="TMDB API Requests loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API Antworten loggen" default="false" />
</category>
<category label="Update">
<setting id="update_repo_url" type="text" label="Update-URL (addons.xml)" default="http://127.0.0.1:8080/repo/addons.xml" />
<setting id="run_update_check" type="action" label="Jetzt auf Updates pruefen" action="RunPlugin(plugin://plugin.video.viewit/?action=check_updates)" option="close" />
<setting id="update_info" type="text" label="Kodi-Repository-Updates werden ueber den Kodi-Update-Mechanismus verarbeitet." default="" enable="false" />
<setting id="update_version_addon" type="text" label="ViewIT Addon Version" default="-" enable="false" />
<setting id="update_version_serienstream" type="text" label="Serienstream Plugin Version" default="-" enable="false" />
<setting id="update_version_aniworld" type="text" label="Aniworld Plugin Version" default="-" enable="false" />
<setting id="update_version_einschalten" type="text" label="Einschalten Plugin Version" default="-" enable="false" />
<setting id="update_version_topstreamfilm" type="text" label="Topstreamfilm Plugin Version" default="-" enable="false" />
<setting id="update_version_filmpalast" type="text" label="Filmpalast Plugin Version" default="-" enable="false" />
<setting id="update_version_doku_streams" type="text" label="Doku-Streams Plugin Version" default="-" enable="false" />
</category>
</settings>

View File

@@ -10,7 +10,7 @@ Dieses Dokument beschreibt den Einstiegspunkt des Addons und die zentrale Steuer
- startet die Wiedergabe und verwaltet Playstate/Resume.
## Ablauf (high level)
1. **PluginDiscovery**: Lädt alle `addon/plugins/*.py` (ohne `_`Prefix) und instanziiert Klassen, die von `BasisPlugin` erben.
1. **PluginDiscovery**: Lädt alle `addon/plugins/*.py` (ohne `_`Prefix). Bevorzugt `Plugin = <Klasse>`, sonst werden `BasisPlugin`Subklassen deterministisch instanziiert.
2. **Navigation**: Baut KodiListen (Serien/Staffeln/Episoden) auf Basis der PluginAntworten.
3. **Playback**: Holt StreamLinks aus dem Plugin und startet die Wiedergabe.
4. **Playstate**: Speichert ResumeDaten lokal (`playstate.json`) und setzt `playcount`/ResumeInfos.
@@ -35,6 +35,7 @@ Die genaue Aktion wird aus den QueryParametern gelesen und an das entsprechen
- **PluginLoader**: findet & instanziiert Plugins.
- **UIHelper**: setzt ContentType, baut Verzeichnisseinträge.
- **PlaystateHelper**: `_load_playstate`, `_save_playstate`, `_apply_playstate_to_info`.
- **MetadataMerge**: PluginMetadaten können TMDB übersteuern, TMDB dient als Fallback.
## Fehlerbehandlung
- PluginImportfehler werden isoliert behandelt, damit das Addon nicht komplett ausfällt.

View File

@@ -6,6 +6,7 @@ Diese Doku beschreibt, wie Plugins im ViewITAddon aufgebaut sind und wie neue
- Jedes Plugin ist eine einzelne Datei unter `addon/plugins/`.
- Dateinamen **ohne** `_`Prefix werden automatisch geladen.
- Jede Datei enthält eine Klasse, die von `BasisPlugin` erbt.
- Optional: `Plugin = <Klasse>` als expliziter Einstiegspunkt (bevorzugt vom Loader).
## PflichtMethoden (BasisPlugin)
Jedes Plugin muss diese Methoden implementieren:
@@ -22,6 +23,7 @@ Wesentliche Rückgaben an die Hauptlogik:
- `episodes_for(...)` → Liste von Episoden-Labels
- `stream_link_for(...)` → Hoster-/Player-Link (nicht zwingend finale Media-URL)
- `resolve_stream_link(...)` → finale/spielbare URL nach Redirect/Resolver
- `metadata_for(...)` → Info-Labels/Art (Plot/Poster) aus der Quelle
- Optional `available_hosters_for(...)` → auswählbare Hoster-Namen im Dialog
- Optional `series_url_for_title(...)` → stabile Detail-URL pro Titel für Folgeaufrufe
- Optional `remember_series_url(...)` → Übernahme einer bereits bekannten Detail-URL
@@ -35,6 +37,9 @@ Standard für Film-Provider (ohne echte Staffeln):
- `popular_series``popular_series()`
- `genres``genres()` + `titles_for_genre(genre)`
- `latest_episodes``latest_episodes(page=1)`
- `new_titles``new_titles_page(page=1)`
- `alpha``alpha_index()` + `titles_for_alpha_page(letter, page)`
- `series_catalog``series_catalog_page(page=1)`
## Empfohlene Struktur
- Konstanten für URLs/Endpoints (BASE_URL, Pfade, Templates)
@@ -60,6 +65,7 @@ Standard: `*_base_url` (Domain / BASE_URL)
- `einschalten_base_url`
- `topstream_base_url`
- `filmpalast_base_url`
- `doku_streams_base_url`
## Playback
- `stream_link_for(...)` implementieren (liefert bevorzugten Hoster-Link).
@@ -75,7 +81,8 @@ Standard: `*_base_url` (Domain / BASE_URL)
2. **Navigation**: `series_url_for_title`/`remember_series_url` unterstützen, damit URLs zwischen Aufrufen stabil bleiben.
3. **Auswahl Hoster**: Hoster-Namen aus der Detailseite extrahieren und anbieten.
4. **Playback**: Hoster-Link liefern, danach konsistent über `resolve_stream_link` finalisieren.
5. **Fallbacks**: bei Layout-Unterschieden defensiv parsen und Logging aktivierbar halten.
5. **Metadaten**: `metadata_for` nutzen, Plot/Poster aus der Quelle zurückgeben.
6. **Fallbacks**: bei Layout-Unterschieden defensiv parsen und Logging aktivierbar halten.
## Debugging
Global gesteuert über Settings:
@@ -94,6 +101,8 @@ Plugins sollten die Helper aus `addon/plugin_helpers.py` nutzen:
## Build & Test
- ZIP bauen: `./scripts/build_kodi_zip.sh`
- AddonOrdner: `./scripts/build_install_addon.sh`
- PluginManifest aktualisieren: `python3 scripts/generate_plugin_manifest.py`
- Live-Snapshot-Checks: `python3 qa/run_plugin_snapshots.py` (aktualisieren mit `--update`)
## BeispielCheckliste
- [ ] `name` korrekt gesetzt

104
docs/PLUGIN_MANIFEST.json Normal file
View File

@@ -0,0 +1,104 @@
{
"schema_version": 1,
"plugins": [
{
"file": "addon/plugins/aniworld_plugin.py",
"module": "aniworld_plugin",
"name": "Aniworld",
"class": "AniworldPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"latest_episodes",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "aniworld_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/dokustreams_plugin.py",
"module": "dokustreams_plugin",
"name": "Doku-Streams",
"class": "DokuStreamsPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"popular_series"
],
"prefer_source_metadata": true,
"base_url_setting": "doku_streams_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/einschalten_plugin.py",
"module": "einschalten_plugin",
"name": "Einschalten",
"class": "EinschaltenPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"new_titles"
],
"prefer_source_metadata": false,
"base_url_setting": "einschalten_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/filmpalast_plugin.py",
"module": "filmpalast_plugin",
"name": "Filmpalast",
"class": "FilmpalastPlugin",
"version": "1.0.0",
"capabilities": [
"alpha",
"genres",
"series_catalog"
],
"prefer_source_metadata": false,
"base_url_setting": "filmpalast_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/serienstream_plugin.py",
"module": "serienstream_plugin",
"name": "Serienstream",
"class": "SerienstreamPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"latest_episodes",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "serienstream_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/topstreamfilm_plugin.py",
"module": "topstreamfilm_plugin",
"name": "Topstreamfilm",
"class": "TopstreamfilmPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "topstream_base_url",
"available": true,
"unavailable_reason": null,
"error": null
}
]
}

View File

@@ -9,6 +9,7 @@ ViewIt lädt Provider-Integrationen dynamisch aus `addon/plugins/*.py`. Jede Dat
Weitere Details:
- `docs/DEFAULT_ROUTER.md` (Hauptlogik in `addon/default.py`)
- `docs/PLUGIN_DEVELOPMENT.md` (Entwicklerdoku für Plugins)
- `docs/PLUGIN_MANIFEST.json` (zentraler Überblick über Plugins, Versionen, Capabilities)
### Aktuelle Plugins
@@ -17,6 +18,7 @@ Weitere Details:
- `einschalten_plugin.py` Einschalten
- `aniworld_plugin.py` Aniworld
- `filmpalast_plugin.py` Filmpalast
- `dokustreams_plugin.py` Doku-Streams
- `_template_plugin.py` Vorlage für neue Plugins
### Plugin-Discovery (Ladeprozess)
@@ -26,11 +28,16 @@ Der Loader in `addon/default.py`:
1. Sucht alle `*.py` in `addon/plugins/`
2. Überspringt Dateien, die mit `_` beginnen
3. Lädt Module dynamisch
4. Instanziert Klassen, die von `BasisPlugin` erben
5. Ignoriert Plugins mit `is_available = False`
4. Nutzt `Plugin = <Klasse>` als bevorzugten Einstiegspunkt (falls vorhanden)
5. Fallback: instanziert Klassen, die von `BasisPlugin` erben (deterministisch sortiert)
6. Ignoriert Plugins mit `is_available = False`
Damit bleiben fehlerhafte Plugins isoliert und blockieren nicht das gesamte Add-on.
### Plugin-Manifest (Audit & Repro)
`docs/PLUGIN_MANIFEST.json` listet alle Plugins mit Version, Capabilities und Basis-Settings.
Erzeugung: `python3 scripts/generate_plugin_manifest.py`
### BasisPlugin verpflichtende Methoden
Definiert in `addon/plugin_interface.py`:
@@ -38,19 +45,29 @@ Definiert in `addon/plugin_interface.py`:
- `async search_titles(query: str) -> list[str]`
- `seasons_for(title: str) -> list[str]`
- `episodes_for(title: str, season: str) -> list[str]`
- optional `metadata_for(title: str) -> (info_labels, art, cast)`
### Optionale Features (Capabilities)
Plugins können zusätzliche Features anbieten:
- `capabilities() -> set[str]`
- `popular_series`: liefert beliebte Serien
- `genres`: Genre-Liste verfügbar
- `latest_episodes`: neue Episoden verfügbar
- `popular_series`: liefert beliebte Serien
- `genres`: Genre-Liste verfügbar
- `latest_episodes`: neue Episoden verfügbar
- `new_titles`: neue Titel verfügbar
- `alpha`: A-Z Index verfügbar
- `series_catalog`: Serienkatalog verfügbar
- `popular_series() -> list[str]`
- `genres() -> list[str]`
- `titles_for_genre(genre: str) -> list[str]`
- `latest_episodes(page: int = 1) -> list[LatestEpisode]` (wenn angeboten)
- `new_titles_page(page: int = 1) -> list[str]` (wenn angeboten)
- `alpha_index() -> list[str]` (wenn angeboten)
- `series_catalog_page(page: int = 1) -> list[str]` (wenn angeboten)
Metadaten:
- `prefer_source_metadata = True` bedeutet: Plugin-Metadaten gehen vor TMDB, TMDB dient nur als Fallback.
ViewIt zeigt im UI nur die Features an, die ein Plugin tatsächlich liefert.
@@ -62,6 +79,7 @@ Eine Integration sollte typischerweise bieten:
- `search_titles()` mit Provider-Suche
- `seasons_for()` und `episodes_for()` mit HTML-Parsing
- `stream_link_for()` optional für direkte Playback-Links
- `metadata_for()` optional für Plot/Poster aus der Quelle
- Optional: `available_hosters_for()` oder Provider-spezifische Helfer
Als Startpunkt dient `addon/plugins/_template_plugin.py`.
@@ -79,8 +97,9 @@ Als Startpunkt dient `addon/plugins/_template_plugin.py`.
- Keine Netzwerkzugriffe im Import-Top-Level
- Netzwerkzugriffe nur in Methoden (z.B. `search_titles`)
- Fehler sauber abfangen und verständliche Fehlermeldungen liefern
- Kein globaler Zustand, der across instances überrascht
- Kein globaler Zustand, der über Instanzen hinweg überrascht
- Provider-spezifische Parser in Helper-Funktionen kapseln
- Reproduzierbare Reihenfolge: `Plugin`-Alias nutzen oder Klassenname eindeutig halten
### Debugging & Logs

73
qa/plugin_snapshots.json Normal file
View File

@@ -0,0 +1,73 @@
{
"snapshots": {
"Serienstream::search_titles::trek": [
"Star Trek: Lower Decks",
"Star Trek: Prodigy",
"Star Trek: The Animated Series",
"Inside Star Trek",
"Raumschiff Enterprise - Star Trek: The Original Series",
"Star Trek: Deep Space Nine",
"Star Trek: Discovery",
"Star Trek: Enterprise",
"Star Trek: Picard",
"Star Trek: Raumschiff Voyager",
"Star Trek: Short Treks",
"Star Trek: Starfleet Academy",
"Star Trek: Strange New Worlds",
"Star Trek: The Next Generation"
],
"Aniworld::search_titles::naruto": [
"Naruto",
"Naruto Shippuden",
"Boruto: Naruto Next Generations",
"Naruto Spin-Off: Rock Lee &amp; His Ninja Pals"
],
"Topstreamfilm::search_titles::matrix": [
"Darkdrive Verschollen in der Matrix",
"Matrix Reloaded",
"Armitage III: Poly Matrix",
"Matrix Resurrections",
"Matrix",
"Matrix Revolutions",
"Matrix Fighters"
],
"Einschalten::new_titles_page::1": [
"Miracle: Das Eishockeywunder von 1980",
"No Escape - Grizzly Night",
"Kidnapped: Der Fall Elizabeth Smart",
"The Internship",
"The Rip",
"Die Toten vom Bodensee Schicksalsrad",
"People We Meet on Vacation",
"Anaconda",
"Even If This Love Disappears Tonight",
"Die Stunde der Mutigen",
"10DANCE",
"SpongeBob Schwammkopf: Piraten Ahoi!",
"Ella McCay",
"Merv",
"Elmo and Mark Rober's Merry Giftmas",
"Als mein Vater Weihnachten rettete 2",
"Die Fraggles: Der erste Schnee",
"Gregs Tagebuch 3: Jetzt reicht's!",
"Not Without Hope",
"Five Nights at Freddy's 2"
],
"Filmpalast::search_titles::trek": [
"Star Trek",
"Star Trek - Der Film",
"Star Trek 2 - Der Zorn des Khan",
"Star Trek 9 Der Aufstand",
"Star Trek: Nemesis",
"Star Trek: Section 31",
"Star Trek: Starfleet Academy",
"Star Trek: Strange New Worlds"
],
"Doku-Streams::search_titles::japan": [
"Deutsche im Knast - Japan und die Disziplin",
"Die Meerfrauen von Japan",
"Japan - Land der Moderne und Tradition",
"Japan im Zweiten Weltkrieg - Der Fall des Kaiserreichs"
]
}
}

153
qa/run_plugin_snapshots.py Executable file
View File

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""Run live snapshot checks for plugins.
Use --update to refresh stored snapshots.
"""
from __future__ import annotations
import argparse
import asyncio
import importlib.util
import inspect
import json
import sys
from pathlib import Path
from typing import Any
ROOT_DIR = Path(__file__).resolve().parents[1]
PLUGIN_DIR = ROOT_DIR / "addon" / "plugins"
SNAPSHOT_PATH = ROOT_DIR / "qa" / "plugin_snapshots.json"
sys.path.insert(0, str(ROOT_DIR / "addon"))
try:
from plugin_interface import BasisPlugin # type: ignore
except Exception as exc: # pragma: no cover
raise SystemExit(f"Failed to import BasisPlugin: {exc}")
CONFIG = [
{"plugin": "Serienstream", "method": "search_titles", "args": ["trek"], "max_items": 20},
{"plugin": "Aniworld", "method": "search_titles", "args": ["naruto"], "max_items": 20},
{"plugin": "Topstreamfilm", "method": "search_titles", "args": ["matrix"], "max_items": 20},
{"plugin": "Einschalten", "method": "new_titles_page", "args": [1], "max_items": 20},
{"plugin": "Filmpalast", "method": "search_titles", "args": ["trek"], "max_items": 20},
{"plugin": "Doku-Streams", "method": "search_titles", "args": ["japan"], "max_items": 20},
]
def _import_module(path: Path):
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise ImportError(f"Missing spec for {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def _discover_plugins() -> dict[str, BasisPlugin]:
plugins: dict[str, BasisPlugin] = {}
for file_path in sorted(PLUGIN_DIR.glob("*.py")):
if file_path.name.startswith("_"):
continue
module = _import_module(file_path)
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
classes = [preferred]
else:
classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
classes.sort(key=lambda cls: cls.__name__.casefold())
for cls in classes:
instance = cls()
name = str(getattr(instance, "name", "") or "").strip()
if name and name not in plugins:
plugins[name] = instance
return plugins
def _normalize_titles(value: Any, max_items: int) -> list[str]:
if not value:
return []
titles = [str(item).strip() for item in list(value) if item and str(item).strip()]
seen = set()
normalized: list[str] = []
for title in titles:
key = title.casefold()
if key in seen:
continue
seen.add(key)
normalized.append(title)
if len(normalized) >= max_items:
break
return normalized
def _snapshot_key(entry: dict[str, Any]) -> str:
args = entry.get("args", [])
return f"{entry['plugin']}::{entry['method']}::{','.join(str(a) for a in args)}"
def _call_method(plugin: BasisPlugin, method_name: str, args: list[Any]):
method = getattr(plugin, method_name, None)
if not callable(method):
raise RuntimeError(f"Method missing: {method_name}")
result = method(*args)
if asyncio.iscoroutine(result):
return asyncio.run(result)
return result
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--update", action="store_true")
args = parser.parse_args()
snapshots: dict[str, Any] = {}
if SNAPSHOT_PATH.exists():
snapshots = json.loads(SNAPSHOT_PATH.read_text(encoding="utf-8"))
data = snapshots.get("snapshots", {}) if isinstance(snapshots, dict) else {}
if args.update:
data = {}
plugins = _discover_plugins()
errors = []
for entry in CONFIG:
plugin_name = entry["plugin"]
plugin = plugins.get(plugin_name)
if plugin is None:
errors.append(f"Plugin missing: {plugin_name}")
continue
key = _snapshot_key(entry)
try:
result = _call_method(plugin, entry["method"], entry.get("args", []))
normalized = _normalize_titles(result, entry.get("max_items", 20))
except Exception as exc:
errors.append(f"Snapshot error: {key} ({exc})")
if args.update:
data[key] = {"error": str(exc)}
continue
if args.update:
data[key] = normalized
else:
expected = data.get(key)
if expected != normalized:
errors.append(f"Snapshot mismatch: {key}\nExpected: {expected}\nActual: {normalized}")
if args.update:
SNAPSHOT_PATH.parent.mkdir(parents=True, exist_ok=True)
SNAPSHOT_PATH.write_text(json.dumps({"snapshots": data}, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
if errors:
for err in errors:
print(err)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.0" provider-name="ViewIT">
<extension point="xbmc.addon.repository" name="ViewIT Repository">
<dir>
<info compressed="false">http://127.0.0.1:8080/repo/addons.xml</info>
<checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
<datadir zip="true">http://127.0.0.1:8080/repo/</datadir>
</dir>
</extension>
<extension point="xbmc.addon.metadata">
<summary lang="de_DE">Lokales Repository fuer ViewIT Updates</summary>
<summary lang="en_GB">Local repository for ViewIT updates</summary>
<description lang="de_DE">Stellt das ViewIT Addon ueber ein Kodi Repository bereit.</description>
<description lang="en_GB">Provides the ViewIT addon via a Kodi repository.</description>
<platform>all</platform>
</extension>
</addon>

View File

@@ -37,6 +37,6 @@ ZIP_PATH="${INSTALL_DIR}/${ZIP_NAME}"
ADDON_DIR="$("${ROOT_DIR}/scripts/build_install_addon.sh" >/dev/null; echo "${INSTALL_DIR}/${ADDON_ID}")"
rm -f "${ZIP_PATH}"
(cd "${INSTALL_DIR}" && zip -r "${ZIP_NAME}" "$(basename "${ADDON_DIR}")" >/dev/null)
python3 "${ROOT_DIR}/scripts/zip_deterministic.py" "${ZIP_PATH}" "${ADDON_DIR}" >/dev/null
echo "${ZIP_PATH}"

110
scripts/build_local_kodi_repo.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
REPO_DIR="${DIST_DIR}/repo"
PLUGIN_ADDON_XML="${ROOT_DIR}/addon/addon.xml"
REPO_SRC_DIR="${ROOT_DIR}/repository.viewit"
REPO_ADDON_XML="${REPO_SRC_DIR}/addon.xml"
REPO_BASE_URL="${REPO_BASE_URL:-http://127.0.0.1:8080/repo}"
if [[ ! -f "${PLUGIN_ADDON_XML}" ]]; then
echo "Missing: ${PLUGIN_ADDON_XML}" >&2
exit 1
fi
if [[ ! -f "${REPO_ADDON_XML}" ]]; then
echo "Missing: ${REPO_ADDON_XML}" >&2
exit 1
fi
mkdir -p "${REPO_DIR}"
PLUGIN_ZIP="$("${ROOT_DIR}/scripts/build_kodi_zip.sh")"
cp -f "${PLUGIN_ZIP}" "${REPO_DIR}/"
read -r REPO_ADDON_ID REPO_ADDON_VERSION < <(python3 - "${REPO_ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "repository.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "${TMP_DIR}"' EXIT
TMP_REPO_ADDON_DIR="${TMP_DIR}/${REPO_ADDON_ID}"
mkdir -p "${TMP_REPO_ADDON_DIR}"
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete "${REPO_SRC_DIR}/" "${TMP_REPO_ADDON_DIR}/"
else
cp -a "${REPO_SRC_DIR}/." "${TMP_REPO_ADDON_DIR}/"
fi
python3 - "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_BASE_URL}" <<'PY'
import sys
import xml.etree.ElementTree as ET
addon_xml = sys.argv[1]
base_url = sys.argv[2].rstrip("/")
tree = ET.parse(addon_xml)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
raise SystemExit("Invalid repository addon.xml: missing <dir>")
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
raise SystemExit("Invalid repository addon.xml: missing info/checksum/datadir")
info.text = f"{base_url}/addons.xml"
checksum.text = f"{base_url}/addons.xml.md5"
datadir.text = f"{base_url}/"
tree.write(addon_xml, encoding="utf-8", xml_declaration=True)
PY
REPO_ZIP_NAME="${REPO_ADDON_ID}-${REPO_ADDON_VERSION}.zip"
REPO_ZIP_PATH="${REPO_DIR}/${REPO_ZIP_NAME}"
rm -f "${REPO_ZIP_PATH}"
python3 "${ROOT_DIR}/scripts/zip_deterministic.py" "${REPO_ZIP_PATH}" "${TMP_REPO_ADDON_DIR}" >/dev/null
python3 - "${PLUGIN_ADDON_XML}" "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_DIR}/addons.xml" <<'PY'
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
plugin_xml = Path(sys.argv[1])
repo_xml = Path(sys.argv[2])
target = Path(sys.argv[3])
addons = ET.Element("addons")
for source in (plugin_xml, repo_xml):
root = ET.parse(source).getroot()
addons.append(root)
target.write_text('<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(addons, encoding="unicode"), encoding="utf-8")
PY
python3 - "${REPO_DIR}/addons.xml" "${REPO_DIR}/addons.xml.md5" <<'PY'
import hashlib
import sys
from pathlib import Path
addons_xml = Path(sys.argv[1])
md5_file = Path(sys.argv[2])
md5 = hashlib.md5(addons_xml.read_bytes()).hexdigest()
md5_file.write_text(md5, encoding="ascii")
PY
echo "Repo built:"
echo " ${REPO_DIR}/addons.xml"
echo " ${REPO_DIR}/addons.xml.md5"
echo " ${REPO_ZIP_PATH}"
echo " ${REPO_DIR}/$(basename "${PLUGIN_ZIP}")"

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""Generate a JSON manifest for addon plugins."""
from __future__ import annotations
import importlib.util
import inspect
import json
import sys
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[1]
PLUGIN_DIR = ROOT_DIR / "addon" / "plugins"
OUTPUT_PATH = ROOT_DIR / "docs" / "PLUGIN_MANIFEST.json"
sys.path.insert(0, str(ROOT_DIR / "addon"))
try:
from plugin_interface import BasisPlugin # type: ignore
except Exception as exc: # pragma: no cover
raise SystemExit(f"Failed to import BasisPlugin: {exc}")
def _import_module(path: Path):
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise ImportError(f"Missing spec for {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def _collect_plugins():
plugins = []
for file_path in sorted(PLUGIN_DIR.glob("*.py")):
if file_path.name.startswith("_"):
continue
entry = {
"file": str(file_path.relative_to(ROOT_DIR)),
"module": file_path.stem,
"name": None,
"class": None,
"version": None,
"capabilities": [],
"prefer_source_metadata": False,
"base_url_setting": None,
"available": None,
"unavailable_reason": None,
"error": None,
}
try:
module = _import_module(file_path)
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
classes = [preferred]
else:
classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
classes.sort(key=lambda cls: cls.__name__.casefold())
if not classes:
entry["error"] = "No plugin classes found"
plugins.append(entry)
continue
cls = classes[0]
instance = cls()
entry["class"] = cls.__name__
entry["name"] = str(getattr(instance, "name", "") or "") or None
entry["version"] = str(getattr(instance, "version", "0.0.0") or "0.0.0")
entry["prefer_source_metadata"] = bool(getattr(instance, "prefer_source_metadata", False))
entry["available"] = bool(getattr(instance, "is_available", True))
entry["unavailable_reason"] = getattr(instance, "unavailable_reason", None)
try:
caps = instance.capabilities() # type: ignore[call-arg]
entry["capabilities"] = sorted([str(c) for c in caps]) if caps else []
except Exception:
entry["capabilities"] = []
entry["base_url_setting"] = getattr(module, "SETTING_BASE_URL", None)
except Exception as exc: # pragma: no cover
entry["error"] = str(exc)
plugins.append(entry)
plugins.sort(key=lambda item: (item.get("name") or item["module"]).casefold())
return plugins
def main() -> int:
if not PLUGIN_DIR.exists():
raise SystemExit("Plugin directory missing")
manifest = {
"schema_version": 1,
"plugins": _collect_plugins(),
}
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
print(str(OUTPUT_PATH))
return 0
if __name__ == "__main__":
raise SystemExit(main())

193
scripts/publish_gitea_release.sh Executable file
View File

@@ -0,0 +1,193 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ADDON_XML="${ROOT_DIR}/addon/addon.xml"
DEFAULT_NOTES="Automatischer Release-Upload aus ViewIT Build."
TAG=""
ASSET_PATH=""
TITLE=""
NOTES="${DEFAULT_NOTES}"
DRY_RUN="0"
while [[ $# -gt 0 ]]; do
case "$1" in
--tag)
TAG="${2:-}"
shift 2
;;
--asset)
ASSET_PATH="${2:-}"
shift 2
;;
--title)
TITLE="${2:-}"
shift 2
;;
--notes)
NOTES="${2:-}"
shift 2
;;
--dry-run)
DRY_RUN="1"
shift
;;
*)
echo "Unbekanntes Argument: $1" >&2
exit 1
;;
esac
done
if [[ ! -f "${ADDON_XML}" ]]; then
echo "Missing: ${ADDON_XML}" >&2
exit 1
fi
read -r ADDON_ID ADDON_VERSION < <(python3 - "${ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "plugin.video.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
if [[ -z "${TAG}" ]]; then
TAG="v${ADDON_VERSION}"
fi
if [[ -z "${ASSET_PATH}" ]]; then
ASSET_PATH="${ROOT_DIR}/dist/${ADDON_ID}-${ADDON_VERSION}.zip"
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset nicht gefunden, baue ZIP: ${ASSET_PATH}"
"${ROOT_DIR}/scripts/build_kodi_zip.sh" >/dev/null
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset fehlt nach Build: ${ASSET_PATH}" >&2
exit 1
fi
if [[ -z "${TITLE}" ]]; then
TITLE="ViewIT ${TAG}"
fi
REMOTE_URL="$(git -C "${ROOT_DIR}" remote get-url origin)"
read -r BASE_URL OWNER REPO < <(python3 - "${REMOTE_URL}" <<'PY'
import re
import sys
u = sys.argv[1].strip()
m = re.match(r"^https?://([^/]+)/([^/]+)/([^/.]+)(?:\.git)?/?$", u)
if not m:
raise SystemExit("Origin-URL muss https://host/owner/repo(.git) sein.")
host, owner, repo = m.group(1), m.group(2), m.group(3)
print(f"https://{host}", owner, repo)
PY
)
API_BASE="${BASE_URL}/api/v1/repos/${OWNER}/${REPO}"
ASSET_NAME="$(basename "${ASSET_PATH}")"
if [[ "${DRY_RUN}" == "1" ]]; then
echo "[DRY-RUN] API: ${API_BASE}"
echo "[DRY-RUN] Tag: ${TAG}"
echo "[DRY-RUN] Asset: ${ASSET_PATH}"
exit 0
fi
if [[ -z "${GITEA_TOKEN:-}" ]]; then
echo "Bitte GITEA_TOKEN setzen." >&2
exit 1
fi
tmp_json="$(mktemp)"
tmp_http="$(mktemp)"
trap 'rm -f "${tmp_json}" "${tmp_http}"' EXIT
urlenc() {
python3 - "$1" <<'PY'
import sys
from urllib.parse import quote
print(quote(sys.argv[1], safe=""))
PY
}
tag_enc="$(urlenc "${TAG}")"
auth_header="Authorization: token ${GITEA_TOKEN}"
http_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/tags/${tag_enc}")"
if [[ "${http_code}" == "200" ]]; then
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
elif [[ "${http_code}" == "404" ]]; then
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" <<'PY'
import json,sys
print(json.dumps({
"tag_name": sys.argv[1],
"name": sys.argv[2],
"body": sys.argv[3],
"draft": False,
"prerelease": False
}))
PY
)"
http_code_create="$(curl -sS -X POST -H "${auth_header}" -H "Content-Type: application/json" -d "${payload}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases")"
if [[ "${http_code_create}" != "201" ]]; then
echo "Release konnte nicht erstellt werden (HTTP ${http_code_create})." >&2
cat "${tmp_json}" >&2
exit 1
fi
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
else
echo "Release-Abfrage fehlgeschlagen (HTTP ${http_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
assets_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets")"
if [[ "${assets_code}" == "200" ]]; then
EXISTING_ASSET_ID="$(python3 - "${tmp_json}" "${ASSET_NAME}" <<'PY'
import json,sys
assets=json.load(open(sys.argv[1], encoding="utf-8"))
name=sys.argv[2]
for a in assets:
if a.get("name")==name:
print(a.get("id"))
break
PY
)"
if [[ -n "${EXISTING_ASSET_ID}" ]]; then
del_code="$(curl -sS -X DELETE -H "${auth_header}" -o "${tmp_http}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets/${EXISTING_ASSET_ID}")"
if [[ "${del_code}" != "204" ]]; then
echo "Altes Asset konnte nicht geloescht werden (HTTP ${del_code})." >&2
cat "${tmp_http}" >&2
exit 1
fi
fi
fi
asset_name_enc="$(urlenc "${ASSET_NAME}")"
upload_code="$(curl -sS -X POST -H "${auth_header}" -F "attachment=@${ASSET_PATH}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets?name=${asset_name_enc}")"
if [[ "${upload_code}" != "201" ]]; then
echo "Asset-Upload fehlgeschlagen (HTTP ${upload_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
echo "Release-Asset hochgeladen:"
echo " Repo: ${OWNER}/${REPO}"
echo " Tag: ${TAG}"
echo " Asset: ${ASSET_NAME}"
echo " URL: ${BASE_URL}/${OWNER}/${REPO}/releases/tag/${TAG}"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
HOST="${HOST:-127.0.0.1}"
PORT="${PORT:-8080}"
if [[ ! -f "${DIST_DIR}/repo/addons.xml" ]]; then
echo "Missing ${DIST_DIR}/repo/addons.xml" >&2
echo "Run ./scripts/build_local_kodi_repo.sh first." >&2
exit 1
fi
echo "Serving local Kodi repo from ${DIST_DIR}"
echo "Repository URL: http://${HOST}:${PORT}/repo/addons.xml"
(cd "${DIST_DIR}" && python3 -m http.server "${PORT}" --bind "${HOST}")

73
scripts/zip_deterministic.py Executable file
View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""Create deterministic zip archives.
Usage:
zip_deterministic.py <zip_path> <root_dir>
The archive will include the root directory itself and all files under it.
"""
from __future__ import annotations
import os
import sys
import time
import zipfile
from pathlib import Path
def _timestamp() -> tuple[int, int, int, int, int, int]:
epoch = os.environ.get("SOURCE_DATE_EPOCH")
if epoch:
try:
value = int(epoch)
return time.gmtime(value)[:6]
except Exception:
pass
return (2000, 1, 1, 0, 0, 0)
def _iter_files(root: Path):
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = sorted([d for d in dirnames if d != "__pycache__"])
for filename in sorted(filenames):
if filename.endswith(".pyc"):
continue
yield Path(dirpath) / filename
def _add_file(zf: zipfile.ZipFile, file_path: Path, arcname: str) -> None:
info = zipfile.ZipInfo(arcname, date_time=_timestamp())
info.compress_type = zipfile.ZIP_DEFLATED
info.external_attr = (0o644 & 0xFFFF) << 16
with file_path.open("rb") as handle:
data = handle.read()
zf.writestr(info, data, compress_type=zipfile.ZIP_DEFLATED)
def main() -> int:
if len(sys.argv) != 3:
print("Usage: zip_deterministic.py <zip_path> <root_dir>")
return 2
zip_path = Path(sys.argv[1]).resolve()
root = Path(sys.argv[2]).resolve()
if not root.exists() or not root.is_dir():
print(f"Missing root dir: {root}")
return 2
base = root.parent
zip_path.parent.mkdir(parents=True, exist_ok=True)
if zip_path.exists():
zip_path.unlink()
with zipfile.ZipFile(zip_path, "w") as zf:
for file_path in sorted(_iter_files(root)):
arcname = str(file_path.relative_to(base)).replace(os.sep, "/")
_add_file(zf, file_path, arcname)
print(str(zip_path))
return 0
if __name__ == "__main__":
raise SystemExit(main())