Compare commits

...

8 Commits

21 changed files with 1830 additions and 61 deletions

BIN
.coverage

Binary file not shown.

6
.gitignore vendored
View File

@@ -9,5 +9,11 @@
# Local tests (not committed)
/tests/
/TESTING/
/.pytest_cache/
/pytest.ini
# Python artifacts
__pycache__/
*.pyc
.coverage

View File

@@ -15,6 +15,18 @@ ViewIT ist ein KodiAddon zum Durchsuchen und Abspielen von Inhalten der unter
- KodiZIP bauen: `./scripts/build_kodi_zip.sh``dist/<addon_id>-<version>.zip`
- AddonVersion in `addon/addon.xml`
## Lokales Kodi-Repository
- Repository bauen (inkl. ZIPs + `addons.xml` + `addons.xml.md5`): `./scripts/build_local_kodi_repo.sh`
- Lokal bereitstellen: `./scripts/serve_local_kodi_repo.sh`
- Standard-URL: `http://127.0.0.1:8080/repo/addons.xml`
- Optional eigene URL beim Build setzen: `REPO_BASE_URL=http://<host>:<port>/repo ./scripts/build_local_kodi_repo.sh`
## Gitea Release-Asset Upload
- ZIP bauen: `./scripts/build_kodi_zip.sh`
- Token setzen: `export GITEA_TOKEN=<token>`
- Asset an Tag hochladen (erstellt Release bei Bedarf): `./scripts/publish_gitea_release.sh`
- Optional: `--tag v0.1.50 --asset dist/plugin.video.viewit-0.1.50.zip`
## Entwicklung (kurz)
- Hauptlogik: `addon/default.py`
- Plugins: `addon/plugins/*_plugin.py`

View File

