Compare commits

...

8 Commits

21 changed files with 1830 additions and 61 deletions

BIN
.coverage

Binary file not shown.

6
.gitignore vendored

@@ -9,5 +9,11 @@
# Local tests (not committed)
/tests/
/TESTING/
/.pytest_cache/
/pytest.ini
# Python artifacts
__pycache__/
*.pyc
.coverage


@@ -15,6 +15,18 @@ ViewIT is a Kodi addon for browsing and playing content from the
- Build the Kodi ZIP: `./scripts/build_kodi_zip.sh` → `dist/<addon_id>-<version>.zip`
- Addon version: `addon/addon.xml`
## Local Kodi repository
- Build the repository (incl. ZIPs + `addons.xml` + `addons.xml.md5`): `./scripts/build_local_kodi_repo.sh`
- Serve it locally: `./scripts/serve_local_kodi_repo.sh`
- Default URL: `http://127.0.0.1:8080/repo/addons.xml`
- Optionally set a custom URL at build time: `REPO_BASE_URL=http://<host>:<port>/repo ./scripts/build_local_kodi_repo.sh`
## Gitea release asset upload
- Build the ZIP: `./scripts/build_kodi_zip.sh`
- Set the token: `export GITEA_TOKEN=<token>`
- Upload the asset to a tag (creates the release if needed): `./scripts/publish_gitea_release.sh`
- Optional: `--tag v0.1.50 --asset dist/plugin.video.viewit-0.1.50.zip`
## Development (brief)
- Main logic: `addon/default.py`
- Plugins: `addon/plugins/*_plugin.py`
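
For reference, the `addons.xml.md5` that Kodi validates against is simply the MD5 hex digest of `addons.xml`. A minimal sketch of what the build script is assumed to produce (`dist/repo` is an illustrative output directory, not confirmed by the scripts):

```python
import hashlib
from pathlib import Path

def write_addons_md5(repo_dir: str) -> None:
    """Writes addons.xml.md5 next to addons.xml (hex digest of the file bytes)."""
    xml_path = Path(repo_dir) / "addons.xml"
    digest = hashlib.md5(xml_path.read_bytes()).hexdigest()
    (Path(repo_dir) / "addons.xml.md5").write_text(digest, encoding="utf-8")

write_addons_md5("dist/repo")  # assumed output directory
```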


@@ -16,6 +16,7 @@ import json
import os
import re
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from types import ModuleType
from urllib.parse import parse_qs, urlencode
@@ -401,6 +402,27 @@ def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
return default
def _set_setting_string(setting_id: str, value: str) -> None:
if xbmcaddon is None:
return
addon = _get_addon()
if addon is None:
return
setter = getattr(addon, "setSettingString", None)
if callable(setter):
try:
setter(setting_id, str(value))
return
except TypeError:
return
setter = getattr(addon, "setSetting", None)
if callable(setter):
try:
setter(setting_id, str(value))
except TypeError:
return
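
The setter above probes the typed Kodi v20+ API first and falls back to the legacy setter. The probing pattern can be exercised outside Kodi with a stub; this is a standalone sketch, not the addon's code, and `_AddonStub` is hypothetical:

```python
class _AddonStub:
    # Hypothetical stand-in for xbmcaddon.Addon that only has the legacy setter.
    def __init__(self) -> None:
        self.values: dict[str, str] = {}

    def setSetting(self, setting_id: str, value: str) -> None:
        self.values[setting_id] = value

def set_setting_string(addon: object, setting_id: str, value: str) -> None:
    # Same probing order as _set_setting_string: typed setter first, then legacy.
    setter = getattr(addon, "setSettingString", None) or getattr(addon, "setSetting", None)
    if callable(setter):
        setter(setting_id, str(value))

stub = _AddonStub()
set_setting_string(stub, "update_repo_url", "http://127.0.0.1:8080/repo/addons.xml")
assert stub.values["update_repo_url"].endswith("/addons.xml")
```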
def _apply_video_info(item, info_labels: dict[str, object] | None, cast: list[TmdbCastMember] | None) -> None:
"""Sets metadata preferably via InfoTagVideo (Kodi v20+), falling back to deprecated APIs."""
@@ -883,6 +905,136 @@ def _add_directory_item(
xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
def _plugin_version(plugin: BasisPlugin) -> str:
raw = getattr(plugin, "version", "0.0.0")
text = str(raw or "").strip()
return text or "0.0.0"
def _normalize_update_info_url(raw: str) -> str:
value = str(raw or "").strip()
default = "http://127.0.0.1:8080/repo/addons.xml"
if not value:
return default
if value.endswith("/addons.xml"):
return value
return value.rstrip("/") + "/addons.xml"
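
The normalizer always resolves to a full `addons.xml` URL, so the three input shapes behave like this (the host below is just an example):

```python
assert _normalize_update_info_url("") == "http://127.0.0.1:8080/repo/addons.xml"
assert _normalize_update_info_url("http://192.168.0.5:9000/repo") == "http://192.168.0.5:9000/repo/addons.xml"
assert _normalize_update_info_url("http://192.168.0.5:9000/repo/addons.xml") == "http://192.168.0.5:9000/repo/addons.xml"
```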
def _repo_addon_xml_path() -> str:
if xbmcvfs is None:
return ""
try:
return xbmcvfs.translatePath("special://home/addons/repository.viewit/addon.xml")
except Exception:
return ""
def _update_repository_source(info_url: str) -> bool:
path = _repo_addon_xml_path()
if not path:
return False
if not os.path.exists(path):
return False
try:
tree = ET.parse(path)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
return False
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
return False
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
info.text = info_url
checksum.text = f"{base}/addons.xml.md5"
datadir.text = f"{base}/"
tree.write(path, encoding="utf-8", xml_declaration=True)
return True
except Exception as exc:
_log(f"Repository-URL konnte nicht gesetzt werden: {exc}", xbmc.LOGWARNING)
return False
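
`_update_repository_source` expects the repository addon's `addon.xml` to carry a `<dir>` block with `info`, `checksum`, and `datadir` children. A self-contained check of that rewrite logic against a minimal document (the XML below is an assumed shape, not copied from the repo):

```python
import xml.etree.ElementTree as ET

sample = """<addon id="repository.viewit" version="1.0.0">
  <extension point="xbmc.addon.repository">
    <dir>
      <info>http://127.0.0.1:8080/repo/addons.xml</info>
      <checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
      <datadir>http://127.0.0.1:8080/repo/</datadir>
    </dir>
  </extension>
</addon>"""

root = ET.fromstring(sample)
dir_node = root.find(".//dir")
info_url = "http://192.168.0.5:9000/repo/addons.xml"  # illustrative target
base = info_url[: -len("/addons.xml")]
dir_node.find("info").text = info_url
dir_node.find("checksum").text = f"{base}/addons.xml.md5"
dir_node.find("datadir").text = f"{base}/"
assert dir_node.find("datadir").text == "http://192.168.0.5:9000/repo/"
```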
def _settings_key_for_plugin(name: str) -> str:
safe = re.sub(r"[^a-z0-9]+", "_", (name or "").strip().casefold()).strip("_")
return f"update_version_{safe}" if safe else "update_version_unknown"
def _collect_plugin_metadata(plugin: BasisPlugin, titles: list[str]) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]]:
getter = getattr(plugin, "metadata_for", None)
if not callable(getter):
return {}
collected: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]] = {}
for title in titles:
try:
labels, art, cast = getter(title)
except Exception:
continue
if isinstance(labels, dict) or isinstance(art, dict) or cast:
label_map = {str(k): str(v) for k, v in dict(labels or {}).items() if v}
art_map = {str(k): str(v) for k, v in dict(art or {}).items() if v}
collected[title] = (label_map, art_map, cast if isinstance(cast, list) else None)
return collected
def _needs_tmdb(labels: dict[str, str], art: dict[str, str], *, want_plot: bool, want_art: bool) -> bool:
if want_plot and not labels.get("plot"):
return True
if want_art and not (art.get("thumb") or art.get("poster") or art.get("fanart") or art.get("landscape")):
return True
return False
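
`_needs_tmdb` only requests a TMDb lookup when the source metadata is missing something the user wants to see:

```python
# No plot, no art: TMDb is needed.
assert _needs_tmdb({}, {}, want_plot=True, want_art=True) is True
# Source already provides plot and poster: skip TMDb.
assert _needs_tmdb({"plot": "…"}, {"poster": "p.jpg"}, want_plot=True, want_art=True) is False
# User disabled both: never needed.
assert _needs_tmdb({}, {}, want_plot=False, want_art=False) is False
```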
def _merge_metadata(
title: str,
tmdb_labels: dict[str, str] | None,
tmdb_art: dict[str, str] | None,
tmdb_cast: list[TmdbCastMember] | None,
plugin_meta: tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None] | None,
) -> tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]:
labels = dict(tmdb_labels or {})
art = dict(tmdb_art or {})
cast = tmdb_cast
if plugin_meta is not None:
meta_labels, meta_art, meta_cast = plugin_meta
labels.update({k: str(v) for k, v in dict(meta_labels or {}).items() if v})
art.update({k: str(v) for k, v in dict(meta_art or {}).items() if v})
if meta_cast is not None:
cast = meta_cast
if "title" not in labels:
labels["title"] = title
return labels, art, cast
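
Merging is last-writer-wins with the plugin's own metadata applied on top of TMDb, so source values shadow TMDb values:

```python
labels, art, cast = _merge_metadata(
    "Some Title",
    {"plot": "TMDb plot"},
    {"poster": "tmdb.jpg"},
    None,
    ({"plot": "Source plot"}, {"poster": "source.jpg"}, None),
)
assert labels == {"plot": "Source plot", "title": "Some Title"}
assert art == {"poster": "source.jpg"}
assert cast is None
```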
def _sync_update_version_settings() -> None:
addon = _get_addon()
addon_version = "0.0.0"
if addon is not None:
try:
addon_version = str(addon.getAddonInfo("version") or "0.0.0")
except Exception:
addon_version = "0.0.0"
_set_setting_string("update_version_addon", addon_version)
versions = {
"update_version_serienstream": "-",
"update_version_aniworld": "-",
"update_version_einschalten": "-",
"update_version_topstreamfilm": "-",
"update_version_filmpalast": "-",
"update_version_doku_streams": "-",
}
for plugin in _discover_plugins().values():
key = _settings_key_for_plugin(str(plugin.name))
if key in versions:
versions[key] = _plugin_version(plugin)
for key, value in versions.items():
_set_setting_string(key, value)
def _show_root_menu() -> None:
handle = _get_handle()
_log("Root-Menue wird angezeigt.")
@@ -890,8 +1042,7 @@ def _show_root_menu() -> None:
plugins = _discover_plugins()
for plugin_name in sorted(plugins.keys(), key=lambda value: value.casefold()):
_add_directory_item(handle, plugin_name, "plugin_menu", {"plugin": plugin_name}, is_folder=True)
_add_directory_item(handle, "Einstellungen", "settings")
xbmcplugin.endOfDirectory(handle)
@@ -919,6 +1070,12 @@ def _show_plugin_menu(plugin_name: str) -> None:
if _plugin_has_capability(plugin, "genres"):
_add_directory_item(handle, "Genres", "genres", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "alpha"):
_add_directory_item(handle, "A-Z", "alpha_index", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "series_catalog"):
_add_directory_item(handle, "Serien", "series_catalog", {"plugin": plugin_name, "page": "1"}, is_folder=True)
if _plugin_has_capability(plugin, "popular_series"):
_add_directory_item(handle, "Meist gesehen", "popular", {"plugin": plugin_name, "page": "1"}, is_folder=True)
@@ -967,10 +1124,24 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles and not canceled:
canceled = progress(35, f"{plugin_name} (1/1) Metadaten…")
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for index, title in enumerate(results, start=1):
@@ -979,8 +1150,9 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
if index == 1 or index == total_results or (index % 10 == 0):
pct = 35 + int((index / float(total_results)) * 60)
canceled = progress(pct, f"{plugin_name} (1/1) aufbereiten {index}/{total_results}")
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1143,15 +1315,29 @@ def _show_search_results(query: str) -> None:
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
canceled = progress(
range_start + int((range_end - range_start) * 0.35),
f"{plugin_name} ({plugin_index}/{total_plugins}) Metadaten…",
)
if canceled:
break
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for title_index, title in enumerate(results, start=1):
if title_index == 1 or title_index == total_results or (title_index % 10 == 0):
@@ -1161,8 +1347,9 @@ def _show_search_results(query: str) -> None:
)
if canceled:
break
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1514,6 +1701,176 @@ def _show_genres(plugin_name: str) -> None:
xbmcplugin.endOfDirectory(handle)
def _show_categories(plugin_name: str) -> None:
handle = _get_handle()
_log(f"Kategorien laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "categories", None)
if not callable(getter):
xbmcgui.Dialog().notification("Kategorien", "Kategorien nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
categories = list(getter() or [])
except Exception as exc:
_log(f"Kategorien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Kategorien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for category in categories:
category = str(category).strip()
if not category:
continue
_add_directory_item(
handle,
category,
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_genre_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Kategorien", "Paging nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "genre_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(category) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{category} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{category} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(category, page) or [])
except Exception as exc:
_log(f"Kategorie-Seite konnte nicht geladen werden ({plugin_name}/{category} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
if show_tmdb:
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, {}, {}, None, meta)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info(info_labels, playstate),
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(category, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
@@ -1563,6 +1920,157 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(genre, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"genre_titles_page",
{"plugin": plugin_name, "genre": genre, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_index(plugin_name: str) -> None:
handle = _get_handle()
_log(f"A-Z laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "alpha_index", None)
if not callable(getter):
xbmcgui.Dialog().notification("A-Z", "A-Z nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
letters = list(getter() or [])
except Exception as exc:
_log(f"A-Z konnte nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "A-Z konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for letter in letters:
letter = str(letter).strip()
if not letter:
continue
_add_directory_item(
handle,
letter,
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_alpha_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("A-Z", "Paging nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "alpha_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(letter) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{letter} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{letter} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(letter, page) or [])
except Exception as exc:
_log(f"A-Z Seite konnte nicht geladen werden ({plugin_name}/{letter} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
@@ -1611,11 +2119,113 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
show_next = False
if total_pages is not None:
show_next = page < total_pages
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_series_catalog(plugin_name: str, page: int = 1) -> None:
handle = _get_handle()
plugin_name = (plugin_name or "").strip()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Serien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "series_catalog_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Serien", "Serien nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "series_catalog_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(page) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"Serien ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"Serien ({page})")
_set_content(handle, "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"series_catalog",
{"plugin": plugin_name, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(page) or [])
except Exception as exc:
_log(f"Serien konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Serien", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "series_catalog_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(page))
except Exception:
show_next = False
@@ -1623,8 +2233,8 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
_add_directory_item(
handle,
"Nächste Seite",
"series_catalog",
{"plugin": plugin_name, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
@@ -1802,12 +2412,28 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
plugin_meta = _collect_plugin_metadata(plugin, page_items)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in page_items:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1825,17 +2451,6 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
art=art,
cast=cast,
)
else:
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
if total_pages > 1 and page < total_pages:
_add_directory_item(
@@ -2245,10 +2860,34 @@ def _open_settings() -> None:
"""Oeffnet das Kodi-Addon-Settings-Dialog.""" """Oeffnet das Kodi-Addon-Settings-Dialog."""
if xbmcaddon is None: # pragma: no cover - outside Kodi if xbmcaddon is None: # pragma: no cover - outside Kodi
raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).") raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).")
_sync_update_version_settings()
addon = xbmcaddon.Addon()
addon.openSettings()
def _run_update_check() -> None:
"""Stoesst Kodi-Repo- und Addon-Updates an und informiert den Benutzer."""
if xbmc is None: # pragma: no cover - outside Kodi
return
try:
info_url = _normalize_update_info_url(_get_setting_string("update_repo_url"))
_set_setting_string("update_repo_url", info_url)
_sync_update_version_settings()
_update_repository_source(info_url)
builtin = getattr(xbmc, "executebuiltin", None)
if callable(builtin):
builtin("UpdateAddonRepos")
builtin("UpdateLocalAddons")
builtin("ActivateWindow(addonbrowser,addons://updates/)")
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung gestartet.", xbmcgui.NOTIFICATION_INFO, 4000)
except Exception as exc:
_log(f"Update-Pruefung fehlgeschlagen: {exc}", xbmc.LOGWARNING)
try:
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 4000)
except Exception:
pass
def _extract_first_int(value: str) -> int | None:
match = re.search(r"(\d+)", value or "")
if not match:
@@ -2564,6 +3203,8 @@ def run() -> None:
_show_genre_sources()
elif action == "genres":
_show_genres(params.get("plugin", ""))
elif action == "categories":
_show_categories(params.get("plugin", ""))
elif action == "new_titles": elif action == "new_titles":
_show_new_titles( _show_new_titles(
params.get("plugin", ""), params.get("plugin", ""),
@@ -2585,6 +3226,25 @@ def run() -> None:
params.get("genre", ""), params.get("genre", ""),
_parse_positive_int(params.get("page", "1"), default=1), _parse_positive_int(params.get("page", "1"), default=1),
) )
elif action == "category_titles_page":
_show_category_titles_page(
params.get("plugin", ""),
params.get("category", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "alpha_index":
_show_alpha_index(params.get("plugin", ""))
elif action == "alpha_titles_page":
_show_alpha_titles_page(
params.get("plugin", ""),
params.get("letter", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "series_catalog":
_show_series_catalog(
params.get("plugin", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "genre_series_group": elif action == "genre_series_group":
_show_genre_series_group( _show_genre_series_group(
params.get("plugin", ""), params.get("plugin", ""),
@@ -2599,6 +3259,8 @@ def run() -> None:
)
elif action == "settings":
_open_settings()
elif action == "check_updates":
_run_update_check()
elif action == "seasons": elif action == "seasons":
_show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", "")) _show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", ""))
elif action == "episodes": elif action == "episodes":


@@ -11,6 +11,7 @@ class BasisPlugin(ABC):
"""Abstrakte Basisklasse fuer alle Integrationen.""" """Abstrakte Basisklasse fuer alle Integrationen."""
name: str name: str
version: str = "0.0.0"
@abstractmethod
async def search_titles(self, query: str) -> List[str]:
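
Concrete integrations now pin a `version` and expose optional capabilities that `default.py` probes via `getattr`. A minimal hypothetical plugin sketching that contract (names follow the hooks used above; `ExamplePlugin` is not part of the repo):

```python
from typing import List

class ExamplePlugin(BasisPlugin):
    # Hypothetical integration illustrating the duck-typed hooks.
    name = "Example"
    version = "0.1.0"
    prefer_source_metadata = True  # let source metadata shadow TMDb

    async def search_titles(self, query: str) -> List[str]:
        return ["Example Title"]

    def capabilities(self) -> set[str]:
        return {"genres", "popular_series"}

    def metadata_for(self, title: str):
        return {"title": title, "plot": "Example plot"}, {"poster": "poster.jpg"}, None
```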


@@ -691,6 +691,7 @@ def search_animes(query: str) -> List[SeriesResult]:
class AniworldPlugin(BasisPlugin):
name = "Aniworld"
version = "1.0.0"
def __init__(self) -> None:
self._anime_results: Dict[str, SeriesResult] = {}


@@ -0,0 +1,476 @@
"""Doku-Streams (doku-streams.com) Integration."""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "doku_streams_base_url"
DEFAULT_BASE_URL = "https://doku-streams.com"
MOST_VIEWED_PATH = "/meistgesehene/"
DEFAULT_TIMEOUT = 20
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_dokustreams"
SETTING_DUMP_HTML = "dump_html_dokustreams"
SETTING_SHOW_URL_INFO = "show_url_info_dokustreams"
SETTING_LOG_ERRORS = "log_errors_dokustreams"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
plot: str = ""
poster: str = ""
def _extract_last_page(soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("nav.navigation a[href], nav.pagination a[href], a.page-numbers[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"/page/(\\d+)/", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
for value in re.findall(r"(\\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
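
The pager scan takes the highest number it can find in either the link text or a `/page/N/` href; for example:

```python
from bs4 import BeautifulSoup

html = '<nav class="pagination"><a href="/page/2/">2</a><a href="/page/7/">7</a></nav>'
assert _extract_last_page(BeautifulSoup(html, "html.parser")) == 7
```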
def _extract_summary_and_poster(article: BeautifulSoupT) -> tuple[str, str]:
summary = ""
if article:
summary_box = article.select_one("div.entry-summary")
if summary_box is not None:
for p in summary_box.find_all("p"):
text = (p.get_text(" ", strip=True) or "").strip()
if text:
summary = text
break
poster = ""
if article:
img = article.select_one("div.entry-thumb img")
if img is not None:
poster = (img.get("data-src") or "").strip() or (img.get("src") or "").strip()
if "lazy_placeholder" in poster and img.get("data-src"):
poster = (img.get("data-src") or "").strip()
poster = _absolute_url(poster)
return summary, poster
def _parse_listing_hits(soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for article in soup.select("article[id^='post-']"):
anchor = article.select_one("h2.entry-title a[href]")
if anchor is None:
continue
href = (anchor.get("href") or "").strip()
title = (anchor.get_text(" ", strip=True) or "").strip()
if not href or not title:
continue
if query and not _matches_query(query, title=title):
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
summary, poster = _extract_summary_and_poster(article)
hits.append(SearchHit(title=title, url=url, plot=summary, poster=poster))
return hits
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="dokustreams_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Doku-Streams",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="dokustreams_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="dokustreams_errors.log",
message=message,
)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("dokustreams", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class DokuStreamsPlugin(BasisPlugin):
name = "Doku-Streams"
version = "1.0.0"
prefer_source_metadata = True
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._category_to_url: Dict[str, str] = {}
self._category_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SearchHit]] = None
self._title_meta: Dict[str, tuple[str, str]] = {}
self._requests_available = REQUESTS_AVAILABLE
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"DokuStreamsPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
self._title_to_url = {hit.title: hit.url for hit in hits if hit.title and hit.url}
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query or not self._requests_available:
return []
search_url = _absolute_url(f"/?s={quote(query)}")
session = get_requests_session("dokustreams", headers=HEADERS)
try:
soup = _get_soup(search_url, session=session)
except Exception:
return []
return _parse_listing_hits(soup, query=query)
def capabilities(self) -> set[str]:
return {"genres", "popular_series"}
def _categories_url(self) -> str:
return _absolute_url("/kategorien/")
def _parse_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
def clean_name(value: str) -> str:
value = (value or "").strip()
return re.sub(r"\s*\(\d+\)\s*$", "", value).strip()
def walk(ul, parents: List[str]) -> None:
for li in ul.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = clean_name(anchor.get_text(" ", strip=True) or "")
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
child_ul = li.find("ul", class_="nested-category-list")
if child_ul is not None:
walk(child_ul, parents + [name])
else:
if parents:
label = " \u2192 ".join(parents + [name])
categories[label] = _absolute_url(href)
walk(root, [])
return categories
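
The category walker only emits leaf entries and prefixes them with their parent path. A check against a minimal fragment of the assumed markup (assuming the helper modules import outside Kodi and `_absolute_url` falls back to the default base URL there):

```python
from bs4 import BeautifulSoup

html = """
<ul class="nested-category-list">
  <li><a href="/kategorie/natur/">Natur (12)</a>
    <ul class="nested-category-list">
      <li><a href="/kategorie/natur/tiere/">Tiere (5)</a></li>
    </ul>
  </li>
</ul>
"""
plugin = DokuStreamsPlugin()
categories = plugin._parse_categories(BeautifulSoup(html, "html.parser"))
# Only the leaf is emitted, with its parent path and the count suffix stripped.
assert list(categories) == ["Natur → Tiere"]
```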
def _parse_top_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
for li in root.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
categories[name] = _absolute_url(href)
return categories
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._category_to_url:
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_categories(soup)
if parsed:
self._category_to_url = dict(parsed)
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
def categories(self) -> List[str]:
if not self._requests_available:
return []
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_top_categories(soup)
if parsed:
for key, value in parsed.items():
self._category_to_url.setdefault(key, value)
return list(parsed.keys())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._category_page_count_cache:
return max(1, int(self._category_page_count_cache.get(genre, 1)))
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return 1
pages = _extract_last_page(soup)
self._category_page_count_cache[genre] = max(1, pages)
return self._category_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else f"{base_url.rstrip('/')}/page/{page}/"
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
return titles
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _most_viewed_url(self) -> str:
return _absolute_url(MOST_VIEWED_PATH)
def popular_series(self) -> List[str]:
if not self._requests_available:
return []
if self._popular_cache is not None:
titles = [hit.title for hit in self._popular_cache if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
try:
soup = _get_soup(self._most_viewed_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
self._popular_cache = list(hits)
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
plot, poster = self._title_meta.get(title, ("", ""))
info: dict[str, str] = {"title": title}
if plot:
info["plot"] = plot
art: dict[str, str] = {}
if poster:
art = {"thumb": poster, "poster": poster}
return info, art, None
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return ["Stream"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return [title]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
if not title:
return None
url = self._title_to_url.get(title)
if not url:
return None
if not self._requests_available:
return None
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return None
iframe = soup.select_one("div.fluid-width-video-wrapper iframe[src]")
if iframe is None:
iframe = soup.select_one("iframe[src*='youtube'], iframe[src*='vimeo'], iframe[src]")
if iframe is None:
return None
src = (iframe.get("src") or "").strip()
if not src:
return None
return _absolute_url(src)
# Alias for automatic plugin discovery.
Plugin = DokuStreamsPlugin


@@ -507,6 +507,7 @@ class EinschaltenPlugin(BasisPlugin):
"""Metadata-Plugin für eine autorisierte Quelle.""" """Metadata-Plugin für eine autorisierte Quelle."""
name = "Einschalten" name = "Einschalten"
version = "1.0.0"
def __init__(self) -> None:
self.is_available = REQUESTS_AVAILABLE


@@ -10,6 +10,7 @@ from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try:  # pragma: no cover - optional dependency
@@ -42,6 +43,7 @@ DEFAULT_BASE_URL = "https://filmpalast.to"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
SERIES_HINT_PREFIX = "series://filmpalast/"
SERIES_VIEW_PATH = "/serien/view"
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -218,11 +220,17 @@ def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> Beautif
class FilmpalastPlugin(BasisPlugin):
name = "Filmpalast"
version = "1.0.0"
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
self._hoster_cache: Dict[str, Dict[str, str]] = {}
self._genre_to_url: Dict[str, str] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._alpha_to_url: Dict[str, str] = {}
self._alpha_page_count_cache: Dict[str, int] = {}
self._series_page_count_cache: Dict[int, int] = {}
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
@@ -391,8 +399,41 @@ class FilmpalastPlugin(BasisPlugin):
return hits
def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if query and not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
return hits
def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
self._title_to_url = {}
self._series_entries = {}
self._hoster_cache.clear()
@@ -425,6 +466,208 @@ class FilmpalastPlugin(BasisPlugin):
titles.sort(key=lambda value: value.casefold())
return titles
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
return self._apply_hits_to_title_index(hits)
def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
genres: Dict[str, str] = {}
if not soup:
return genres
for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/genre/" not in href:
continue
genres[name] = _absolute_url(href)
return genres
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"(\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def capabilities(self) -> set[str]:
return {"genres", "alpha", "series_catalog"}
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
alpha: Dict[str, str] = {}
if not soup:
return alpha
for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/alpha/" not in href:
continue
if name in alpha:
continue
alpha[name] = _absolute_url(href)
return alpha
def alpha_index(self) -> List[str]:
if not self._requests_available:
return []
if self._alpha_to_url:
return list(self._alpha_to_url.keys())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_alpha_links(soup)
if parsed:
self._alpha_to_url = dict(parsed)
return list(self._alpha_to_url.keys())
def alpha_page_count(self, letter: str) -> int:
letter = (letter or "").strip()
if not letter:
return 1
if letter in self._alpha_page_count_cache:
return max(1, int(self._alpha_page_count_cache.get(letter, 1)))
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._alpha_page_count_cache[letter] = max(1, pages)
return self._alpha_page_count_cache[letter]
def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
letter = (letter or "").strip()
if not letter or not self._requests_available:
return []
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_alpha(self, letter: str) -> List[str]:
titles = self.titles_for_alpha_page(letter, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _series_view_url(self) -> str:
return _absolute_url(SERIES_VIEW_PATH)
def series_catalog_page_count(self, page: int = 1) -> int:
if not self._requests_available:
return 1
cache_key = int(page or 1)
if cache_key in self._series_page_count_cache:
return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
base_url = self._series_view_url()
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._series_page_count_cache[cache_key] = max(1, pages)
return self._series_page_count_cache[cache_key]
def series_catalog_page(self, page: int) -> List[str]:
if not self._requests_available:
return []
base_url = self._series_view_url()
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def series_catalog_has_more(self, page: int) -> bool:
total = self.series_catalog_page_count(page)
return page < total
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._genre_to_url:
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_genres(soup)
if parsed:
self._genre_to_url = dict(parsed)
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._genre_page_count_cache:
return max(1, int(self._genre_page_count_cache.get(genre, 1)))
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._genre_page_count_cache[genre] = max(1, pages)
return self._genre_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
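
All three pagers (`titles_for_alpha_page`, `series_catalog_page`, `titles_for_genre_page`) build page URLs the same way, and the `rstrip("/") + "/"` step matters: `urljoin` replaces the last path segment unless the base ends with a slash. A minimal demonstration, with an illustrative genre URL:

from urllib.parse import urljoin

base_url = "https://filmpalast.to/search/genre/Action"  # illustrative example URL
page = 2
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
print(url)  # https://filmpalast.to/search/genre/Action/page/2

# Without the trailing slash, urljoin drops the last segment:
print(urljoin(base_url, "page/2"))  # https://filmpalast.to/search/genre/page/2
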
def _ensure_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
@@ -643,26 +886,39 @@ class FilmpalastPlugin(BasisPlugin):
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
# 1) Always hand the original hoster link to ResolveURL first.
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(link)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
redirected = link
if self._requests_available:
try:
session = get_requests_session("filmpalast", headers=HEADERS)
response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
redirected = (response.url or link).strip() or link
except Exception:
redirected = link
# 2) Then optionally resolve the redirect URL a second time.
if callable(resolve_with_resolveurl) and redirected and redirected != link:
resolved_by_resolveurl = resolve_with_resolveurl(redirected)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
# 3) Fallback stays as before: return the direct URL.
if redirected:
_log_url_event(redirected, kind="FINAL")
return redirected
return None
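
The reordered cascade above is easier to reason about in isolation. A minimal sketch of the same three-step order, where `resolve` stands in for `resolveurl_backend.resolve` (returns a playable URL or a falsy value) and `follow_redirects` stands in for the requests-based redirect step; both parameters are illustrative stand-ins, not addon APIs:

from typing import Callable, Optional

def resolve_cascade(
    link: str,
    resolve: Optional[Callable[[str], Optional[str]]],
    follow_redirects: Callable[[str], str],
) -> Optional[str]:
    """Sketch of the three-step order used above, not the addon's code."""
    if not link:
        return None
    # 1) The original hoster link goes to the resolver first.
    if callable(resolve):
        resolved = resolve(link)
        if resolved:
            return resolved
    # 2) Follow HTTP redirects; retry the resolver only if the URL changed.
    redirected = follow_redirects(link) or link
    if callable(resolve) and redirected != link:
        resolved = resolve(redirected)
        if resolved:
            return resolved
    # 3) Fallback: return the direct (possibly redirected) URL.
    return redirected or None
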

View File

@@ -784,6 +784,7 @@ class SerienstreamPlugin(BasisPlugin):
"""Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt.""" """Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt."""
name = "Serienstream" name = "Serienstream"
version = "1.0.0"
POPULAR_GENRE_LABEL = "⭐ Beliebte Serien" POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"
def __init__(self) -> None:

View File

@@ -123,6 +123,7 @@ class TopstreamfilmPlugin(BasisPlugin):
"""Integration fuer eine HTML-basierte Suchseite.""" """Integration fuer eine HTML-basierte Suchseite."""
name = "Topstreamfilm" name = "Topstreamfilm"
version = "1.0.0"
def __init__(self) -> None:
self._session: RequestsSession | None = None

View File

@@ -45,6 +45,9 @@
<category label="Filmpalast"> <category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" /> <setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
</category> </category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
</category>
<category label="TMDB"> <category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" /> <setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" /> <setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
@@ -61,4 +64,16 @@
<setting id="tmdb_log_requests" type="bool" label="TMDB API Requests loggen" default="false" /> <setting id="tmdb_log_requests" type="bool" label="TMDB API Requests loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API Antworten loggen" default="false" /> <setting id="tmdb_log_responses" type="bool" label="TMDB API Antworten loggen" default="false" />
</category> </category>
<category label="Update">
<setting id="update_repo_url" type="text" label="Update-URL (addons.xml)" default="http://127.0.0.1:8080/repo/addons.xml" />
<setting id="run_update_check" type="action" label="Jetzt auf Updates pruefen" action="RunPlugin(plugin://plugin.video.viewit/?action=check_updates)" option="close" />
<setting id="update_info" type="text" label="Kodi-Repository-Updates werden ueber den Kodi-Update-Mechanismus verarbeitet." default="" enable="false" />
<setting id="update_version_addon" type="text" label="ViewIT Addon Version" default="-" enable="false" />
<setting id="update_version_serienstream" type="text" label="Serienstream Plugin Version" default="-" enable="false" />
<setting id="update_version_aniworld" type="text" label="Aniworld Plugin Version" default="-" enable="false" />
<setting id="update_version_einschalten" type="text" label="Einschalten Plugin Version" default="-" enable="false" />
<setting id="update_version_topstreamfilm" type="text" label="Topstreamfilm Plugin Version" default="-" enable="false" />
<setting id="update_version_filmpalast" type="text" label="Filmpalast Plugin Version" default="-" enable="false" />
<setting id="update_version_doku_streams" type="text" label="Doku-Streams Plugin Version" default="-" enable="false" />
</category>
</settings> </settings>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.0" provider-name="ViewIT">
<extension point="xbmc.addon.repository" name="ViewIT Repository">
<dir>
<info compressed="false">http://127.0.0.1:8080/repo/addons.xml</info>
<checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
<datadir zip="true">http://127.0.0.1:8080/repo/</datadir>
</dir>
</extension>
<extension point="xbmc.addon.metadata">
<summary lang="de_DE">Lokales Repository fuer ViewIT Updates</summary>
<summary lang="en_GB">Local repository for ViewIT updates</summary>
<description lang="de_DE">Stellt das ViewIT Addon ueber ein Kodi Repository bereit.</description>
<description lang="en_GB">Provides the ViewIT addon via a Kodi repository.</description>
<platform>all</platform>
</extension>
</addon>

110
scripts/build_local_kodi_repo.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
REPO_DIR="${DIST_DIR}/repo"
PLUGIN_ADDON_XML="${ROOT_DIR}/addon/addon.xml"
REPO_SRC_DIR="${ROOT_DIR}/repository.viewit"
REPO_ADDON_XML="${REPO_SRC_DIR}/addon.xml"
REPO_BASE_URL="${REPO_BASE_URL:-http://127.0.0.1:8080/repo}"
if [[ ! -f "${PLUGIN_ADDON_XML}" ]]; then
echo "Missing: ${PLUGIN_ADDON_XML}" >&2
exit 1
fi
if [[ ! -f "${REPO_ADDON_XML}" ]]; then
echo "Missing: ${REPO_ADDON_XML}" >&2
exit 1
fi
mkdir -p "${REPO_DIR}"
PLUGIN_ZIP="$("${ROOT_DIR}/scripts/build_kodi_zip.sh")"
cp -f "${PLUGIN_ZIP}" "${REPO_DIR}/"
read -r REPO_ADDON_ID REPO_ADDON_VERSION < <(python3 - "${REPO_ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "repository.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "${TMP_DIR}"' EXIT
TMP_REPO_ADDON_DIR="${TMP_DIR}/${REPO_ADDON_ID}"
mkdir -p "${TMP_REPO_ADDON_DIR}"
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete "${REPO_SRC_DIR}/" "${TMP_REPO_ADDON_DIR}/"
else
cp -a "${REPO_SRC_DIR}/." "${TMP_REPO_ADDON_DIR}/"
fi
python3 - "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_BASE_URL}" <<'PY'
import sys
import xml.etree.ElementTree as ET
addon_xml = sys.argv[1]
base_url = sys.argv[2].rstrip("/")
tree = ET.parse(addon_xml)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
raise SystemExit("Invalid repository addon.xml: missing <dir>")
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
raise SystemExit("Invalid repository addon.xml: missing info/checksum/datadir")
info.text = f"{base_url}/addons.xml"
checksum.text = f"{base_url}/addons.xml.md5"
datadir.text = f"{base_url}/"
tree.write(addon_xml, encoding="utf-8", xml_declaration=True)
PY
REPO_ZIP_NAME="${REPO_ADDON_ID}-${REPO_ADDON_VERSION}.zip"
REPO_ZIP_PATH="${REPO_DIR}/${REPO_ZIP_NAME}"
rm -f "${REPO_ZIP_PATH}"
(cd "${TMP_DIR}" && zip -r "${REPO_ZIP_PATH}" "${REPO_ADDON_ID}" >/dev/null)
python3 - "${PLUGIN_ADDON_XML}" "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_DIR}/addons.xml" <<'PY'
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
plugin_xml = Path(sys.argv[1])
repo_xml = Path(sys.argv[2])
target = Path(sys.argv[3])
addons = ET.Element("addons")
for source in (plugin_xml, repo_xml):
root = ET.parse(source).getroot()
addons.append(root)
target.write_text('<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(addons, encoding="unicode"), encoding="utf-8")
PY
python3 - "${REPO_DIR}/addons.xml" "${REPO_DIR}/addons.xml.md5" <<'PY'
import hashlib
import sys
from pathlib import Path
addons_xml = Path(sys.argv[1])
md5_file = Path(sys.argv[2])
md5 = hashlib.md5(addons_xml.read_bytes()).hexdigest()
md5_file.write_text(md5, encoding="ascii")
PY
echo "Repo built:"
echo " ${REPO_DIR}/addons.xml"
echo " ${REPO_DIR}/addons.xml.md5"
echo " ${REPO_ZIP_PATH}"
echo " ${REPO_DIR}/$(basename "${PLUGIN_ZIP}")"

193
scripts/publish_gitea_release.sh Executable file
View File

@@ -0,0 +1,193 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ADDON_XML="${ROOT_DIR}/addon/addon.xml"
DEFAULT_NOTES="Automatischer Release-Upload aus ViewIT Build."
TAG=""
ASSET_PATH=""
TITLE=""
NOTES="${DEFAULT_NOTES}"
DRY_RUN="0"
while [[ $# -gt 0 ]]; do
case "$1" in
--tag)
TAG="${2:-}"
shift 2
;;
--asset)
ASSET_PATH="${2:-}"
shift 2
;;
--title)
TITLE="${2:-}"
shift 2
;;
--notes)
NOTES="${2:-}"
shift 2
;;
--dry-run)
DRY_RUN="1"
shift
;;
*)
echo "Unbekanntes Argument: $1" >&2
exit 1
;;
esac
done
if [[ ! -f "${ADDON_XML}" ]]; then
echo "Missing: ${ADDON_XML}" >&2
exit 1
fi
read -r ADDON_ID ADDON_VERSION < <(python3 - "${ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "plugin.video.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
if [[ -z "${TAG}" ]]; then
TAG="v${ADDON_VERSION}"
fi
if [[ -z "${ASSET_PATH}" ]]; then
ASSET_PATH="${ROOT_DIR}/dist/${ADDON_ID}-${ADDON_VERSION}.zip"
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset nicht gefunden, baue ZIP: ${ASSET_PATH}"
"${ROOT_DIR}/scripts/build_kodi_zip.sh" >/dev/null
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset fehlt nach Build: ${ASSET_PATH}" >&2
exit 1
fi
if [[ -z "${TITLE}" ]]; then
TITLE="ViewIT ${TAG}"
fi
REMOTE_URL="$(git -C "${ROOT_DIR}" remote get-url origin)"
read -r BASE_URL OWNER REPO < <(python3 - "${REMOTE_URL}" <<'PY'
import re
import sys
u = sys.argv[1].strip()
m = re.match(r"^https?://([^/]+)/([^/]+)/([^/.]+)(?:\.git)?/?$", u)
if not m:
raise SystemExit("Origin-URL muss https://host/owner/repo(.git) sein.")
host, owner, repo = m.group(1), m.group(2), m.group(3)
print(f"https://{host}", owner, repo)
PY
)
API_BASE="${BASE_URL}/api/v1/repos/${OWNER}/${REPO}"
ASSET_NAME="$(basename "${ASSET_PATH}")"
if [[ "${DRY_RUN}" == "1" ]]; then
echo "[DRY-RUN] API: ${API_BASE}"
echo "[DRY-RUN] Tag: ${TAG}"
echo "[DRY-RUN] Asset: ${ASSET_PATH}"
exit 0
fi
if [[ -z "${GITEA_TOKEN:-}" ]]; then
echo "Bitte GITEA_TOKEN setzen." >&2
exit 1
fi
tmp_json="$(mktemp)"
tmp_http="$(mktemp)"
trap 'rm -f "${tmp_json}" "${tmp_http}"' EXIT
urlenc() {
python3 - "$1" <<'PY'
import sys
from urllib.parse import quote
print(quote(sys.argv[1], safe=""))
PY
}
tag_enc="$(urlenc "${TAG}")"
auth_header="Authorization: token ${GITEA_TOKEN}"
http_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/tags/${tag_enc}")"
if [[ "${http_code}" == "200" ]]; then
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
elif [[ "${http_code}" == "404" ]]; then
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" <<'PY'
import json,sys
print(json.dumps({
"tag_name": sys.argv[1],
"name": sys.argv[2],
"body": sys.argv[3],
"draft": False,
"prerelease": False
}))
PY
)"
http_code_create="$(curl -sS -X POST -H "${auth_header}" -H "Content-Type: application/json" -d "${payload}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases")"
if [[ "${http_code_create}" != "201" ]]; then
echo "Release konnte nicht erstellt werden (HTTP ${http_code_create})." >&2
cat "${tmp_json}" >&2
exit 1
fi
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
else
echo "Release-Abfrage fehlgeschlagen (HTTP ${http_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
assets_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets")"
if [[ "${assets_code}" == "200" ]]; then
EXISTING_ASSET_ID="$(python3 - "${tmp_json}" "${ASSET_NAME}" <<'PY'
import json,sys
assets=json.load(open(sys.argv[1], encoding="utf-8"))
name=sys.argv[2]
for a in assets:
if a.get("name")==name:
print(a.get("id"))
break
PY
)"
if [[ -n "${EXISTING_ASSET_ID}" ]]; then
del_code="$(curl -sS -X DELETE -H "${auth_header}" -o "${tmp_http}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets/${EXISTING_ASSET_ID}")"
if [[ "${del_code}" != "204" ]]; then
echo "Altes Asset konnte nicht geloescht werden (HTTP ${del_code})." >&2
cat "${tmp_http}" >&2
exit 1
fi
fi
fi
asset_name_enc="$(urlenc "${ASSET_NAME}")"
upload_code="$(curl -sS -X POST -H "${auth_header}" -F "attachment=@${ASSET_PATH}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets?name=${asset_name_enc}")"
if [[ "${upload_code}" != "201" ]]; then
echo "Asset-Upload fehlgeschlagen (HTTP ${upload_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
echo "Release-Asset hochgeladen:"
echo " Repo: ${OWNER}/${REPO}"
echo " Tag: ${TAG}"
echo " Asset: ${ASSET_NAME}"
echo " URL: ${BASE_URL}/${OWNER}/${REPO}/releases/tag/${TAG}"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
HOST="${HOST:-127.0.0.1}"
PORT="${PORT:-8080}"
if [[ ! -f "${DIST_DIR}/repo/addons.xml" ]]; then
echo "Missing ${DIST_DIR}/repo/addons.xml" >&2
echo "Run ./scripts/build_local_kodi_repo.sh first." >&2
exit 1
fi
echo "Serving local Kodi repo from ${DIST_DIR}"
echo "Repository URL: http://${HOST}:${PORT}/repo/addons.xml"
(cd "${DIST_DIR}" && python3 -m http.server "${PORT}" --bind "${HOST}")