@@ -16,6 +16,7 @@ import json
import os
import re
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from types import ModuleType
from urllib.parse import parse_qs, urlencode
@@ -401,6 +402,27 @@ def _get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
return default
def _set_setting_string(setting_id: str, value: str) -> None:
if xbmcaddon is None:
return
addon = _get_addon()
if addon is None:
return
setter = getattr(addon, "setSettingString", None)
if callable(setter):
try:
setter(setting_id, str(value))
return
except TypeError:
return
setter = getattr(addon, "setSetting", None)
if callable(setter):
try:
setter(setting_id, str(value))
except TypeError:
return
def _apply_video_info(item, info_labels: dict[str, object] | None, cast: list[TmdbCastMember] | None) -> None:
"""Setzt Metadaten bevorzugt via InfoTagVideo (Kodi v20+), mit Fallback auf deprecated APIs."""
@@ -883,6 +905,136 @@ def _add_directory_item(
xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
def _plugin_version(plugin: BasisPlugin) -> str:
raw = getattr(plugin, "version", "0.0.0")
text = str(raw or "").strip()
return text or "0.0.0"
def _normalize_update_info_url(raw: str) -> str:
value = str(raw or "").strip()
default = "http://127.0.0.1:8080/repo/addons.xml"
if not value:
return default
if value.endswith("/addons.xml"):
return value
return value.rstrip("/") + "/addons.xml"
def _repo_addon_xml_path() -> str:
if xbmcvfs is None:
return ""
try:
return xbmcvfs.translatePath("special://home/addons/repository.viewit/addon.xml")
except Exception:
return ""
def _update_repository_source(info_url: str) -> bool:
path = _repo_addon_xml_path()
if not path:
return False
if not os.path.exists(path):
return False
try:
tree = ET.parse(path)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
return False
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
return False
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
info.text = info_url
checksum.text = f"{base}/addons.xml.md5"
datadir.text = f"{base}/"
tree.write(path, encoding="utf-8", xml_declaration=True)
return True
except Exception as exc:
_log(f"Repository-URL konnte nicht gesetzt werden: {exc}", xbmc.LOGWARNING)
return False
def _settings_key_for_plugin(name: str) -> str:
safe = re.sub(r"[^a-z0-9]+", "_", (name or "").strip().casefold()).strip("_")
return f"update_version_{safe}" if safe else "update_version_unknown"
def _collect_plugin_metadata(plugin: BasisPlugin, titles: list[str]) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]]:
getter = getattr(plugin, "metadata_for", None)
if not callable(getter):
return {}
collected: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]] = {}
for title in titles:
try:
labels, art, cast = getter(title)
except Exception:
continue
if isinstance(labels, dict) or isinstance(art, dict) or cast:
label_map = {str(k): str(v) for k, v in dict(labels or {}).items() if v}
art_map = {str(k): str(v) for k, v in dict(art or {}).items() if v}
collected[title] = (label_map, art_map, cast if isinstance(cast, list) else None)
return collected
def _needs_tmdb(labels: dict[str, str], art: dict[str, str], *, want_plot: bool, want_art: bool) -> bool:
if want_plot and not labels.get("plot"):
return True
if want_art and not (art.get("thumb") or art.get("poster") or art.get("fanart") or art.get("landscape")):
return True
return False
def _merge_metadata(
title: str,
tmdb_labels: dict[str, str] | None,
tmdb_art: dict[str, str] | None,
tmdb_cast: list[TmdbCastMember] | None,
plugin_meta: tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None] | None,
) -> tuple[dict[str, str], dict[str, str], list[TmdbCastMember] | None]:
labels = dict(tmdb_labels or {})
art = dict(tmdb_art or {})
cast = tmdb_cast
if plugin_meta is not None:
meta_labels, meta_art, meta_cast = plugin_meta
labels.update({k: str(v) for k, v in dict(meta_labels or {}).items() if v})
art.update({k: str(v) for k, v in dict(meta_art or {}).items() if v})
if meta_cast is not None:
cast = meta_cast
if "title" not in labels:
labels["title"] = title
return labels, art, cast
def _sync_update_version_settings() -> None:
addon = _get_addon()
addon_version = "0.0.0"
if addon is not None:
try:
addon_version = str(addon.getAddonInfo("version") or "0.0.0")
except Exception:
addon_version = "0.0.0"
_set_setting_string("update_version_addon", addon_version)
versions = {
"update_version_serienstream": "-",
"update_version_aniworld": "-",
"update_version_einschalten": "-",
"update_version_topstreamfilm": "-",
"update_version_filmpalast": "-",
"update_version_doku_streams": "-",
}
for plugin in _discover_plugins().values():
key = _settings_key_for_plugin(str(plugin.name))
if key in versions:
versions[key] = _plugin_version(plugin)
for key, value in versions.items():
_set_setting_string(key, value)
def _show_root_menu() -> None:
handle = _get_handle()
_log("Root-Menue wird angezeigt.")
@@ -890,8 +1042,7 @@ def _show_root_menu() -> None:
plugins = _discover_plugins()
for plugin_name in sorted(plugins.keys(), key=lambda value: value.casefold()):
display = f"{plugin_name}"
_add_directory_item(handle, display, "plugin_menu", {"plugin": plugin_name}, is_folder=True)
_add_directory_item(handle, plugin_name, "plugin_menu", {"plugin": plugin_name}, is_folder=True)
_add_directory_item(handle, "Einstellungen", "settings")
xbmcplugin.endOfDirectory(handle)
@@ -919,6 +1070,12 @@ def _show_plugin_menu(plugin_name: str) -> None:
if _plugin_has_capability(plugin, "genres"):
_add_directory_item(handle, "Genres", "genres", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "alpha"):
_add_directory_item(handle, "A-Z", "alpha_index", {"plugin": plugin_name}, is_folder=True)
if _plugin_has_capability(plugin, "series_catalog"):
_add_directory_item(handle, "Serien", "series_catalog", {"plugin": plugin_name, "page": "1"}, is_folder=True)
if _plugin_has_capability(plugin, "popular_series"):
_add_directory_item(handle, "Meist gesehen", "popular", {"plugin": plugin_name, "page": "1"}, is_folder=True)
@@ -967,10 +1124,24 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
results.sort(key=lambda value: value.casefold())
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results and not canceled:
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles and not canceled:
canceled = progress(35, f"{plugin_name} (1/1) Metadaten…")
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for index, title in enumerate(results, start=1):
@@ -979,8 +1150,9 @@ def _show_plugin_search_results(plugin_name: str, query: str) -> None:
if index == 1 or index == total_results or (index % 10 == 0):
pct = 35 + int((index / float(total_results)) * 60)
canceled = progress(pct, f"{plugin_name} (1/1) aufbereiten {index}/{total_results}")
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1143,15 +1315,29 @@ def _show_search_results(query: str) -> None:
continue
results = [str(t).strip() for t in (results or []) if t and str(t).strip()]
_log(f"Treffer ({plugin_name}): {len(results)}", xbmc.LOGDEBUG)
plugin_meta = _collect_plugin_metadata(plugin, results)
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
if results:
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_titles = list(results)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in results:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
canceled = progress(
range_start + int((range_end - range_start) * 0.35),
f"{plugin_name} ({plugin_index}/{total_plugins}) Metadaten…",
)
if canceled:
break
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(results))
tmdb_prefetched = _tmdb_labels_and_art_bulk(list(tmdb_titles))
total_results = max(1, len(results))
for title_index, title in enumerate(results, start=1):
if title_index == 1 or title_index == total_results or (title_index % 10 == 0):
@@ -1161,8 +1347,9 @@ def _show_search_results(query: str) -> None:
)
if canceled:
break
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
@@ -1514,6 +1701,176 @@ def _show_genres(plugin_name: str) -> None:
xbmcplugin.endOfDirectory(handle)
def _show_categories(plugin_name: str) -> None:
handle = _get_handle()
_log(f"Kategorien laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "categories", None)
if not callable(getter):
xbmcgui.Dialog().notification("Kategorien", "Kategorien nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
categories = list(getter() or [])
except Exception as exc:
_log(f"Kategorien konnten nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Kategorien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for category in categories:
category = str(category).strip()
if not category:
continue
_add_directory_item(
handle,
category,
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_category_titles_page(plugin_name: str, category: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Kategorien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_genre_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Kategorien", "Paging nicht verfuegbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "genre_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(category) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{category} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{category} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(category, page) or [])
except Exception as exc:
_log(f"Kategorie-Seite konnte nicht geladen werden ({plugin_name}/{category} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Kategorien", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
if show_tmdb:
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, {}, {}, None, meta)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=_apply_playstate_to_info(info_labels, playstate),
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(category, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"category_titles_page",
{"plugin": plugin_name, "category": category, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
@@ -1563,6 +1920,157 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
plugin_meta = _collect_plugin_metadata(plugin, titles)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(titles)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in titles:
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in titles:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, [])) if show_tmdb else ({}, {}, [])
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
direct_play = bool(
plugin_name.casefold() == "einschalten"
and _get_setting_bool("einschalten_enable_playback", default=False)
)
_add_directory_item(
handle,
display_label,
"play_movie" if direct_play else "seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=not direct_play,
info_labels=info_labels,
art=art,
cast=cast,
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(genre, page))
except Exception:
show_next = False
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"genre_titles_page",
{"plugin": plugin_name, "genre": genre, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_index(plugin_name: str) -> None:
handle = _get_handle()
_log(f"A-Z laden: {plugin_name}")
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
getter = getattr(plugin, "alpha_index", None)
if not callable(getter):
xbmcgui.Dialog().notification("A-Z", "A-Z nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
try:
letters = list(getter() or [])
except Exception as exc:
_log(f"A-Z konnte nicht geladen werden ({plugin_name}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "A-Z konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
for letter in letters:
letter = str(letter).strip()
if not letter:
continue
_add_directory_item(
handle,
letter,
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": "1"},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_alpha_titles_page(plugin_name: str, letter: str, page: int = 1) -> None:
handle = _get_handle()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("A-Z", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "titles_for_alpha_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("A-Z", "Paging nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "alpha_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(letter) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"{letter} ({page}/{total_pages})")
else:
xbmcplugin.setPluginCategory(handle, f"{letter} ({page})")
_set_content(handle, "movies" if (plugin_name or "").casefold() == "einschalten" else "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(letter, page) or [])
except Exception as exc:
_log(f"A-Z Seite konnte nicht geladen werden ({plugin_name}/{letter} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("A-Z", "Seite konnte nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
@@ -1611,11 +2119,113 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
show_next = False
if total_pages is not None:
show_next = page < total_pages
if show_next:
_add_directory_item(
handle,
"Nächste Seite",
"alpha_titles_page",
{"plugin": plugin_name, "letter": letter, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
def _show_series_catalog(plugin_name: str, page: int = 1) -> None:
handle = _get_handle()
plugin_name = (plugin_name or "").strip()
plugin = _discover_plugins().get(plugin_name)
if plugin is None:
xbmcgui.Dialog().notification("Serien", "Plugin nicht gefunden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
page = max(1, int(page or 1))
paging_getter = getattr(plugin, "series_catalog_page", None)
if not callable(paging_getter):
xbmcgui.Dialog().notification("Serien", "Serien nicht verfügbar.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
total_pages = None
count_getter = getattr(plugin, "series_catalog_page_count", None)
if callable(count_getter):
try:
total_pages = int(count_getter(page) or 1)
except Exception:
total_pages = None
if total_pages is not None:
page = min(page, max(1, total_pages))
xbmcplugin.setPluginCategory(handle, f"Serien ({page}/{total_pages})")
else:
has_more_getter = getattr(plugin, "genre_has_more", None)
xbmcplugin.setPluginCategory(handle, f"Serien ({page})")
_set_content(handle, "tvshows")
if page > 1:
_add_directory_item(
handle,
"Vorherige Seite",
"series_catalog",
{"plugin": plugin_name, "page": str(page - 1)},
is_folder=True,
)
try:
titles = list(paging_getter(page) or [])
except Exception as exc:
_log(f"Serien konnten nicht geladen werden ({plugin_name} p{page}): {exc}", xbmc.LOGWARNING)
xbmcgui.Dialog().notification("Serien", "Serien konnten nicht geladen werden.", xbmcgui.NOTIFICATION_INFO, 3000)
xbmcplugin.endOfDirectory(handle)
return
titles = [str(t).strip() for t in titles if t and str(t).strip()]
titles.sort(key=lambda value: value.casefold())
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if titles:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(titles)
for title in titles:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
for title in titles:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
show_next = False
if total_pages is not None:
show_next = page < total_pages
else:
has_more_getter = getattr(plugin, "series_catalog_has_more", None)
if callable(has_more_getter):
try:
show_next = bool(has_more_getter(genre, page))
show_next = bool(has_more_getter(page))
except Exception:
show_next = False
@@ -1623,8 +2233,8 @@ def _show_genre_titles_page(plugin_name: str, genre: str, page: int = 1) -> None
_add_directory_item(
handle,
"Nächste Seite",
"genre_titles_page",
{"plugin": plugin_name, "genre": genre, "page": str(page + 1)},
"series_catalog",
{"plugin": plugin_name, "page": str(page + 1)},
is_folder=True,
)
xbmcplugin.endOfDirectory(handle)
@@ -1802,40 +2412,45 @@ def _show_popular(plugin_name: str | None = None, page: int = 1) -> None:
show_tmdb = _get_setting_bool("tmdb_genre_metadata", default=False)
if page_items:
if show_tmdb:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(page_items)
for title in page_items:
info_labels, art, cast = tmdb_prefetched.get(title, _tmdb_labels_and_art(title))
info_labels = dict(info_labels or {})
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
else:
plugin_meta = _collect_plugin_metadata(plugin, page_items)
show_tmdb = show_tmdb and _tmdb_enabled()
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
prefer_source = bool(getattr(plugin, "prefer_source_metadata", False))
tmdb_prefetched: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
tmdb_titles = list(page_items)
if show_tmdb and prefer_source:
tmdb_titles = []
for title in page_items:
playstate = _title_playstate(plugin_name, title)
_add_directory_item(
handle,
_label_with_playstate(title, playstate),
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=_apply_playstate_to_info({"title": title}, playstate),
)
meta = plugin_meta.get(title)
meta_labels = meta[0] if meta else {}
meta_art = meta[1] if meta else {}
if _needs_tmdb(meta_labels, meta_art, want_plot=show_plot, want_art=show_art):
tmdb_titles.append(title)
if show_tmdb and tmdb_titles:
with _busy_dialog():
tmdb_prefetched = _tmdb_labels_and_art_bulk(tmdb_titles)
for title in page_items:
tmdb_info, tmdb_art, tmdb_cast = tmdb_prefetched.get(title, ({}, {}, []))
meta = plugin_meta.get(title)
info_labels, art, cast = _merge_metadata(title, tmdb_info, tmdb_art, tmdb_cast, meta)
info_labels.setdefault("mediatype", "tvshow")
if (info_labels.get("mediatype") or "").strip().casefold() == "tvshow":
info_labels.setdefault("tvshowtitle", title)
playstate = _title_playstate(plugin_name, title)
info_labels = _apply_playstate_to_info(dict(info_labels), playstate)
display_label = _label_with_duration(title, info_labels)
display_label = _label_with_playstate(display_label, playstate)
_add_directory_item(
handle,
display_label,
"seasons",
{"plugin": plugin_name, "title": title, **_series_url_params(plugin, title)},
is_folder=True,
info_labels=info_labels,
art=art,
cast=cast,
)
if total_pages > 1 and page < total_pages:
_add_directory_item(
@@ -2245,10 +2860,34 @@ def _open_settings() -> None:
"""Oeffnet das Kodi-Addon-Settings-Dialog."""
if xbmcaddon is None: # pragma: no cover - outside Kodi
raise RuntimeError("xbmcaddon ist nicht verfuegbar (KodiStub).")
_sync_update_version_settings()
addon = xbmcaddon.Addon()
addon.openSettings()
def _run_update_check() -> None:
"""Stoesst Kodi-Repo- und Addon-Updates an und informiert den Benutzer."""
if xbmc is None: # pragma: no cover - outside Kodi
return
try:
info_url = _normalize_update_info_url(_get_setting_string("update_repo_url"))
_set_setting_string("update_repo_url", info_url)
_sync_update_version_settings()
_update_repository_source(info_url)
builtin = getattr(xbmc, "executebuiltin", None)
if callable(builtin):
builtin("UpdateAddonRepos")
builtin("UpdateLocalAddons")
builtin("ActivateWindow(addonbrowser,addons://updates/)")
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung gestartet.", xbmcgui.NOTIFICATION_INFO, 4000)
except Exception as exc:
_log(f"Update-Pruefung fehlgeschlagen: {exc}", xbmc.LOGWARNING)
try:
xbmcgui.Dialog().notification("ViewIT Update", "Update-Pruefung fehlgeschlagen.", xbmcgui.NOTIFICATION_ERROR, 4000)
except Exception:
pass
def _extract_first_int(value: str) -> int | None:
match = re.search(r"(\d+)", value or "")
if not match:
@@ -2564,6 +3203,8 @@ def run() -> None:
_show_genre_sources()
elif action == "genres":
_show_genres(params.get("plugin", ""))
elif action == "categories":
_show_categories(params.get("plugin", ""))
elif action == "new_titles":
_show_new_titles(
params.get("plugin", ""),
@@ -2585,6 +3226,25 @@ def run() -> None:
params.get("genre", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "category_titles_page":
_show_category_titles_page(
params.get("plugin", ""),
params.get("category", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "alpha_index":
_show_alpha_index(params.get("plugin", ""))
elif action == "alpha_titles_page":
_show_alpha_titles_page(
params.get("plugin", ""),
params.get("letter", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "series_catalog":
_show_series_catalog(
params.get("plugin", ""),
_parse_positive_int(params.get("page", "1"), default=1),
)
elif action == "genre_series_group":
_show_genre_series_group(
params.get("plugin", ""),
@@ -2599,6 +3259,8 @@ def run() -> None:
)
elif action == "settings":
_open_settings()
elif action == "check_updates":
_run_update_check()
elif action == "seasons":
_show_seasons(params.get("plugin", ""), params.get("title", ""), params.get("series_url", ""))
elif action == "episodes":

View File

@@ -11,6 +11,7 @@ class BasisPlugin(ABC):
"""Abstrakte Basisklasse fuer alle Integrationen."""
name: str
version: str = "0.0.0"
@abstractmethod
async def search_titles(self, query: str) -> List[str]:

View File

@@ -691,6 +691,7 @@ def search_animes(query: str) -> List[SeriesResult]:
class AniworldPlugin(BasisPlugin):
name = "Aniworld"
version = "1.0.0"
def __init__(self) -> None:
self._anime_results: Dict[str, SeriesResult] = {}

View File

@@ -0,0 +1,476 @@
"""Doku-Streams (doku-streams.com) Integration."""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "doku_streams_base_url"
DEFAULT_BASE_URL = "https://doku-streams.com"
MOST_VIEWED_PATH = "/meistgesehene/"
DEFAULT_TIMEOUT = 20
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_dokustreams"
SETTING_DUMP_HTML = "dump_html_dokustreams"
SETTING_SHOW_URL_INFO = "show_url_info_dokustreams"
SETTING_LOG_ERRORS = "log_errors_dokustreams"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
plot: str = ""
poster: str = ""
def _extract_last_page(soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("nav.navigation a[href], nav.pagination a[href], a.page-numbers[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"/page/(\\d+)/", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
for value in re.findall(r"(\\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def _extract_summary_and_poster(article: BeautifulSoupT) -> tuple[str, str]:
summary = ""
if article:
summary_box = article.select_one("div.entry-summary")
if summary_box is not None:
for p in summary_box.find_all("p"):
text = (p.get_text(" ", strip=True) or "").strip()
if text:
summary = text
break
poster = ""
if article:
img = article.select_one("div.entry-thumb img")
if img is not None:
poster = (img.get("data-src") or "").strip() or (img.get("src") or "").strip()
if "lazy_placeholder" in poster and img.get("data-src"):
poster = (img.get("data-src") or "").strip()
poster = _absolute_url(poster)
return summary, poster
def _parse_listing_hits(soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for article in soup.select("article[id^='post-']"):
anchor = article.select_one("h2.entry-title a[href]")
if anchor is None:
continue
href = (anchor.get("href") or "").strip()
title = (anchor.get_text(" ", strip=True) or "").strip()
if not href or not title:
continue
if query and not _matches_query(query, title=title):
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
summary, poster = _extract_summary_and_poster(article)
hits.append(SearchHit(title=title, url=url, plot=summary, poster=poster))
return hits
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="dokustreams_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Doku-Streams",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="dokustreams_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="dokustreams_errors.log",
message=message,
)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("dokustreams", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class DokuStreamsPlugin(BasisPlugin):
name = "Doku-Streams"
version = "1.0.0"
prefer_source_metadata = True
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._category_to_url: Dict[str, str] = {}
self._category_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SearchHit]] = None
self._title_meta: Dict[str, tuple[str, str]] = {}
self._requests_available = REQUESTS_AVAILABLE
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"DokuStreamsPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
self._title_to_url = {hit.title: hit.url for hit in hits if hit.title and hit.url}
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query or not self._requests_available:
return []
search_url = _absolute_url(f"/?s={quote(query)}")
session = get_requests_session("dokustreams", headers=HEADERS)
try:
soup = _get_soup(search_url, session=session)
except Exception:
return []
return _parse_listing_hits(soup, query=query)
def capabilities(self) -> set[str]:
return {"genres", "popular_series"}
def _categories_url(self) -> str:
return _absolute_url("/kategorien/")
def _parse_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
def clean_name(value: str) -> str:
value = (value or "").strip()
return re.sub(r"\\s*\\(\\d+\\)\\s*$", "", value).strip()
def walk(ul, parents: List[str]) -> None:
for li in ul.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = clean_name(anchor.get_text(" ", strip=True) or "")
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
child_ul = li.find("ul", class_="nested-category-list")
if child_ul is not None:
walk(child_ul, parents + [name])
else:
if parents:
label = " \u2192 ".join(parents + [name])
categories[label] = _absolute_url(href)
walk(root, [])
return categories
def _parse_top_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
for li in root.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
categories[name] = _absolute_url(href)
return categories
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._category_to_url:
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_categories(soup)
if parsed:
self._category_to_url = dict(parsed)
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
def categories(self) -> List[str]:
if not self._requests_available:
return []
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_top_categories(soup)
if parsed:
for key, value in parsed.items():
self._category_to_url.setdefault(key, value)
return list(parsed.keys())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._category_page_count_cache:
return max(1, int(self._category_page_count_cache.get(genre, 1)))
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return 1
pages = _extract_last_page(soup)
self._category_page_count_cache[genre] = max(1, pages)
return self._category_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else f"{base_url.rstrip('/')}/page/{page}/"
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
return titles
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _most_viewed_url(self) -> str:
return _absolute_url(MOST_VIEWED_PATH)
def popular_series(self) -> List[str]:
if not self._requests_available:
return []
if self._popular_cache is not None:
titles = [hit.title for hit in self._popular_cache if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
try:
soup = _get_soup(self._most_viewed_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
self._popular_cache = list(hits)
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
plot, poster = self._title_meta.get(title, ("", ""))
info: dict[str, str] = {"title": title}
if plot:
info["plot"] = plot
art: dict[str, str] = {}
if poster:
art = {"thumb": poster, "poster": poster}
return info, art, None
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return ["Stream"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return [title]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
if not title:
return None
url = self._title_to_url.get(title)
if not url:
return None
if not self._requests_available:
return None
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return None
iframe = soup.select_one("div.fluid-width-video-wrapper iframe[src]")
if iframe is None:
iframe = soup.select_one("iframe[src*='youtube'], iframe[src*='vimeo'], iframe[src]")
if iframe is None:
return None
src = (iframe.get("src") or "").strip()
if not src:
return None
return _absolute_url(src)
# Alias für die automatische Plugin-Erkennung.
Plugin = DokuStreamsPlugin

View File

@@ -507,6 +507,7 @@ class EinschaltenPlugin(BasisPlugin):
"""Metadata-Plugin für eine autorisierte Quelle."""
name = "Einschalten"
version = "1.0.0"
def __init__(self) -> None:
self.is_available = REQUESTS_AVAILABLE

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
@@ -42,6 +43,7 @@ DEFAULT_BASE_URL = "https://filmpalast.to"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
SERIES_HINT_PREFIX = "series://filmpalast/"
SERIES_VIEW_PATH = "/serien/view"
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -218,11 +220,17 @@ def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> Beautif
class FilmpalastPlugin(BasisPlugin):
name = "Filmpalast"
version = "1.0.0"
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
self._hoster_cache: Dict[str, Dict[str, str]] = {}
self._genre_to_url: Dict[str, str] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._alpha_to_url: Dict[str, str] = {}
self._alpha_page_count_cache: Dict[str, int] = {}
self._series_page_count_cache: Dict[int, int] = {}
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
@@ -391,8 +399,41 @@ class FilmpalastPlugin(BasisPlugin):
return hits
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if query and not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
return hits
def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
self._title_to_url = {}
self._series_entries = {}
self._hoster_cache.clear()
@@ -425,6 +466,208 @@ class FilmpalastPlugin(BasisPlugin):
titles.sort(key=lambda value: value.casefold())
return titles
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
return self._apply_hits_to_title_index(hits)
def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
genres: Dict[str, str] = {}
if not soup:
return genres
for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/genre/" not in href:
continue
genres[name] = _absolute_url(href)
return genres
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"(\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def capabilities(self) -> set[str]:
return {"genres", "alpha", "series_catalog"}
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
alpha: Dict[str, str] = {}
if not soup:
return alpha
for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/alpha/" not in href:
continue
if name in alpha:
continue
alpha[name] = _absolute_url(href)
return alpha
def alpha_index(self) -> List[str]:
if not self._requests_available:
return []
if self._alpha_to_url:
return list(self._alpha_to_url.keys())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_alpha_links(soup)
if parsed:
self._alpha_to_url = dict(parsed)
return list(self._alpha_to_url.keys())
def alpha_page_count(self, letter: str) -> int:
letter = (letter or "").strip()
if not letter:
return 1
if letter in self._alpha_page_count_cache:
return max(1, int(self._alpha_page_count_cache.get(letter, 1)))
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._alpha_page_count_cache[letter] = max(1, pages)
return self._alpha_page_count_cache[letter]
def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
letter = (letter or "").strip()
if not letter or not self._requests_available:
return []
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_alpha(self, letter: str) -> List[str]:
titles = self.titles_for_alpha_page(letter, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _series_view_url(self) -> str:
return _absolute_url(SERIES_VIEW_PATH)
def series_catalog_page_count(self, page: int = 1) -> int:
if not self._requests_available:
return 1
cache_key = int(page or 1)
if cache_key in self._series_page_count_cache:
return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
base_url = self._series_view_url()
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._series_page_count_cache[cache_key] = max(1, pages)
return self._series_page_count_cache[cache_key]
def series_catalog_page(self, page: int) -> List[str]:
if not self._requests_available:
return []
base_url = self._series_view_url()
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def series_catalog_has_more(self, page: int) -> bool:
total = self.series_catalog_page_count(page)
return page < total
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._genre_to_url:
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_genres(soup)
if parsed:
self._genre_to_url = dict(parsed)
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._genre_page_count_cache:
return max(1, int(self._genre_page_count_cache.get(genre, 1)))
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._genre_page_count_cache[genre] = max(1, pages)
return self._genre_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _ensure_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
@@ -643,26 +886,39 @@ class FilmpalastPlugin(BasisPlugin):
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None
resolved = link
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
# 1) Immer zuerst den ursprünglichen Hoster-Link an ResolveURL geben.
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(link)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
redirected = link
if self._requests_available:
try:
session = get_requests_session("filmpalast", headers=HEADERS)
response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
resolved = (response.url or link).strip() or link
redirected = (response.url or link).strip() or link
except Exception:
resolved = link
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(resolved)
redirected = link
# 2) Danach optional die Redirect-URL nochmals auflösen.
if callable(resolve_with_resolveurl) and redirected and redirected != link:
resolved_by_resolveurl = resolve_with_resolveurl(redirected)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
if resolved:
_log_url_event(resolved, kind="FINAL")
return resolved
# 3) Fallback bleibt wie bisher: direkte URL zurückgeben.
if redirected:
_log_url_event(redirected, kind="FINAL")
return redirected
return None

View File

@@ -784,6 +784,7 @@ class SerienstreamPlugin(BasisPlugin):
"""Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt."""
name = "Serienstream"
version = "1.0.0"
POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"
def __init__(self) -> None:

View File

@@ -123,6 +123,7 @@ class TopstreamfilmPlugin(BasisPlugin):
"""Integration fuer eine HTML-basierte Suchseite."""
name = "Topstreamfilm"
version = "1.0.0"
def __init__(self) -> None:
self._session: RequestsSession | None = None

View File

@@ -45,6 +45,9 @@
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
@@ -61,4 +64,16 @@
<setting id="tmdb_log_requests" type="bool" label="TMDB API Requests loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API Antworten loggen" default="false" />
</category>
<category label="Update">
<setting id="update_repo_url" type="text" label="Update-URL (addons.xml)" default="http://127.0.0.1:8080/repo/addons.xml" />
<setting id="run_update_check" type="action" label="Jetzt auf Updates pruefen" action="RunPlugin(plugin://plugin.video.viewit/?action=check_updates)" option="close" />
<setting id="update_info" type="text" label="Kodi-Repository-Updates werden ueber den Kodi-Update-Mechanismus verarbeitet." default="" enable="false" />
<setting id="update_version_addon" type="text" label="ViewIT Addon Version" default="-" enable="false" />
<setting id="update_version_serienstream" type="text" label="Serienstream Plugin Version" default="-" enable="false" />
<setting id="update_version_aniworld" type="text" label="Aniworld Plugin Version" default="-" enable="false" />
<setting id="update_version_einschalten" type="text" label="Einschalten Plugin Version" default="-" enable="false" />
<setting id="update_version_topstreamfilm" type="text" label="Topstreamfilm Plugin Version" default="-" enable="false" />
<setting id="update_version_filmpalast" type="text" label="Filmpalast Plugin Version" default="-" enable="false" />
<setting id="update_version_doku_streams" type="text" label="Doku-Streams Plugin Version" default="-" enable="false" />
</category>
</settings>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.0" provider-name="ViewIT">
<extension point="xbmc.addon.repository" name="ViewIT Repository">
<dir>
<info compressed="false">http://127.0.0.1:8080/repo/addons.xml</info>
<checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
<datadir zip="true">http://127.0.0.1:8080/repo/</datadir>
</dir>
</extension>
<extension point="xbmc.addon.metadata">
<summary lang="de_DE">Lokales Repository fuer ViewIT Updates</summary>
<summary lang="en_GB">Local repository for ViewIT updates</summary>
<description lang="de_DE">Stellt das ViewIT Addon ueber ein Kodi Repository bereit.</description>
<description lang="en_GB">Provides the ViewIT addon via a Kodi repository.</description>
<platform>all</platform>
</extension>
</addon>

110
scripts/build_local_kodi_repo.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
REPO_DIR="${DIST_DIR}/repo"
PLUGIN_ADDON_XML="${ROOT_DIR}/addon/addon.xml"
REPO_SRC_DIR="${ROOT_DIR}/repository.viewit"
REPO_ADDON_XML="${REPO_SRC_DIR}/addon.xml"
REPO_BASE_URL="${REPO_BASE_URL:-http://127.0.0.1:8080/repo}"
if [[ ! -f "${PLUGIN_ADDON_XML}" ]]; then
echo "Missing: ${PLUGIN_ADDON_XML}" >&2
exit 1
fi
if [[ ! -f "${REPO_ADDON_XML}" ]]; then
echo "Missing: ${REPO_ADDON_XML}" >&2
exit 1
fi
mkdir -p "${REPO_DIR}"
PLUGIN_ZIP="$("${ROOT_DIR}/scripts/build_kodi_zip.sh")"
cp -f "${PLUGIN_ZIP}" "${REPO_DIR}/"
read -r REPO_ADDON_ID REPO_ADDON_VERSION < <(python3 - "${REPO_ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "repository.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "${TMP_DIR}"' EXIT
TMP_REPO_ADDON_DIR="${TMP_DIR}/${REPO_ADDON_ID}"
mkdir -p "${TMP_REPO_ADDON_DIR}"
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete "${REPO_SRC_DIR}/" "${TMP_REPO_ADDON_DIR}/"
else
cp -a "${REPO_SRC_DIR}/." "${TMP_REPO_ADDON_DIR}/"
fi
python3 - "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_BASE_URL}" <<'PY'
import sys
import xml.etree.ElementTree as ET
addon_xml = sys.argv[1]
base_url = sys.argv[2].rstrip("/")
tree = ET.parse(addon_xml)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
raise SystemExit("Invalid repository addon.xml: missing <dir>")
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
raise SystemExit("Invalid repository addon.xml: missing info/checksum/datadir")
info.text = f"{base_url}/addons.xml"
checksum.text = f"{base_url}/addons.xml.md5"
datadir.text = f"{base_url}/"
tree.write(addon_xml, encoding="utf-8", xml_declaration=True)
PY
REPO_ZIP_NAME="${REPO_ADDON_ID}-${REPO_ADDON_VERSION}.zip"
REPO_ZIP_PATH="${REPO_DIR}/${REPO_ZIP_NAME}"
rm -f "${REPO_ZIP_PATH}"
(cd "${TMP_DIR}" && zip -r "${REPO_ZIP_PATH}" "${REPO_ADDON_ID}" >/dev/null)
python3 - "${PLUGIN_ADDON_XML}" "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_DIR}/addons.xml" <<'PY'
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
plugin_xml = Path(sys.argv[1])
repo_xml = Path(sys.argv[2])
target = Path(sys.argv[3])
addons = ET.Element("addons")
for source in (plugin_xml, repo_xml):
root = ET.parse(source).getroot()
addons.append(root)
target.write_text('<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(addons, encoding="unicode"), encoding="utf-8")
PY
python3 - "${REPO_DIR}/addons.xml" "${REPO_DIR}/addons.xml.md5" <<'PY'
import hashlib
import sys
from pathlib import Path
addons_xml = Path(sys.argv[1])
md5_file = Path(sys.argv[2])
md5 = hashlib.md5(addons_xml.read_bytes()).hexdigest()
md5_file.write_text(md5, encoding="ascii")
PY
echo "Repo built:"
echo " ${REPO_DIR}/addons.xml"
echo " ${REPO_DIR}/addons.xml.md5"
echo " ${REPO_ZIP_PATH}"
echo " ${REPO_DIR}/$(basename "${PLUGIN_ZIP}")"

193
scripts/publish_gitea_release.sh Executable file
View File

@@ -0,0 +1,193 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ADDON_XML="${ROOT_DIR}/addon/addon.xml"
DEFAULT_NOTES="Automatischer Release-Upload aus ViewIT Build."
TAG=""
ASSET_PATH=""
TITLE=""
NOTES="${DEFAULT_NOTES}"
DRY_RUN="0"
while [[ $# -gt 0 ]]; do
case "$1" in
--tag)
TAG="${2:-}"
shift 2
;;
--asset)
ASSET_PATH="${2:-}"
shift 2
;;
--title)
TITLE="${2:-}"
shift 2
;;
--notes)
NOTES="${2:-}"
shift 2
;;
--dry-run)
DRY_RUN="1"
shift
;;
*)
echo "Unbekanntes Argument: $1" >&2
exit 1
;;
esac
done
if [[ ! -f "${ADDON_XML}" ]]; then
echo "Missing: ${ADDON_XML}" >&2
exit 1
fi
read -r ADDON_ID ADDON_VERSION < <(python3 - "${ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "plugin.video.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
if [[ -z "${TAG}" ]]; then
TAG="v${ADDON_VERSION}"
fi
if [[ -z "${ASSET_PATH}" ]]; then
ASSET_PATH="${ROOT_DIR}/dist/${ADDON_ID}-${ADDON_VERSION}.zip"
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset nicht gefunden, baue ZIP: ${ASSET_PATH}"
"${ROOT_DIR}/scripts/build_kodi_zip.sh" >/dev/null
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset fehlt nach Build: ${ASSET_PATH}" >&2
exit 1
fi
if [[ -z "${TITLE}" ]]; then
TITLE="ViewIT ${TAG}"
fi
REMOTE_URL="$(git -C "${ROOT_DIR}" remote get-url origin)"
read -r BASE_URL OWNER REPO < <(python3 - "${REMOTE_URL}" <<'PY'
import re
import sys
u = sys.argv[1].strip()
m = re.match(r"^https?://([^/]+)/([^/]+)/([^/.]+)(?:\.git)?/?$", u)
if not m:
raise SystemExit("Origin-URL muss https://host/owner/repo(.git) sein.")
host, owner, repo = m.group(1), m.group(2), m.group(3)
print(f"https://{host}", owner, repo)
PY
)
API_BASE="${BASE_URL}/api/v1/repos/${OWNER}/${REPO}"
ASSET_NAME="$(basename "${ASSET_PATH}")"
if [[ "${DRY_RUN}" == "1" ]]; then
echo "[DRY-RUN] API: ${API_BASE}"
echo "[DRY-RUN] Tag: ${TAG}"
echo "[DRY-RUN] Asset: ${ASSET_PATH}"
exit 0
fi
if [[ -z "${GITEA_TOKEN:-}" ]]; then
echo "Bitte GITEA_TOKEN setzen." >&2
exit 1
fi
tmp_json="$(mktemp)"
tmp_http="$(mktemp)"
trap 'rm -f "${tmp_json}" "${tmp_http}"' EXIT
urlenc() {
python3 - "$1" <<'PY'
import sys
from urllib.parse import quote
print(quote(sys.argv[1], safe=""))
PY
}
tag_enc="$(urlenc "${TAG}")"
auth_header="Authorization: token ${GITEA_TOKEN}"
http_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/tags/${tag_enc}")"
if [[ "${http_code}" == "200" ]]; then
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
elif [[ "${http_code}" == "404" ]]; then
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" <<'PY'
import json,sys
print(json.dumps({
"tag_name": sys.argv[1],
"name": sys.argv[2],
"body": sys.argv[3],
"draft": False,
"prerelease": False
}))
PY
)"
http_code_create="$(curl -sS -X POST -H "${auth_header}" -H "Content-Type: application/json" -d "${payload}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases")"
if [[ "${http_code_create}" != "201" ]]; then
echo "Release konnte nicht erstellt werden (HTTP ${http_code_create})." >&2
cat "${tmp_json}" >&2
exit 1
fi
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
else
echo "Release-Abfrage fehlgeschlagen (HTTP ${http_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
assets_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets")"
if [[ "${assets_code}" == "200" ]]; then
EXISTING_ASSET_ID="$(python3 - "${tmp_json}" "${ASSET_NAME}" <<'PY'
import json,sys
assets=json.load(open(sys.argv[1], encoding="utf-8"))
name=sys.argv[2]
for a in assets:
if a.get("name")==name:
print(a.get("id"))
break
PY
)"
if [[ -n "${EXISTING_ASSET_ID}" ]]; then
del_code="$(curl -sS -X DELETE -H "${auth_header}" -o "${tmp_http}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets/${EXISTING_ASSET_ID}")"
if [[ "${del_code}" != "204" ]]; then
echo "Altes Asset konnte nicht geloescht werden (HTTP ${del_code})." >&2
cat "${tmp_http}" >&2
exit 1
fi
fi
fi
asset_name_enc="$(urlenc "${ASSET_NAME}")"
upload_code="$(curl -sS -X POST -H "${auth_header}" -F "attachment=@${ASSET_PATH}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets?name=${asset_name_enc}")"
if [[ "${upload_code}" != "201" ]]; then
echo "Asset-Upload fehlgeschlagen (HTTP ${upload_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
echo "Release-Asset hochgeladen:"
echo " Repo: ${OWNER}/${REPO}"
echo " Tag: ${TAG}"
echo " Asset: ${ASSET_NAME}"
echo " URL: ${BASE_URL}/${OWNER}/${REPO}/releases/tag/${TAG}"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
HOST="${HOST:-127.0.0.1}"
PORT="${PORT:-8080}"
if [[ ! -f "${DIST_DIR}/repo/addons.xml" ]]; then
echo "Missing ${DIST_DIR}/repo/addons.xml" >&2
echo "Run ./scripts/build_local_kodi_repo.sh first." >&2
exit 1
fi
echo "Serving local Kodi repo from ${DIST_DIR}"
echo "Repository URL: http://${HOST}:${PORT}/repo/addons.xml"
(cd "${DIST_DIR}" && python3 -m http.server "${PORT}" --bind "${HOST}")