Compare commits

...

18 Commits

Author SHA1 Message Date
a37c45e2ef Nightly: bump version and refresh snapshots 2026-02-07 17:36:33 +01:00
7f5924b850 Nightly: snapshot harness and cache ignore 2026-02-07 17:33:45 +01:00
b370afe167 Nightly: reproducible zips and plugin manifest 2026-02-07 17:28:49 +01:00
09d2fc850d Nightly: deterministic plugin loading and docs refresh 2026-02-07 17:23:29 +01:00
6ce1bf71c1 Add Doku-Streams plugin and prefer source metadata 2026-02-07 16:11:48 +01:00
c7d848385f Add Filmpalast series catalog browsing 2026-02-06 12:35:22 +01:00
280a82f08b Add Filmpalast A-Z browsing and document Gitea release upload 2026-02-06 12:29:12 +01:00
9aedbee083 Add configurable update source and update-only version display 2026-02-05 13:15:58 +01:00
4c3f90233d Stop tracking local coverage and pycache artifacts 2026-02-04 15:34:48 +01:00
9e15212a66 Add local Kodi repo tooling and repository addon 2026-02-04 15:13:00 +01:00
951e99cb4c Add Filmpalast genre browsing and paged genre titles 2026-02-02 23:13:23 +01:00
4f7b0eba0c Fix Filmpalast resolver handoff for movie playback 2026-02-02 22:28:34 +01:00
ae3cff7528 Add Filmpalast plugin search flow and bump to 0.1.50 2026-02-02 22:16:43 +01:00
db61bb67ba Refactor code structure for improved readability and maintainability 2026-02-02 15:40:52 +01:00
4521d9fb1d Add initial pytest configuration in settings.json 2026-02-02 15:16:00 +01:00
ca362f80fe Integrate pytest coverage configuration 2026-02-01 23:32:30 +01:00
372d443cb2 Show search progress per plugin during global search 2026-02-01 23:26:12 +01:00
1e3c6ffdf6 Refine title search to whole-word matching and bump 0.1.49 2026-02-01 23:14:10 +01:00
35 changed files with 3636 additions and 149 deletions

9
.gitignore vendored
View File

@@ -9,5 +9,14 @@
# Local tests (not committed)
/tests/
/TESTING/
/.pytest_cache/
/pytest.ini
# Python artifacts
__pycache__/
*.pyc
.coverage
# Plugin runtime caches
/addon/plugins/*_cache.json

7
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -14,11 +14,29 @@ ViewIT ist ein KodiAddon zum Durchsuchen und Abspielen von Inhalten der unter
- AddonOrdner bauen: `./scripts/build_install_addon.sh``dist/<addon_id>/`
- KodiZIP bauen: `./scripts/build_kodi_zip.sh``dist/<addon_id>-<version>.zip`
- AddonVersion in `addon/addon.xml`
- Reproduzierbare ZIPs: optional `SOURCE_DATE_EPOCH` setzen
## Lokales Kodi-Repository
- Repository bauen (inkl. ZIPs + `addons.xml` + `addons.xml.md5`): `./scripts/build_local_kodi_repo.sh`
- Lokal bereitstellen: `./scripts/serve_local_kodi_repo.sh`
- Standard-URL: `http://127.0.0.1:8080/repo/addons.xml`
- Optional eigene URL beim Build setzen: `REPO_BASE_URL=http://<host>:<port>/repo ./scripts/build_local_kodi_repo.sh`
## Gitea Release-Asset Upload
- ZIP bauen: `./scripts/build_kodi_zip.sh`
- Token setzen: `export GITEA_TOKEN=<token>`
- Asset an Tag hochladen (erstellt Release bei Bedarf): `./scripts/publish_gitea_release.sh`
- Optional: `--tag v0.1.50 --asset dist/plugin.video.viewit-0.1.50.zip`
## Entwicklung (kurz)
- Hauptlogik: `addon/default.py`
- Plugins: `addon/plugins/*_plugin.py`
- Einstellungen: `addon/resources/settings.xml`
## Tests mit Abdeckung
- Dev-Abhängigkeiten installieren: `./.venv/bin/pip install -r requirements-dev.txt`
- Tests + Coverage starten: `./.venv/bin/pytest`
- Optional (XML-Report): `./.venv/bin/pytest --cov-report=xml`
## Dokumentation
Siehe `docs/`.

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.48" provider-name="ViewIt">
<addon id="plugin.video.viewit" name="ViewIt" version="0.1.51" provider-name="ViewIt">
<requires>
<import addon="xbmc.python" version="3.0.0" />
<import addon="script.module.requests" />

File diff suppressed because it is too large Load Diff

View File

@@ -4,13 +4,15 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple
class BasisPlugin(ABC):
"""Abstrakte Basisklasse fuer alle Integrationen."""
name: str
version: str = "0.0.0"
prefer_source_metadata: bool = False
@abstractmethod
async def search_titles(self, query: str) -> List[str]:
@@ -28,6 +30,10 @@ class BasisPlugin(ABC):
"""Optional: Liefert den Stream-Link fuer eine konkrete Folge."""
return None
def metadata_for(self, title: str) -> Tuple[Dict[str, str], Dict[str, str], Optional[List[Any]]]:
"""Optional: Liefert Info-Labels, Art und Cast fuer einen Titel."""
return {}, {}, None
def resolve_stream_link(self, link: str) -> Optional[str]:
"""Optional: Folgt einem Stream-Link und liefert die finale URL."""
return None

View File

@@ -8,7 +8,11 @@ Dieses Plugin ist weitgehend kompatibel zur Serienstream-Integration:
from __future__ import annotations
from dataclasses import dataclass
from html import unescape
import hashlib
import json
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
@@ -25,8 +29,10 @@ else:
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
xbmcgui = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
@@ -60,6 +66,9 @@ HEADERS = {
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.aniworld"
SESSION_CACHE_MAX_TITLE_URLS = 800
@dataclass
@@ -128,6 +137,67 @@ def _absolute_url(href: str) -> str:
return f"{_get_base_url()}{href}" if href.startswith("/") else href
def _session_window() -> Any:
if xbmcgui is None:
return None
try:
return xbmcgui.Window(10000)
except Exception:
return None
def _session_cache_key(name: str) -> str:
base_hash = hashlib.sha1(_get_base_url().encode("utf-8")).hexdigest()[:12]
return f"{SESSION_CACHE_PREFIX}.{base_hash}.{name}"
def _session_cache_get(name: str) -> Any:
window = _session_window()
if window is None:
return None
raw = ""
try:
raw = window.getProperty(_session_cache_key(name)) or ""
except Exception:
return None
if not raw:
return None
try:
payload = json.loads(raw)
except Exception:
return None
if not isinstance(payload, dict):
return None
expires_at = payload.get("expires_at")
data = payload.get("data")
try:
if float(expires_at or 0) <= time.time():
return None
except Exception:
return None
return data
def _session_cache_set(name: str, data: Any, *, ttl_seconds: int = SESSION_CACHE_TTL_SECONDS) -> None:
window = _session_window()
if window is None:
return
payload = {
"expires_at": float(time.time() + max(1, int(ttl_seconds))),
"data": data,
}
try:
raw = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
except Exception:
return
if len(raw) > 240_000:
return
try:
window.setProperty(_session_cache_key(name), raw)
except Exception:
return
def _log_url(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
@@ -192,10 +262,8 @@ def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = _normalize_search_text(title)
if not haystack:
return False
return normalized_query in haystack
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _ensure_requests() -> None:
@@ -235,7 +303,7 @@ def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> Beautif
return BeautifulSoup(response.text, "html.parser")
def _get_soup_simple(url: str) -> BeautifulSoupT:
def _get_html_simple(url: str) -> str:
_ensure_requests()
_log_visit(url)
sess = get_requests_session("aniworld", headers=HEADERS)
@@ -247,10 +315,36 @@ def _get_soup_simple(url: str) -> BeautifulSoupT:
raise
if response.url and response.url != url:
_log_url(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
if _looks_like_cloudflare_challenge(response.text):
body = response.text
_log_response_html(url, body)
if _looks_like_cloudflare_challenge(body):
raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
return BeautifulSoup(response.text, "html.parser")
return body
def _get_soup_simple(url: str) -> BeautifulSoupT:
body = _get_html_simple(url)
return BeautifulSoup(body, "html.parser")
def _extract_genre_names_from_html(body: str) -> List[str]:
names: List[str] = []
seen: set[str] = set()
pattern = re.compile(
r"<div[^>]*class=[\"'][^\"']*seriesGenreList[^\"']*[\"'][^>]*>.*?<h3[^>]*>(.*?)</h3>",
re.IGNORECASE | re.DOTALL,
)
for match in pattern.finditer(body or ""):
text = re.sub(r"<[^>]+>", " ", match.group(1) or "")
text = unescape(re.sub(r"\s+", " ", text)).strip()
if not text:
continue
key = text.casefold()
if key in seen:
continue
seen.add(key)
names.append(text)
return names
def _post_json(url: str, *, payload: Dict[str, str], session: Optional[RequestsSession] = None) -> Any:
@@ -597,9 +691,12 @@ def search_animes(query: str) -> List[SeriesResult]:
class AniworldPlugin(BasisPlugin):
name = "Aniworld"
version = "1.0.0"
def __init__(self) -> None:
self._anime_results: Dict[str, SeriesResult] = {}
self._title_url_cache: Dict[str, str] = self._load_title_url_cache()
self._genre_names_cache: Optional[List[str]] = None
self._season_cache: Dict[str, List[SeasonInfo]] = {}
self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}
@@ -619,6 +716,132 @@ class AniworldPlugin(BasisPlugin):
if REQUESTS_IMPORT_ERROR:
print(f"AniworldPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
def _load_title_url_cache(self) -> Dict[str, str]:
raw = _session_cache_get("title_urls")
if not isinstance(raw, dict):
return {}
result: Dict[str, str] = {}
for key, value in raw.items():
key_text = str(key or "").strip().casefold()
url_text = str(value or "").strip()
if not key_text or not url_text:
continue
result[key_text] = url_text
return result
def _save_title_url_cache(self) -> None:
if not self._title_url_cache:
return
while len(self._title_url_cache) > SESSION_CACHE_MAX_TITLE_URLS:
self._title_url_cache.pop(next(iter(self._title_url_cache)))
_session_cache_set("title_urls", self._title_url_cache)
def _remember_anime_result(
self,
title: str,
url: str,
description: str = "",
*,
persist: bool = True,
) -> bool:
title = (title or "").strip()
url = (url or "").strip()
if not title:
return False
changed = False
current = self._anime_results.get(title)
if current is None or (url and current.url != url) or (description and current.description != description):
self._anime_results[title] = SeriesResult(title=title, description=description, url=url)
changed = True
if url:
key = title.casefold()
if self._title_url_cache.get(key) != url:
self._title_url_cache[key] = url
changed = True
if changed and persist:
self._save_title_url_cache()
return changed
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_links.{digest}"
@staticmethod
def _season_episodes_cache_name(season_url: str) -> str:
digest = hashlib.sha1((season_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_episodes.{digest}"
def _load_session_season_links(self, series_url: str) -> Optional[List[SeasonInfo]]:
raw = _session_cache_get(self._season_links_cache_name(series_url))
if not isinstance(raw, list):
return None
seasons: List[SeasonInfo] = []
for item in raw:
if not isinstance(item, dict):
continue
try:
number = int(item.get("number"))
except Exception:
continue
url = str(item.get("url") or "").strip()
if number <= 0 or not url:
continue
seasons.append(SeasonInfo(number=number, url=url, episodes=[]))
if not seasons:
return None
seasons.sort(key=lambda s: s.number)
return seasons
def _save_session_season_links(self, series_url: str, seasons: List[SeasonInfo]) -> None:
payload = [{"number": int(season.number), "url": season.url} for season in seasons if season.url]
if payload:
_session_cache_set(self._season_links_cache_name(series_url), payload)
def _load_session_season_episodes(self, season_url: str) -> Optional[List[EpisodeInfo]]:
raw = _session_cache_get(self._season_episodes_cache_name(season_url))
if not isinstance(raw, list):
return None
episodes: List[EpisodeInfo] = []
for item in raw:
if not isinstance(item, dict):
continue
try:
number = int(item.get("number"))
except Exception:
continue
title = str(item.get("title") or "").strip()
original_title = str(item.get("original_title") or "").strip()
url = str(item.get("url") or "").strip()
if number <= 0:
continue
episodes.append(
EpisodeInfo(
number=number,
title=title or f"Episode {number}",
original_title=original_title,
url=url,
)
)
if not episodes:
return None
episodes.sort(key=lambda item: item.number)
return episodes
def _save_session_season_episodes(self, season_url: str, episodes: List[EpisodeInfo]) -> None:
payload = []
for item in episodes:
payload.append(
{
"number": int(item.number),
"title": item.title,
"original_title": item.original_title,
"url": item.url,
}
)
if payload:
_session_cache_set(self._season_episodes_cache_name(season_url), payload)
def capabilities(self) -> set[str]:
return {"popular_series", "genres", "latest_episodes"}
@@ -633,6 +856,12 @@ class AniworldPlugin(BasisPlugin):
wanted = title.casefold().strip()
cached_url = self._title_url_cache.get(wanted, "")
if cached_url:
result = SeriesResult(title=title, description="", url=cached_url)
self._anime_results[title] = result
return result
for candidate in self._anime_results.values():
if candidate.title and candidate.title.casefold().strip() == wanted:
return candidate
@@ -640,7 +869,7 @@ class AniworldPlugin(BasisPlugin):
try:
for entry in self._ensure_popular():
if entry.title and entry.title.casefold().strip() == wanted:
self._anime_results[entry.title] = entry
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
@@ -649,7 +878,7 @@ class AniworldPlugin(BasisPlugin):
for entries in self._ensure_genres().values():
for entry in entries:
if entry.title and entry.title.casefold().strip() == wanted:
self._anime_results[entry.title] = entry
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
@@ -657,7 +886,7 @@ class AniworldPlugin(BasisPlugin):
try:
for entry in search_animes(title):
if entry.title and entry.title.casefold().strip() == wanted:
self._anime_results[entry.title] = entry
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
@@ -669,6 +898,7 @@ class AniworldPlugin(BasisPlugin):
return list(self._popular_cache)
soup = _get_soup_simple(_popular_animes_url())
results: List[SeriesResult] = []
cache_dirty = False
seen: set[str] = set()
for anchor in soup.select("div.seriesListContainer a[href^='/anime/stream/']"):
href = (anchor.get("href") or "").strip()
@@ -690,6 +920,9 @@ class AniworldPlugin(BasisPlugin):
continue
seen.add(key)
results.append(SeriesResult(title=title, description=description, url=url))
cache_dirty = self._remember_anime_result(title, url, description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
self._popular_cache = list(results)
return list(results)
@@ -697,7 +930,11 @@ class AniworldPlugin(BasisPlugin):
if not self._requests_available:
return []
entries = self._ensure_popular()
self._anime_results.update({entry.title: entry for entry in entries if entry.title})
cache_dirty = False
for entry in entries:
cache_dirty = self._remember_anime_result(entry.title, entry.url, entry.description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
return [entry.title for entry in entries if entry.title]
def latest_episodes(self, page: int = 1) -> List[LatestEpisode]:
@@ -727,6 +964,7 @@ class AniworldPlugin(BasisPlugin):
return {key: list(value) for key, value in self._genre_cache.items()}
soup = _get_soup_simple(_genres_url())
results: Dict[str, List[SeriesResult]] = {}
cache_dirty = False
genre_blocks = soup.select("#seriesContainer div.genre")
if not genre_blocks:
genre_blocks = soup.select("div.genre")
@@ -752,9 +990,14 @@ class AniworldPlugin(BasisPlugin):
continue
seen.add(key)
entries.append(SeriesResult(title=title, description="", url=url))
cache_dirty = self._remember_anime_result(title, url, persist=False) or cache_dirty
if entries:
results[genre_name] = entries
if cache_dirty:
self._save_title_url_cache()
self._genre_cache = {key: list(value) for key, value in results.items()}
self._genre_names_cache = sorted(self._genre_cache.keys(), key=str.casefold)
_session_cache_set("genres", self._genre_names_cache)
# Für spätere Auflösung (Seasons/Episoden) die Titel->URL Zuordnung auffüllen.
for entries in results.values():
for entry in entries:
@@ -764,11 +1007,31 @@ class AniworldPlugin(BasisPlugin):
self._anime_results[entry.title] = entry
return {key: list(value) for key, value in results.items()}
def _ensure_genre_names(self) -> List[str]:
if self._genre_names_cache is not None:
return list(self._genre_names_cache)
cached = _session_cache_get("genres")
if isinstance(cached, list):
names = [str(value).strip() for value in cached if str(value).strip()]
if names:
self._genre_names_cache = sorted(set(names), key=str.casefold)
return list(self._genre_names_cache)
try:
body = _get_html_simple(_genres_url())
names = _extract_genre_names_from_html(body)
except Exception:
names = []
if not names:
mapping = self._ensure_genres()
names = list(mapping.keys())
self._genre_names_cache = sorted({name for name in names if name}, key=str.casefold)
_session_cache_set("genres", self._genre_names_cache)
return list(self._genre_names_cache)
def genres(self) -> List[str]:
if not self._requests_available:
return []
genres = list(self._ensure_genres().keys())
return [g for g in genres if g]
return self._ensure_genre_names()
def titles_for_genre(self, genre: str) -> List[str]:
genre = (genre or "").strip()
@@ -785,7 +1048,11 @@ class AniworldPlugin(BasisPlugin):
if not entries:
return []
# Zusätzlich sicherstellen, dass die Titel im Cache sind.
self._anime_results.update({entry.title: entry for entry in entries if entry.title and entry.title not in self._anime_results})
cache_dirty = False
for entry in entries:
cache_dirty = self._remember_anime_result(entry.title, entry.url, entry.description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
return [entry.title for entry in entries if entry.title]
def _season_label(self, number: int) -> str:
@@ -810,7 +1077,7 @@ class AniworldPlugin(BasisPlugin):
series_url = (series_url or "").strip()
if not title or not series_url:
return
self._anime_results[title] = SeriesResult(title=title, description="", url=series_url)
self._remember_anime_result(title, series_url)
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
@@ -820,6 +1087,9 @@ class AniworldPlugin(BasisPlugin):
if direct and direct.url:
return direct.url
wanted = title.casefold().strip()
cached_url = self._title_url_cache.get(wanted, "")
if cached_url:
return cached_url
for candidate in self._anime_results.values():
if candidate.title and candidate.title.casefold().strip() == wanted and candidate.url:
return candidate.url
@@ -832,8 +1102,13 @@ class AniworldPlugin(BasisPlugin):
anime = self._find_series_by_title(title)
if not anime:
return []
session_links = self._load_session_season_links(anime.url)
if session_links:
self._season_links_cache[title] = list(session_links)
return list(session_links)
seasons = scrape_anime_detail(anime.url, load_episodes=False)
self._season_links_cache[title] = list(seasons)
self._save_session_season_links(anime.url, seasons)
return list(seasons)
def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
@@ -845,12 +1120,21 @@ class AniworldPlugin(BasisPlugin):
target = next((season for season in links if season.number == season_number), None)
if not target:
return None
cached_episodes = self._load_session_season_episodes(target.url)
if cached_episodes:
season_info = SeasonInfo(number=target.number, url=target.url, episodes=list(cached_episodes))
updated = [season for season in seasons if season.number != season_number]
updated.append(season_info)
updated.sort(key=lambda item: item.number)
self._season_cache[title] = updated
return season_info
season_soup = _get_soup(target.url, session=get_requests_session("aniworld", headers=HEADERS))
season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
updated = [season for season in seasons if season.number != season_number]
updated.append(season_info)
updated.sort(key=lambda item: item.number)
self._season_cache[title] = updated
self._save_session_season_episodes(target.url, season_info.episodes)
return season_info
def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
@@ -885,7 +1169,12 @@ class AniworldPlugin(BasisPlugin):
self._season_cache.clear()
self._episode_label_cache.clear()
raise RuntimeError(f"AniWorld-Suche fehlgeschlagen: {exc}") from exc
self._anime_results = {result.title: result for result in results}
self._anime_results = {}
cache_dirty = False
for result in results:
cache_dirty = self._remember_anime_result(result.title, result.url, result.description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
self._season_cache.clear()
self._season_links_cache.clear()
self._episode_label_cache.clear()

View File

@@ -0,0 +1,476 @@
"""Doku-Streams (doku-streams.com) Integration."""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "doku_streams_base_url"
DEFAULT_BASE_URL = "https://doku-streams.com"
MOST_VIEWED_PATH = "/meistgesehene/"
DEFAULT_TIMEOUT = 20
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_dokustreams"
SETTING_DUMP_HTML = "dump_html_dokustreams"
SETTING_SHOW_URL_INFO = "show_url_info_dokustreams"
SETTING_LOG_ERRORS = "log_errors_dokustreams"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
plot: str = ""
poster: str = ""
def _extract_last_page(soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("nav.navigation a[href], nav.pagination a[href], a.page-numbers[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"/page/(\\d+)/", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
for value in re.findall(r"(\\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def _extract_summary_and_poster(article: BeautifulSoupT) -> tuple[str, str]:
summary = ""
if article:
summary_box = article.select_one("div.entry-summary")
if summary_box is not None:
for p in summary_box.find_all("p"):
text = (p.get_text(" ", strip=True) or "").strip()
if text:
summary = text
break
poster = ""
if article:
img = article.select_one("div.entry-thumb img")
if img is not None:
poster = (img.get("data-src") or "").strip() or (img.get("src") or "").strip()
if "lazy_placeholder" in poster and img.get("data-src"):
poster = (img.get("data-src") or "").strip()
poster = _absolute_url(poster)
return summary, poster
def _parse_listing_hits(soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for article in soup.select("article[id^='post-']"):
anchor = article.select_one("h2.entry-title a[href]")
if anchor is None:
continue
href = (anchor.get("href") or "").strip()
title = (anchor.get_text(" ", strip=True) or "").strip()
if not href or not title:
continue
if query and not _matches_query(query, title=title):
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
summary, poster = _extract_summary_and_poster(article)
hits.append(SearchHit(title=title, url=url, plot=summary, poster=poster))
return hits
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="dokustreams_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Doku-Streams",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="dokustreams_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="dokustreams_errors.log",
message=message,
)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("dokustreams", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class DokuStreamsPlugin(BasisPlugin):
name = "Doku-Streams"
version = "1.0.0"
prefer_source_metadata = True
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._category_to_url: Dict[str, str] = {}
self._category_page_count_cache: Dict[str, int] = {}
self._popular_cache: Optional[List[SearchHit]] = None
self._title_meta: Dict[str, tuple[str, str]] = {}
self._requests_available = REQUESTS_AVAILABLE
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"DokuStreamsPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
self._title_to_url = {hit.title: hit.url for hit in hits if hit.title and hit.url}
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query or not self._requests_available:
return []
search_url = _absolute_url(f"/?s={quote(query)}")
session = get_requests_session("dokustreams", headers=HEADERS)
try:
soup = _get_soup(search_url, session=session)
except Exception:
return []
return _parse_listing_hits(soup, query=query)
def capabilities(self) -> set[str]:
return {"genres", "popular_series"}
def _categories_url(self) -> str:
return _absolute_url("/kategorien/")
def _parse_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
def clean_name(value: str) -> str:
value = (value or "").strip()
return re.sub(r"\\s*\\(\\d+\\)\\s*$", "", value).strip()
def walk(ul, parents: List[str]) -> None:
for li in ul.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = clean_name(anchor.get_text(" ", strip=True) or "")
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
child_ul = li.find("ul", class_="nested-category-list")
if child_ul is not None:
walk(child_ul, parents + [name])
else:
if parents:
label = " \u2192 ".join(parents + [name])
categories[label] = _absolute_url(href)
walk(root, [])
return categories
def _parse_top_categories(self, soup: BeautifulSoupT) -> Dict[str, str]:
categories: Dict[str, str] = {}
if not soup:
return categories
root = soup.select_one("ul.nested-category-list")
if root is None:
return categories
for li in root.find_all("li", recursive=False):
anchor = li.find("a", href=True)
if anchor is None:
continue
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
categories[name] = _absolute_url(href)
return categories
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._category_to_url:
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_categories(soup)
if parsed:
self._category_to_url = dict(parsed)
return sorted(self._category_to_url.keys(), key=lambda value: value.casefold())
def categories(self) -> List[str]:
if not self._requests_available:
return []
try:
soup = _get_soup(self._categories_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
parsed = self._parse_top_categories(soup)
if parsed:
for key, value in parsed.items():
self._category_to_url.setdefault(key, value)
return list(parsed.keys())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._category_page_count_cache:
return max(1, int(self._category_page_count_cache.get(genre, 1)))
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return 1
pages = _extract_last_page(soup)
self._category_page_count_cache[genre] = max(1, pages)
return self._category_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._category_to_url:
self.genres()
base_url = self._category_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else f"{base_url.rstrip('/')}/page/{page}/"
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
return titles
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _most_viewed_url(self) -> str:
return _absolute_url(MOST_VIEWED_PATH)
def popular_series(self) -> List[str]:
if not self._requests_available:
return []
if self._popular_cache is not None:
titles = [hit.title for hit in self._popular_cache if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
try:
soup = _get_soup(self._most_viewed_url(), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
self._popular_cache = list(hits)
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
titles = [hit.title for hit in hits if hit.title]
titles.sort(key=lambda value: value.casefold())
return titles
def metadata_for(self, title: str) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
plot, poster = self._title_meta.get(title, ("", ""))
info: dict[str, str] = {"title": title}
if plot:
info["plot"] = plot
art: dict[str, str] = {}
if poster:
art = {"thumb": poster, "poster": poster}
return info, art, None
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return ["Stream"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title or title not in self._title_to_url:
return []
return [title]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
if not title:
return None
url = self._title_to_url.get(title)
if not url:
return None
if not self._requests_available:
return None
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return None
iframe = soup.select_one("div.fluid-width-video-wrapper iframe[src]")
if iframe is None:
iframe = soup.select_one("iframe[src*='youtube'], iframe[src*='vimeo'], iframe[src]")
if iframe is None:
return None
src = (iframe.get("src") or "").strip()
if not src:
return None
return _absolute_url(src)
# Alias für die automatische Plugin-Erkennung.
Plugin = DokuStreamsPlugin

View File

@@ -507,6 +507,7 @@ class EinschaltenPlugin(BasisPlugin):
"""Metadata-Plugin für eine autorisierte Quelle."""
name = "Einschalten"
version = "1.0.0"
def __init__(self) -> None:
self.is_available = REQUESTS_AVAILABLE
@@ -1078,3 +1079,7 @@ class EinschaltenPlugin(BasisPlugin):
return []
# Backwards compatible: first page only. UI uses paging via `new_titles_page`.
return self.new_titles_page(1)
# Alias für die automatische Plugin-Erkennung.
Plugin = EinschaltenPlugin

View File

@@ -0,0 +1,928 @@
"""Filmpalast Integration (movie-style provider).
Hinweis:
- Der Parser ist bewusst defensiv und arbeitet mit mehreren Fallback-Selektoren,
da Filmpalast-Layouts je Domain variieren koennen.
"""
from __future__ import annotations
from dataclasses import dataclass
import re
from urllib.parse import quote, urlencode
from urllib.parse import urljoin
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "filmpalast_base_url"
DEFAULT_BASE_URL = "https://filmpalast.to"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["voe", "vidoza", "streamtape", "doodstream", "mixdrop"]
SERIES_HINT_PREFIX = "series://filmpalast/"
SERIES_VIEW_PATH = "/serien/view"
SEASON_EPISODE_RE = re.compile(r"\bS\s*(\d{1,2})\s*E\s*(\d{1,3})\b", re.IGNORECASE)
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
SETTING_LOG_URLS = "log_urls_filmpalast"
SETTING_DUMP_HTML = "dump_html_filmpalast"
SETTING_SHOW_URL_INFO = "show_url_info_filmpalast"
SETTING_LOG_ERRORS = "log_errors_filmpalast"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
@dataclass(frozen=True)
class SearchHit:
title: str
url: str
@dataclass(frozen=True)
class EpisodeEntry:
season: int
episode: int
suffix: str
url: str
def _get_base_url() -> str:
base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
if not base:
base = DEFAULT_BASE_URL
return base.rstrip("/")
def _absolute_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
if url.startswith("//"):
return f"https:{url}"
if url.startswith("/"):
return f"{_get_base_url()}{url}"
return f"{_get_base_url()}/{url.lstrip('/')}"
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _is_probably_content_url(url: str) -> bool:
lower = (url or "").casefold()
if not lower:
return False
block_markers = (
"/genre/",
"/kategorie/",
"/category/",
"/tag/",
"/login",
"/register",
"/kontakt",
"/impressum",
"/datenschutz",
"/dmca",
"/agb",
"javascript:",
"#",
)
if any(marker in lower for marker in block_markers):
return False
allow_markers = ("/stream/", "/film/", "/movie/", "/serien/", "/serie/", "/title/")
return any(marker in lower for marker in allow_markers)
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
log_url(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_URLS,
plugin_setting_id=SETTING_LOG_URLS,
log_filename="filmpalast_urls.log",
url=url,
kind=kind,
)
def _log_visit(url: str) -> None:
_log_url_event(url, kind="VISIT")
notify_url(
ADDON_ID,
heading="Filmpalast",
url=url,
enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO,
plugin_setting_id=SETTING_SHOW_URL_INFO,
)
def _log_response_html(url: str, body: str) -> None:
dump_response_html(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_DUMP_HTML,
plugin_setting_id=SETTING_DUMP_HTML,
url=url,
body=body,
filename_prefix="filmpalast_response",
)
def _log_error_message(message: str) -> None:
log_error(
ADDON_ID,
enabled_setting_id=GLOBAL_SETTING_LOG_ERRORS,
plugin_setting_id=SETTING_LOG_ERRORS,
log_filename="filmpalast_errors.log",
message=message,
)
def _is_series_hint_url(value: str) -> bool:
return (value or "").startswith(SERIES_HINT_PREFIX)
def _series_hint_value(title: str) -> str:
safe_title = quote((title or "").strip(), safe="")
return f"{SERIES_HINT_PREFIX}{safe_title}" if safe_title else SERIES_HINT_PREFIX
def _extract_number(value: str) -> Optional[int]:
match = re.search(r"(\d+)", value or "")
if not match:
return None
try:
return int(match.group(1))
except Exception:
return None
def _strip_series_alias(title: str) -> str:
return re.sub(r"\s*\(serie\)\s*$", "", title or "", flags=re.IGNORECASE).strip()
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
if requests is None or BeautifulSoup is None:
raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
_log_visit(url)
sess = session or get_requests_session("filmpalast", headers=HEADERS)
try:
response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
except Exception as exc:
_log_error_message(f"GET {url} failed: {exc}")
raise
if response.url and response.url != url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(url, response.text)
return BeautifulSoup(response.text, "html.parser")
class FilmpalastPlugin(BasisPlugin):
name = "Filmpalast"
version = "1.0.0"
def __init__(self) -> None:
self._title_to_url: Dict[str, str] = {}
self._series_entries: Dict[str, Dict[int, Dict[int, EpisodeEntry]]] = {}
self._hoster_cache: Dict[str, Dict[str, str]] = {}
self._genre_to_url: Dict[str, str] = {}
self._genre_page_count_cache: Dict[str, int] = {}
self._alpha_to_url: Dict[str, str] = {}
self._alpha_page_count_cache: Dict[str, int] = {}
self._series_page_count_cache: Dict[int, int] = {}
self._requests_available = REQUESTS_AVAILABLE
self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
self.is_available = True
self.unavailable_reason: Optional[str] = None
if not self._requests_available: # pragma: no cover - optional dependency
self.is_available = False
self.unavailable_reason = (
"requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
)
if REQUESTS_IMPORT_ERROR:
print(f"FilmpalastPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
def _lookup_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._title_to_url.get(title)
if direct:
return direct
wanted = title.casefold()
for key, value in self._title_to_url.items():
if key.casefold() == wanted and value:
return value
return ""
def _series_key_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
if title in self._series_entries:
return title
wanted = title.casefold()
for key in self._series_entries.keys():
if key.casefold() == wanted:
return key
return ""
def _has_series_entries(self, title: str) -> bool:
return bool(self._series_key_for_title(title))
def _episode_entry_from_hit(self, hit: SearchHit) -> Optional[Tuple[str, EpisodeEntry]]:
title = (hit.title or "").strip()
if not title:
return None
marker = SEASON_EPISODE_RE.search(title)
if not marker:
return None
try:
season_number = int(marker.group(1))
episode_number = int(marker.group(2))
except Exception:
return None
series_title = re.sub(r"\s+", " ", title[: marker.start()] or "").strip(" -|:;,_")
if not series_title:
return None
suffix = re.sub(r"\s+", " ", title[marker.end() :] or "").strip(" -|:;,_")
entry = EpisodeEntry(season=season_number, episode=episode_number, suffix=suffix, url=hit.url)
return (series_title, entry)
def _add_series_entry(self, series_title: str, entry: EpisodeEntry) -> None:
if not series_title or not entry.url:
return
seasons = self._series_entries.setdefault(series_title, {})
episodes = seasons.setdefault(entry.season, {})
if entry.episode not in episodes:
episodes[entry.episode] = entry
def _ensure_series_entries_for_title(self, title: str) -> str:
series_key = self._series_key_for_title(title)
if series_key:
return series_key
original_title = (title or "").strip()
lookup_title = _strip_series_alias(original_title)
if not lookup_title:
return ""
if not self._requests_available:
return ""
wanted = _normalize_search_text(lookup_title)
hits = self._search_hits(lookup_title)
for hit in hits:
parsed = self._episode_entry_from_hit(hit)
if not parsed:
continue
series_title, entry = parsed
if wanted and _normalize_search_text(series_title) != wanted:
continue
self._add_series_entry(series_title, entry)
self._title_to_url.setdefault(series_title, _series_hint_value(series_title))
resolved = self._series_key_for_title(original_title) or self._series_key_for_title(lookup_title)
if resolved and original_title and original_title != resolved:
self._series_entries[original_title] = self._series_entries[resolved]
self._title_to_url.setdefault(original_title, _series_hint_value(resolved))
return original_title
return resolved
def _detail_url_for_selection(self, title: str, season: str, episode: str) -> str:
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
season_number = _extract_number(season)
episode_number = _extract_number(episode)
if season_number is None or episode_number is None:
return ""
entry = self._series_entries.get(series_key, {}).get(season_number, {}).get(episode_number)
return entry.url if entry else ""
return self._ensure_title_url(title)
def _search_hits(self, query: str) -> List[SearchHit]:
query = (query or "").strip()
if not query:
return []
if not self._requests_available or requests is None:
return []
session = get_requests_session("filmpalast", headers=HEADERS)
search_requests = [(_absolute_url(f"/search/title/{quote(query)}"), None)]
hits: List[SearchHit] = []
seen_titles: set[str] = set()
seen_urls: set[str] = set()
for base_url, params in search_requests:
try:
request_url = base_url if not params else f"{base_url}?{urlencode(params)}"
_log_url_event(request_url, kind="GET")
_log_visit(request_url)
response = session.get(base_url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
if response.url and response.url != request_url:
_log_url_event(response.url, kind="REDIRECT")
_log_response_html(request_url, response.text)
soup = BeautifulSoup(response.text, "html.parser")
except Exception as exc:
_log_error_message(f"search request failed ({base_url}): {exc}")
continue
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
title = (title or "").strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
if hits:
break
return hits
def _parse_listing_hits(self, soup: BeautifulSoupT, *, query: str = "") -> List[SearchHit]:
hits: List[SearchHit] = []
if not soup:
return hits
seen_titles: set[str] = set()
seen_urls: set[str] = set()
anchors = soup.select("article.liste h2 a[href], article.liste h3 a[href]")
if not anchors:
anchors = soup.select("a[href*='/stream/'][title], a[href*='/stream/']")
for anchor in anchors:
href = (anchor.get("href") or "").strip()
if not href:
continue
url = _absolute_url(href).split("#", 1)[0].split("?", 1)[0].rstrip("/")
if not _is_probably_content_url(url):
continue
title = (anchor.get("title") or anchor.get_text(" ", strip=True)).strip()
if not title:
continue
if title.casefold() in {"details/play", "play", "details"}:
continue
if query and not _matches_query(query, title=title):
continue
title_key = title.casefold()
url_key = url.casefold()
if title_key in seen_titles or url_key in seen_urls:
continue
seen_titles.add(title_key)
seen_urls.add(url_key)
_log_url_event(url, kind="PARSE")
hits.append(SearchHit(title=title, url=url))
return hits
def _apply_hits_to_title_index(self, hits: List[SearchHit]) -> List[str]:
self._title_to_url = {}
self._series_entries = {}
self._hoster_cache.clear()
movie_titles: List[str] = []
series_titles_seen: set[str] = set()
for hit in hits:
parsed = self._episode_entry_from_hit(hit)
if parsed:
series_title, entry = parsed
self._add_series_entry(series_title, entry)
if series_title.casefold() not in series_titles_seen:
self._title_to_url[series_title] = _series_hint_value(series_title)
series_titles_seen.add(series_title.casefold())
continue
title = (hit.title or "").strip()
if not title:
continue
movie_titles.append(title)
self._title_to_url[title] = hit.url
titles: List[str] = list(movie_titles)
movie_keys = {entry.casefold() for entry in movie_titles}
for series_title in sorted(self._series_entries.keys(), key=lambda value: value.casefold()):
if series_title.casefold() in movie_keys:
alias = f"{series_title} (Serie)"
self._title_to_url[alias] = self._title_to_url.get(series_title, _series_hint_value(series_title))
self._series_entries[alias] = self._series_entries[series_title]
titles.append(alias)
else:
titles.append(series_title)
titles.sort(key=lambda value: value.casefold())
return titles
async def search_titles(self, query: str) -> List[str]:
hits = self._search_hits(query)
return self._apply_hits_to_title_index(hits)
def _parse_genres(self, soup: BeautifulSoupT) -> Dict[str, str]:
genres: Dict[str, str] = {}
if not soup:
return genres
for anchor in soup.select("section#genre a[href], #genre a[href], aside #genre a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/genre/" not in href:
continue
genres[name] = _absolute_url(href)
return genres
def _extract_last_page(self, soup: BeautifulSoupT) -> int:
max_page = 1
if not soup:
return max_page
for anchor in soup.select("#paging a[href], .paging a[href], a.pageing[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"(\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
return max_page
def capabilities(self) -> set[str]:
return {"genres", "alpha", "series_catalog"}
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
alpha: Dict[str, str] = {}
if not soup:
return alpha
for anchor in soup.select("section#movietitle a[href], #movietitle a[href], aside #movietitle a[href]"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not name or not href:
continue
if "/search/alpha/" not in href:
continue
if name in alpha:
continue
alpha[name] = _absolute_url(href)
return alpha
def alpha_index(self) -> List[str]:
if not self._requests_available:
return []
if self._alpha_to_url:
return list(self._alpha_to_url.keys())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_alpha_links(soup)
if parsed:
self._alpha_to_url = dict(parsed)
return list(self._alpha_to_url.keys())
def alpha_page_count(self, letter: str) -> int:
letter = (letter or "").strip()
if not letter:
return 1
if letter in self._alpha_page_count_cache:
return max(1, int(self._alpha_page_count_cache.get(letter, 1)))
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._alpha_page_count_cache[letter] = max(1, pages)
return self._alpha_page_count_cache[letter]
def titles_for_alpha_page(self, letter: str, page: int) -> List[str]:
letter = (letter or "").strip()
if not letter or not self._requests_available:
return []
if not self._alpha_to_url:
self.alpha_index()
base_url = self._alpha_to_url.get(letter, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_alpha(self, letter: str) -> List[str]:
titles = self.titles_for_alpha_page(letter, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _series_view_url(self) -> str:
return _absolute_url(SERIES_VIEW_PATH)
def series_catalog_page_count(self, page: int = 1) -> int:
if not self._requests_available:
return 1
cache_key = int(page or 1)
if cache_key in self._series_page_count_cache:
return max(1, int(self._series_page_count_cache.get(cache_key, 1)))
base_url = self._series_view_url()
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._series_page_count_cache[cache_key] = max(1, pages)
return self._series_page_count_cache[cache_key]
def series_catalog_page(self, page: int) -> List[str]:
if not self._requests_available:
return []
base_url = self._series_view_url()
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def series_catalog_has_more(self, page: int) -> bool:
total = self.series_catalog_page_count(page)
return page < total
def genres(self) -> List[str]:
if not self._requests_available:
return []
if self._genre_to_url:
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
parsed = self._parse_genres(soup)
if parsed:
self._genre_to_url = dict(parsed)
return sorted(self._genre_to_url.keys(), key=lambda value: value.casefold())
def genre_page_count(self, genre: str) -> int:
genre = (genre or "").strip()
if not genre:
return 1
if genre in self._genre_page_count_cache:
return max(1, int(self._genre_page_count_cache.get(genre, 1)))
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return 1
try:
soup = _get_soup(base_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return 1
pages = self._extract_last_page(soup)
self._genre_page_count_cache[genre] = max(1, pages)
return self._genre_page_count_cache[genre]
def titles_for_genre_page(self, genre: str, page: int) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
if not self._genre_to_url:
self.genres()
base_url = self._genre_to_url.get(genre, "")
if not base_url:
return []
page = max(1, int(page or 1))
url = base_url if page == 1 else urljoin(base_url.rstrip("/") + "/", f"page/{page}")
try:
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return []
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
def titles_for_genre(self, genre: str) -> List[str]:
titles = self.titles_for_genre_page(genre, 1)
titles.sort(key=lambda value: value.casefold())
return titles
def _ensure_title_url(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._lookup_title_url(title)
if direct and _is_series_hint_url(direct):
return ""
if direct:
self._title_to_url[title] = direct
return direct
if self._has_series_entries(title) or self._ensure_series_entries_for_title(title):
self._title_to_url[title] = _series_hint_value(title)
return ""
wanted = title.casefold()
hits = self._search_hits(title)
for hit in hits:
if self._episode_entry_from_hit(hit):
continue
if hit.title.casefold() == wanted and hit.url:
self._title_to_url[title] = hit.url
return hit.url
return ""
def remember_series_url(self, title: str, series_url: str) -> None:
title = (title or "").strip()
series_url = (series_url or "").strip()
if not title or not series_url:
return
self._title_to_url[title] = series_url
self._hoster_cache.clear()
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._lookup_title_url(title)
if direct:
return direct
series_key = self._series_key_for_title(title)
if series_key:
return _series_hint_value(series_key)
return ""
def is_movie(self, title: str) -> bool:
title = (title or "").strip()
if not title:
return False
direct = self._lookup_title_url(title)
if direct:
return not _is_series_hint_url(direct)
if SEASON_EPISODE_RE.search(title):
return False
if self._has_series_entries(title):
return False
if self._ensure_series_entries_for_title(title):
return False
return True
@staticmethod
def _normalize_hoster_name(name: str) -> str:
name = (name or "").strip()
if not name:
return ""
name = re.sub(r"\s+", " ", name)
return name
def _extract_hoster_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
hosters: Dict[str, str] = {}
if not soup:
return hosters
# Primäres Layout: jeder Hoster in eigener UL mit hostName + Play-Link.
for block in soup.select("ul.currentStreamLinks"):
host_name_node = block.select_one("li.hostBg .hostName")
host_name = self._normalize_hoster_name(host_name_node.get_text(" ", strip=True) if host_name_node else "")
play_anchor = block.select_one("li.streamPlayBtn a[href], a.button.iconPlay[href]")
href = (play_anchor.get("href") if play_anchor else "") or ""
play_url = _absolute_url(href).strip()
if not play_url:
continue
if not host_name:
host_name = self._normalize_hoster_name(play_anchor.get_text(" ", strip=True) if play_anchor else "")
if not host_name:
host_name = "Unbekannt"
if host_name not in hosters:
hosters[host_name] = play_url
# Fallback: direkte Play-Buttons im Stream-Bereich.
if not hosters:
for anchor in soup.select("#grap-stream-list a.button.iconPlay[href], .streamLinksWrapper a.button.iconPlay[href]"):
href = (anchor.get("href") or "").strip()
play_url = _absolute_url(href).strip()
if not play_url:
continue
text_name = self._normalize_hoster_name(anchor.get_text(" ", strip=True))
host_name = text_name if text_name and text_name.casefold() not in {"play", "details play"} else "Unbekannt"
if host_name in hosters:
host_name = f"{host_name} #{len(hosters) + 1}"
hosters[host_name] = play_url
return hosters
def _hosters_for_detail_url(self, detail_url: str) -> Dict[str, str]:
detail_url = (detail_url or "").strip()
if not detail_url:
return {}
cached = self._hoster_cache.get(detail_url)
if cached is not None:
return dict(cached)
if not self._requests_available:
return {}
try:
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return {}
hosters = self._extract_hoster_links(soup)
for url in hosters.values():
_log_url_event(url, kind="PARSE")
self._hoster_cache[detail_url] = dict(hosters)
return dict(hosters)
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
seasons = sorted(self._series_entries.get(series_key, {}).keys())
return [f"Staffel {number}" for number in seasons]
detail_url = self._ensure_title_url(title)
return ["Film"] if detail_url else []
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
series_key = self._series_key_for_title(title) or self._ensure_series_entries_for_title(title)
if series_key:
season_number = _extract_number(season)
if season_number is None:
return []
episodes = self._series_entries.get(series_key, {}).get(season_number, {})
labels: List[str] = []
for episode_number in sorted(episodes.keys()):
entry = episodes[episode_number]
label = f"Episode {episode_number}"
if entry.suffix:
label = f"{label} - {entry.suffix}"
labels.append(label)
return labels
return ["Stream"] if self._ensure_title_url(title) else []
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
hosters = self._hosters_for_detail_url(detail_url)
return list(hosters.keys())
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
detail_url = self._detail_url_for_selection(title, season, episode)
if not detail_url:
return None
hosters = self._hosters_for_detail_url(detail_url)
if hosters:
for preferred in self._preferred_hosters:
preferred_key = (preferred or "").strip().casefold()
if not preferred_key:
continue
for host_name, host_url in hosters.items():
if preferred_key in host_name.casefold() or preferred_key in host_url.casefold():
_log_url_event(host_url, kind="FOUND")
return host_url
first = next(iter(hosters.values()))
_log_url_event(first, kind="FOUND")
return first
if not self._requests_available:
return detail_url
try:
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
except Exception:
return detail_url
candidates: List[str] = []
for iframe in soup.select("iframe[src]"):
src = (iframe.get("src") or "").strip()
if src:
candidates.append(_absolute_url(src))
for anchor in soup.select("a[href]"):
href = (anchor.get("href") or "").strip()
if not href:
continue
lower = href.casefold()
if "watch" in lower or "stream" in lower or "player" in lower:
candidates.append(_absolute_url(href))
deduped: List[str] = []
seen: set[str] = set()
for candidate in candidates:
key = candidate.casefold()
if key in seen:
continue
seen.add(key)
deduped.append(candidate)
if deduped:
_log_url_event(deduped[0], kind="FOUND")
return deduped[0]
return detail_url
def set_preferred_hosters(self, hosters: List[str]) -> None:
normalized = [str(hoster).strip().lower() for hoster in hosters if str(hoster).strip()]
if normalized:
self._preferred_hosters = normalized
def reset_preferred_hosters(self) -> None:
self._preferred_hosters = list(self._default_preferred_hosters)
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None
try:
from resolveurl_backend import resolve as resolve_with_resolveurl
except Exception:
resolve_with_resolveurl = None
# 1) Immer zuerst den ursprünglichen Hoster-Link an ResolveURL geben.
if callable(resolve_with_resolveurl):
resolved_by_resolveurl = resolve_with_resolveurl(link)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
redirected = link
if self._requests_available:
try:
session = get_requests_session("filmpalast", headers=HEADERS)
response = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
redirected = (response.url or link).strip() or link
except Exception:
redirected = link
# 2) Danach optional die Redirect-URL nochmals auflösen.
if callable(resolve_with_resolveurl) and redirected and redirected != link:
resolved_by_resolveurl = resolve_with_resolveurl(redirected)
if resolved_by_resolveurl:
_log_url_event("ResolveURL", kind="HOSTER_RESOLVER")
_log_url_event(resolved_by_resolveurl, kind="MEDIA")
return resolved_by_resolveurl
# 3) Fallback bleibt wie bisher: direkte URL zurückgeben.
if redirected:
_log_url_event(redirected, kind="FINAL")
return redirected
return None
# Alias für die automatische Plugin-Erkennung.
Plugin = FilmpalastPlugin

View File

@@ -238,6 +238,14 @@ def _normalize_search_text(value: str) -> str:
return value
def _matches_query(query: str, *, title: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _is_episode_tba(title: str, original_title: str) -> bool:
combined = f"{title} {original_title}".casefold()
markers = ("tba", "demnächst", "demnaechst", "coming soon", "to be announced")
@@ -395,8 +403,7 @@ def _extract_genre_names_from_html(body: str) -> List[str]:
def search_series(query: str) -> List[SeriesResult]:
"""Sucht Serien im (/serien)-Katalog (Genre-liste) nach Titel/Alt-Titel."""
_ensure_requests()
normalized_query = _normalize_search_text(query)
if not normalized_query:
if not _normalize_search_text(query):
return []
# Direkter Abruf wie in fetch_serien.py.
catalog_url = f"{_get_base_url()}/serien?by=genre"
@@ -404,8 +411,7 @@ def search_series(query: str) -> List[SeriesResult]:
results: List[SeriesResult] = []
for series in parse_series_catalog(soup).values():
for entry in series:
haystack = _normalize_search_text(entry.title)
if entry.title and normalized_query in haystack:
if entry.title and _matches_query(query, title=entry.title):
results.append(entry)
return results
@@ -778,6 +784,7 @@ class SerienstreamPlugin(BasisPlugin):
"""Downloader-Plugin, das Serien von s.to ueber requests/bs4 bereitstellt."""
name = "Serienstream"
version = "1.0.0"
POPULAR_GENRE_LABEL = "⭐ Beliebte Serien"
def __init__(self) -> None:

View File

@@ -57,7 +57,7 @@ else: # pragma: no cover
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "topstream_base_url"
DEFAULT_BASE_URL = "https://www.meineseite"
DEFAULT_BASE_URL = "https://topstreamfilm.live"
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
@@ -106,10 +106,8 @@ def _matches_query(query: str, *, title: str, description: str) -> bool:
normalized_query = _normalize_search_text(query)
if not normalized_query:
return False
haystack = _normalize_search_text(title)
if not haystack:
return False
return normalized_query in haystack
haystack = f" {_normalize_search_text(title)} "
return f" {normalized_query} " in haystack
def _strip_der_film_suffix(title: str) -> str:
@@ -125,6 +123,7 @@ class TopstreamfilmPlugin(BasisPlugin):
"""Integration fuer eine HTML-basierte Suchseite."""
name = "Topstreamfilm"
version = "1.0.0"
def __init__(self) -> None:
self._session: RequestsSession | None = None

View File

@@ -24,6 +24,10 @@
<setting id="dump_html_einschalten" type="bool" label="Einschalten: HTML-Dumps" default="false" />
<setting id="show_url_info_einschalten" type="bool" label="Einschalten: URL-Info anzeigen" default="false" />
<setting id="log_errors_einschalten" type="bool" label="Einschalten: Fehler loggen" default="false" />
<setting id="log_urls_filmpalast" type="bool" label="Filmpalast: URL-Logging" default="false" />
<setting id="dump_html_filmpalast" type="bool" label="Filmpalast: HTML-Dumps" default="false" />
<setting id="show_url_info_filmpalast" type="bool" label="Filmpalast: URL-Info anzeigen" default="false" />
<setting id="log_errors_filmpalast" type="bool" label="Filmpalast: Fehler loggen" default="false" />
</category>
<category label="TopStream">
<setting id="topstream_base_url" type="text" label="Domain (BASE_URL)" default="https://topstreamfilm.live" />
@@ -38,6 +42,12 @@
<category label="Einschalten">
<setting id="einschalten_base_url" type="text" label="Domain (BASE_URL)" default="https://einschalten.in" />
</category>
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Domain (BASE_URL)" default="https://filmpalast.to" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Domain (BASE_URL)" default="https://doku-streams.com" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
@@ -54,4 +64,16 @@
<setting id="tmdb_log_requests" type="bool" label="TMDB API Requests loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API Antworten loggen" default="false" />
</category>
<category label="Update">
<setting id="update_repo_url" type="text" label="Update-URL (addons.xml)" default="http://127.0.0.1:8080/repo/addons.xml" />
<setting id="run_update_check" type="action" label="Jetzt auf Updates pruefen" action="RunPlugin(plugin://plugin.video.viewit/?action=check_updates)" option="close" />
<setting id="update_info" type="text" label="Kodi-Repository-Updates werden ueber den Kodi-Update-Mechanismus verarbeitet." default="" enable="false" />
<setting id="update_version_addon" type="text" label="ViewIT Addon Version" default="-" enable="false" />
<setting id="update_version_serienstream" type="text" label="Serienstream Plugin Version" default="-" enable="false" />
<setting id="update_version_aniworld" type="text" label="Aniworld Plugin Version" default="-" enable="false" />
<setting id="update_version_einschalten" type="text" label="Einschalten Plugin Version" default="-" enable="false" />
<setting id="update_version_topstreamfilm" type="text" label="Topstreamfilm Plugin Version" default="-" enable="false" />
<setting id="update_version_filmpalast" type="text" label="Filmpalast Plugin Version" default="-" enable="false" />
<setting id="update_version_doku_streams" type="text" label="Doku-Streams Plugin Version" default="-" enable="false" />
</category>
</settings>

View File

@@ -10,7 +10,7 @@ Dieses Dokument beschreibt den Einstiegspunkt des Addons und die zentrale Steuer
- startet die Wiedergabe und verwaltet Playstate/Resume.
## Ablauf (high level)
1. **PluginDiscovery**: Lädt alle `addon/plugins/*.py` (ohne `_`Prefix) und instanziiert Klassen, die von `BasisPlugin` erben.
1. **PluginDiscovery**: Lädt alle `addon/plugins/*.py` (ohne `_`Prefix). Bevorzugt `Plugin = <Klasse>`, sonst werden `BasisPlugin`Subklassen deterministisch instanziiert.
2. **Navigation**: Baut KodiListen (Serien/Staffeln/Episoden) auf Basis der PluginAntworten.
3. **Playback**: Holt StreamLinks aus dem Plugin und startet die Wiedergabe.
4. **Playstate**: Speichert ResumeDaten lokal (`playstate.json`) und setzt `playcount`/ResumeInfos.
@@ -35,6 +35,7 @@ Die genaue Aktion wird aus den QueryParametern gelesen und an das entsprechen
- **PluginLoader**: findet & instanziiert Plugins.
- **UIHelper**: setzt ContentType, baut Verzeichnisseinträge.
- **PlaystateHelper**: `_load_playstate`, `_save_playstate`, `_apply_playstate_to_info`.
- **MetadataMerge**: PluginMetadaten können TMDB übersteuern, TMDB dient als Fallback.
## Fehlerbehandlung
- PluginImportfehler werden isoliert behandelt, damit das Addon nicht komplett ausfällt.

View File

@@ -6,6 +6,7 @@ Diese Doku beschreibt, wie Plugins im ViewITAddon aufgebaut sind und wie neue
- Jedes Plugin ist eine einzelne Datei unter `addon/plugins/`.
- Dateinamen **ohne** `_`Prefix werden automatisch geladen.
- Jede Datei enthält eine Klasse, die von `BasisPlugin` erbt.
- Optional: `Plugin = <Klasse>` als expliziter Einstiegspunkt (bevorzugt vom Loader).
## PflichtMethoden (BasisPlugin)
Jedes Plugin muss diese Methoden implementieren:
@@ -13,11 +14,32 @@ Jedes Plugin muss diese Methoden implementieren:
- `seasons_for(title: str) -> list[str]`
- `episodes_for(title: str, season: str) -> list[str]`
## Vertrag Plugin ↔ Hauptlogik (`default.py`)
Die Hauptlogik ruft Plugin-Methoden auf und verarbeitet ausschließlich deren Rückgaben.
Wesentliche Rückgaben an die Hauptlogik:
- `search_titles(...)` → Liste von Titel-Strings für die Trefferliste
- `seasons_for(...)` → Liste von Staffel-Labels
- `episodes_for(...)` → Liste von Episoden-Labels
- `stream_link_for(...)` → Hoster-/Player-Link (nicht zwingend finale Media-URL)
- `resolve_stream_link(...)` → finale/spielbare URL nach Redirect/Resolver
- `metadata_for(...)` → Info-Labels/Art (Plot/Poster) aus der Quelle
- Optional `available_hosters_for(...)` → auswählbare Hoster-Namen im Dialog
- Optional `series_url_for_title(...)` → stabile Detail-URL pro Titel für Folgeaufrufe
- Optional `remember_series_url(...)` → Übernahme einer bereits bekannten Detail-URL
Standard für Film-Provider (ohne echte Staffeln):
- `seasons_for(title)` gibt `["Film"]` zurück
- `episodes_for(title, "Film")` gibt `["Stream"]` zurück
## Optionale Features (Capabilities)
Über `capabilities()` kann das Plugin zusätzliche Funktionen anbieten:
- `popular_series``popular_series()`
- `genres``genres()` + `titles_for_genre(genre)`
- `latest_episodes``latest_episodes(page=1)`
- `new_titles``new_titles_page(page=1)`
- `alpha``alpha_index()` + `titles_for_alpha_page(letter, page)`
- `series_catalog``series_catalog_page(page=1)`
## Empfohlene Struktur
- Konstanten für URLs/Endpoints (BASE_URL, Pfade, Templates)
@@ -27,7 +49,8 @@ Jedes Plugin muss diese Methoden implementieren:
## Suche (aktuelle Policy)
- **Nur TitelMatches**
- **SubstringMatch** nach Normalisierung (Lowercase + NichtAlnum → Leerzeichen)
- **Wortbasierter Match** nach Normalisierung (Lowercase + NichtAlnum → Leerzeichen)
- Keine Teilwort-Treffer innerhalb eines Wortes (Beispiel: `hund` matcht nicht `thunder`)
- Keine Beschreibung/Plot/Meta für Matches
## Namensgebung
@@ -41,10 +64,25 @@ Standard: `*_base_url` (Domain / BASE_URL)
- `aniworld_base_url`
- `einschalten_base_url`
- `topstream_base_url`
- `filmpalast_base_url`
- `doku_streams_base_url`
## Playback
- Wenn möglich `stream_link_for(...)` implementieren.
- Optional `available_hosters_for(...)`/`resolve_stream_link(...)` für HosterAuflösung.
- `stream_link_for(...)` implementieren (liefert bevorzugten Hoster-Link).
- `available_hosters_for(...)` bereitstellen, wenn die Seite mehrere Hoster anbietet.
- `resolve_stream_link(...)` nach einheitlichem Flow umsetzen:
1. Redirects auflösen (falls vorhanden)
2. ResolveURL (`resolveurl_backend.resolve`) versuchen
3. Bei Fehlschlag auf den besten verfügbaren Link zurückfallen
- Optional `set_preferred_hosters(...)` unterstützen, damit die Hoster-Auswahl aus der Hauptlogik direkt greift.
## StandardFlow (empfohlen)
1. **Suche**: nur Titel liefern und Titel→Detail-URL mappen.
2. **Navigation**: `series_url_for_title`/`remember_series_url` unterstützen, damit URLs zwischen Aufrufen stabil bleiben.
3. **Auswahl Hoster**: Hoster-Namen aus der Detailseite extrahieren und anbieten.
4. **Playback**: Hoster-Link liefern, danach konsistent über `resolve_stream_link` finalisieren.
5. **Metadaten**: `metadata_for` nutzen, Plot/Poster aus der Quelle zurückgeben.
6. **Fallbacks**: bei Layout-Unterschieden defensiv parsen und Logging aktivierbar halten.
## Debugging
Global gesteuert über Settings:
@@ -63,11 +101,16 @@ Plugins sollten die Helper aus `addon/plugin_helpers.py` nutzen:
## Build & Test
- ZIP bauen: `./scripts/build_kodi_zip.sh`
- AddonOrdner: `./scripts/build_install_addon.sh`
- PluginManifest aktualisieren: `python3 scripts/generate_plugin_manifest.py`
- Live-Snapshot-Checks: `python3 qa/run_plugin_snapshots.py` (aktualisieren mit `--update`)
## BeispielCheckliste
- [ ] `name` korrekt gesetzt
- [ ] `*_base_url` in Settings vorhanden
- [ ] Suche matcht nur Titel
- [ ] Suche matcht nur Titel und wortbasiert
- [ ] `stream_link_for` + `resolve_stream_link` folgen dem Standard-Flow
- [ ] Optional: `available_hosters_for` + `set_preferred_hosters` vorhanden
- [ ] Optional: `series_url_for_title` + `remember_series_url` vorhanden
- [ ] Fehlerbehandlung und Timeouts vorhanden
- [ ] Optional: Caches für Performance

104
docs/PLUGIN_MANIFEST.json Normal file
View File

@@ -0,0 +1,104 @@
{
"schema_version": 1,
"plugins": [
{
"file": "addon/plugins/aniworld_plugin.py",
"module": "aniworld_plugin",
"name": "Aniworld",
"class": "AniworldPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"latest_episodes",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "aniworld_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/dokustreams_plugin.py",
"module": "dokustreams_plugin",
"name": "Doku-Streams",
"class": "DokuStreamsPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"popular_series"
],
"prefer_source_metadata": true,
"base_url_setting": "doku_streams_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/einschalten_plugin.py",
"module": "einschalten_plugin",
"name": "Einschalten",
"class": "EinschaltenPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"new_titles"
],
"prefer_source_metadata": false,
"base_url_setting": "einschalten_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/filmpalast_plugin.py",
"module": "filmpalast_plugin",
"name": "Filmpalast",
"class": "FilmpalastPlugin",
"version": "1.0.0",
"capabilities": [
"alpha",
"genres",
"series_catalog"
],
"prefer_source_metadata": false,
"base_url_setting": "filmpalast_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/serienstream_plugin.py",
"module": "serienstream_plugin",
"name": "Serienstream",
"class": "SerienstreamPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"latest_episodes",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "serienstream_base_url",
"available": true,
"unavailable_reason": null,
"error": null
},
{
"file": "addon/plugins/topstreamfilm_plugin.py",
"module": "topstreamfilm_plugin",
"name": "Topstreamfilm",
"class": "TopstreamfilmPlugin",
"version": "1.0.0",
"capabilities": [
"genres",
"popular_series"
],
"prefer_source_metadata": false,
"base_url_setting": "topstream_base_url",
"available": true,
"unavailable_reason": null,
"error": null
}
]
}

View File

@@ -9,6 +9,7 @@ ViewIt lädt Provider-Integrationen dynamisch aus `addon/plugins/*.py`. Jede Dat
Weitere Details:
- `docs/DEFAULT_ROUTER.md` (Hauptlogik in `addon/default.py`)
- `docs/PLUGIN_DEVELOPMENT.md` (Entwicklerdoku für Plugins)
- `docs/PLUGIN_MANIFEST.json` (zentraler Überblick über Plugins, Versionen, Capabilities)
### Aktuelle Plugins
@@ -16,6 +17,8 @@ Weitere Details:
- `topstreamfilm_plugin.py` Topstreamfilm
- `einschalten_plugin.py` Einschalten
- `aniworld_plugin.py` Aniworld
- `filmpalast_plugin.py` Filmpalast
- `dokustreams_plugin.py` Doku-Streams
- `_template_plugin.py` Vorlage für neue Plugins
### Plugin-Discovery (Ladeprozess)
@@ -25,11 +28,16 @@ Der Loader in `addon/default.py`:
1. Sucht alle `*.py` in `addon/plugins/`
2. Überspringt Dateien, die mit `_` beginnen
3. Lädt Module dynamisch
4. Instanziert Klassen, die von `BasisPlugin` erben
5. Ignoriert Plugins mit `is_available = False`
4. Nutzt `Plugin = <Klasse>` als bevorzugten Einstiegspunkt (falls vorhanden)
5. Fallback: instanziert Klassen, die von `BasisPlugin` erben (deterministisch sortiert)
6. Ignoriert Plugins mit `is_available = False`
Damit bleiben fehlerhafte Plugins isoliert und blockieren nicht das gesamte Add-on.
### Plugin-Manifest (Audit & Repro)
`docs/PLUGIN_MANIFEST.json` listet alle Plugins mit Version, Capabilities und Basis-Settings.
Erzeugung: `python3 scripts/generate_plugin_manifest.py`
### BasisPlugin verpflichtende Methoden
Definiert in `addon/plugin_interface.py`:
@@ -37,19 +45,29 @@ Definiert in `addon/plugin_interface.py`:
- `async search_titles(query: str) -> list[str]`
- `seasons_for(title: str) -> list[str]`
- `episodes_for(title: str, season: str) -> list[str]`
- optional `metadata_for(title: str) -> (info_labels, art, cast)`
### Optionale Features (Capabilities)
Plugins können zusätzliche Features anbieten:
- `capabilities() -> set[str]`
- `popular_series`: liefert beliebte Serien
- `genres`: Genre-Liste verfügbar
- `latest_episodes`: neue Episoden verfügbar
- `popular_series`: liefert beliebte Serien
- `genres`: Genre-Liste verfügbar
- `latest_episodes`: neue Episoden verfügbar
- `new_titles`: neue Titel verfügbar
- `alpha`: A-Z Index verfügbar
- `series_catalog`: Serienkatalog verfügbar
- `popular_series() -> list[str]`
- `genres() -> list[str]`
- `titles_for_genre(genre: str) -> list[str]`
- `latest_episodes(page: int = 1) -> list[LatestEpisode]` (wenn angeboten)
- `new_titles_page(page: int = 1) -> list[str]` (wenn angeboten)
- `alpha_index() -> list[str]` (wenn angeboten)
- `series_catalog_page(page: int = 1) -> list[str]` (wenn angeboten)
Metadaten:
- `prefer_source_metadata = True` bedeutet: Plugin-Metadaten gehen vor TMDB, TMDB dient nur als Fallback.
ViewIt zeigt im UI nur die Features an, die ein Plugin tatsächlich liefert.
@@ -61,6 +79,7 @@ Eine Integration sollte typischerweise bieten:
- `search_titles()` mit Provider-Suche
- `seasons_for()` und `episodes_for()` mit HTML-Parsing
- `stream_link_for()` optional für direkte Playback-Links
- `metadata_for()` optional für Plot/Poster aus der Quelle
- Optional: `available_hosters_for()` oder Provider-spezifische Helfer
Als Startpunkt dient `addon/plugins/_template_plugin.py`.
@@ -78,8 +97,9 @@ Als Startpunkt dient `addon/plugins/_template_plugin.py`.
- Keine Netzwerkzugriffe im Import-Top-Level
- Netzwerkzugriffe nur in Methoden (z.B. `search_titles`)
- Fehler sauber abfangen und verständliche Fehlermeldungen liefern
- Kein globaler Zustand, der across instances überrascht
- Kein globaler Zustand, der über Instanzen hinweg überrascht
- Provider-spezifische Parser in Helper-Funktionen kapseln
- Reproduzierbare Reihenfolge: `Plugin`-Alias nutzen oder Klassenname eindeutig halten
### Debugging & Logs

20
pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[tool.pytest.ini_options]
addopts = "-q --cov=addon --cov-report=term-missing"
python_files = ["test_*.py"]
norecursedirs = ["scripts"]
markers = [
"live: real HTTP requests (set LIVE_TESTS=1 to run)",
"perf: performance benchmarks",
]
[tool.coverage.run]
source = ["addon"]
branch = true
omit = [
"*/__pycache__/*",
"addon/resources/*",
]
[tool.coverage.report]
show_missing = true
skip_empty = true

52
qa/plugin_snapshots.json Normal file
View File

@@ -0,0 +1,52 @@
{
"snapshots": {
"Serienstream::search_titles::trek": [
"Star Trek: Lower Decks",
"Star Trek: Prodigy",
"Star Trek: The Animated Series",
"Inside Star Trek",
"Raumschiff Enterprise - Star Trek: The Original Series",
"Star Trek: Deep Space Nine",
"Star Trek: Discovery",
"Star Trek: Enterprise",
"Star Trek: Picard",
"Star Trek: Raumschiff Voyager",
"Star Trek: Short Treks",
"Star Trek: Starfleet Academy",
"Star Trek: Strange New Worlds",
"Star Trek: The Next Generation"
],
"Aniworld::search_titles::naruto": [
"Naruto",
"Naruto Shippuden",
"Boruto: Naruto Next Generations",
"Naruto Spin-Off: Rock Lee &amp; His Ninja Pals"
],
"Topstreamfilm::search_titles::matrix": [
"Darkdrive Verschollen in der Matrix",
"Matrix Reloaded",
"Armitage III: Poly Matrix",
"Matrix Resurrections",
"Matrix",
"Matrix Revolutions",
"Matrix Fighters"
],
"Einschalten::new_titles_page::1": [],
"Filmpalast::search_titles::trek": [
"Star Trek",
"Star Trek - Der Film",
"Star Trek 2 - Der Zorn des Khan",
"Star Trek 9 Der Aufstand",
"Star Trek: Nemesis",
"Star Trek: Section 31",
"Star Trek: Starfleet Academy",
"Star Trek: Strange New Worlds"
],
"Doku-Streams::search_titles::japan": [
"Deutsche im Knast - Japan und die Disziplin",
"Die Meerfrauen von Japan",
"Japan - Land der Moderne und Tradition",
"Japan im Zweiten Weltkrieg - Der Fall des Kaiserreichs"
]
}
}

153
qa/run_plugin_snapshots.py Executable file
View File

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""Run live snapshot checks for plugins.
Use --update to refresh stored snapshots.
"""
from __future__ import annotations
import argparse
import asyncio
import importlib.util
import inspect
import json
import sys
from pathlib import Path
from typing import Any
ROOT_DIR = Path(__file__).resolve().parents[1]
PLUGIN_DIR = ROOT_DIR / "addon" / "plugins"
SNAPSHOT_PATH = ROOT_DIR / "qa" / "plugin_snapshots.json"
sys.path.insert(0, str(ROOT_DIR / "addon"))
try:
from plugin_interface import BasisPlugin # type: ignore
except Exception as exc: # pragma: no cover
raise SystemExit(f"Failed to import BasisPlugin: {exc}")
CONFIG = [
{"plugin": "Serienstream", "method": "search_titles", "args": ["trek"], "max_items": 20},
{"plugin": "Aniworld", "method": "search_titles", "args": ["naruto"], "max_items": 20},
{"plugin": "Topstreamfilm", "method": "search_titles", "args": ["matrix"], "max_items": 20},
{"plugin": "Einschalten", "method": "new_titles_page", "args": [1], "max_items": 20},
{"plugin": "Filmpalast", "method": "search_titles", "args": ["trek"], "max_items": 20},
{"plugin": "Doku-Streams", "method": "search_titles", "args": ["japan"], "max_items": 20},
]
def _import_module(path: Path):
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise ImportError(f"Missing spec for {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def _discover_plugins() -> dict[str, BasisPlugin]:
plugins: dict[str, BasisPlugin] = {}
for file_path in sorted(PLUGIN_DIR.glob("*.py")):
if file_path.name.startswith("_"):
continue
module = _import_module(file_path)
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
classes = [preferred]
else:
classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
classes.sort(key=lambda cls: cls.__name__.casefold())
for cls in classes:
instance = cls()
name = str(getattr(instance, "name", "") or "").strip()
if name and name not in plugins:
plugins[name] = instance
return plugins
def _normalize_titles(value: Any, max_items: int) -> list[str]:
if not value:
return []
titles = [str(item).strip() for item in list(value) if item and str(item).strip()]
seen = set()
normalized: list[str] = []
for title in titles:
key = title.casefold()
if key in seen:
continue
seen.add(key)
normalized.append(title)
if len(normalized) >= max_items:
break
return normalized
def _snapshot_key(entry: dict[str, Any]) -> str:
args = entry.get("args", [])
return f"{entry['plugin']}::{entry['method']}::{','.join(str(a) for a in args)}"
def _call_method(plugin: BasisPlugin, method_name: str, args: list[Any]):
method = getattr(plugin, method_name, None)
if not callable(method):
raise RuntimeError(f"Method missing: {method_name}")
result = method(*args)
if asyncio.iscoroutine(result):
return asyncio.run(result)
return result
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--update", action="store_true")
args = parser.parse_args()
snapshots: dict[str, Any] = {}
if SNAPSHOT_PATH.exists():
snapshots = json.loads(SNAPSHOT_PATH.read_text(encoding="utf-8"))
data = snapshots.get("snapshots", {}) if isinstance(snapshots, dict) else {}
if args.update:
data = {}
plugins = _discover_plugins()
errors = []
for entry in CONFIG:
plugin_name = entry["plugin"]
plugin = plugins.get(plugin_name)
if plugin is None:
errors.append(f"Plugin missing: {plugin_name}")
continue
key = _snapshot_key(entry)
try:
result = _call_method(plugin, entry["method"], entry.get("args", []))
normalized = _normalize_titles(result, entry.get("max_items", 20))
except Exception as exc:
errors.append(f"Snapshot error: {key} ({exc})")
if args.update:
data[key] = {"error": str(exc)}
continue
if args.update:
data[key] = normalized
else:
expected = data.get(key)
if expected != normalized:
errors.append(f"Snapshot mismatch: {key}\nExpected: {expected}\nActual: {normalized}")
if args.update:
SNAPSHOT_PATH.parent.mkdir(parents=True, exist_ok=True)
SNAPSHOT_PATH.write_text(json.dumps({"snapshots": data}, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
if errors:
for err in errors:
print(err)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.0" provider-name="ViewIT">
<extension point="xbmc.addon.repository" name="ViewIT Repository">
<dir>
<info compressed="false">http://127.0.0.1:8080/repo/addons.xml</info>
<checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
<datadir zip="true">http://127.0.0.1:8080/repo/</datadir>
</dir>
</extension>
<extension point="xbmc.addon.metadata">
<summary lang="de_DE">Lokales Repository fuer ViewIT Updates</summary>
<summary lang="en_GB">Local repository for ViewIT updates</summary>
<description lang="de_DE">Stellt das ViewIT Addon ueber ein Kodi Repository bereit.</description>
<description lang="en_GB">Provides the ViewIT addon via a Kodi repository.</description>
<platform>all</platform>
</extension>
</addon>

2
requirements-dev.txt Normal file
View File

@@ -0,0 +1,2 @@
pytest>=9,<10
pytest-cov>=5,<8

View File

@@ -37,6 +37,6 @@ ZIP_PATH="${INSTALL_DIR}/${ZIP_NAME}"
ADDON_DIR="$("${ROOT_DIR}/scripts/build_install_addon.sh" >/dev/null; echo "${INSTALL_DIR}/${ADDON_ID}")"
rm -f "${ZIP_PATH}"
(cd "${INSTALL_DIR}" && zip -r "${ZIP_NAME}" "$(basename "${ADDON_DIR}")" >/dev/null)
python3 "${ROOT_DIR}/scripts/zip_deterministic.py" "${ZIP_PATH}" "${ADDON_DIR}" >/dev/null
echo "${ZIP_PATH}"

110
scripts/build_local_kodi_repo.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
REPO_DIR="${DIST_DIR}/repo"
PLUGIN_ADDON_XML="${ROOT_DIR}/addon/addon.xml"
REPO_SRC_DIR="${ROOT_DIR}/repository.viewit"
REPO_ADDON_XML="${REPO_SRC_DIR}/addon.xml"
REPO_BASE_URL="${REPO_BASE_URL:-http://127.0.0.1:8080/repo}"
if [[ ! -f "${PLUGIN_ADDON_XML}" ]]; then
echo "Missing: ${PLUGIN_ADDON_XML}" >&2
exit 1
fi
if [[ ! -f "${REPO_ADDON_XML}" ]]; then
echo "Missing: ${REPO_ADDON_XML}" >&2
exit 1
fi
mkdir -p "${REPO_DIR}"
PLUGIN_ZIP="$("${ROOT_DIR}/scripts/build_kodi_zip.sh")"
cp -f "${PLUGIN_ZIP}" "${REPO_DIR}/"
read -r REPO_ADDON_ID REPO_ADDON_VERSION < <(python3 - "${REPO_ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "repository.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "${TMP_DIR}"' EXIT
TMP_REPO_ADDON_DIR="${TMP_DIR}/${REPO_ADDON_ID}"
mkdir -p "${TMP_REPO_ADDON_DIR}"
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete "${REPO_SRC_DIR}/" "${TMP_REPO_ADDON_DIR}/"
else
cp -a "${REPO_SRC_DIR}/." "${TMP_REPO_ADDON_DIR}/"
fi
python3 - "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_BASE_URL}" <<'PY'
import sys
import xml.etree.ElementTree as ET
addon_xml = sys.argv[1]
base_url = sys.argv[2].rstrip("/")
tree = ET.parse(addon_xml)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
raise SystemExit("Invalid repository addon.xml: missing <dir>")
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
raise SystemExit("Invalid repository addon.xml: missing info/checksum/datadir")
info.text = f"{base_url}/addons.xml"
checksum.text = f"{base_url}/addons.xml.md5"
datadir.text = f"{base_url}/"
tree.write(addon_xml, encoding="utf-8", xml_declaration=True)
PY
REPO_ZIP_NAME="${REPO_ADDON_ID}-${REPO_ADDON_VERSION}.zip"
REPO_ZIP_PATH="${REPO_DIR}/${REPO_ZIP_NAME}"
rm -f "${REPO_ZIP_PATH}"
python3 "${ROOT_DIR}/scripts/zip_deterministic.py" "${REPO_ZIP_PATH}" "${TMP_REPO_ADDON_DIR}" >/dev/null
python3 - "${PLUGIN_ADDON_XML}" "${TMP_REPO_ADDON_DIR}/addon.xml" "${REPO_DIR}/addons.xml" <<'PY'
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
plugin_xml = Path(sys.argv[1])
repo_xml = Path(sys.argv[2])
target = Path(sys.argv[3])
addons = ET.Element("addons")
for source in (plugin_xml, repo_xml):
root = ET.parse(source).getroot()
addons.append(root)
target.write_text('<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(addons, encoding="unicode"), encoding="utf-8")
PY
python3 - "${REPO_DIR}/addons.xml" "${REPO_DIR}/addons.xml.md5" <<'PY'
import hashlib
import sys
from pathlib import Path
addons_xml = Path(sys.argv[1])
md5_file = Path(sys.argv[2])
md5 = hashlib.md5(addons_xml.read_bytes()).hexdigest()
md5_file.write_text(md5, encoding="ascii")
PY
echo "Repo built:"
echo " ${REPO_DIR}/addons.xml"
echo " ${REPO_DIR}/addons.xml.md5"
echo " ${REPO_ZIP_PATH}"
echo " ${REPO_DIR}/$(basename "${PLUGIN_ZIP}")"

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""Generate a JSON manifest for addon plugins."""
from __future__ import annotations
import importlib.util
import inspect
import json
import sys
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[1]
PLUGIN_DIR = ROOT_DIR / "addon" / "plugins"
OUTPUT_PATH = ROOT_DIR / "docs" / "PLUGIN_MANIFEST.json"
sys.path.insert(0, str(ROOT_DIR / "addon"))
try:
from plugin_interface import BasisPlugin # type: ignore
except Exception as exc: # pragma: no cover
raise SystemExit(f"Failed to import BasisPlugin: {exc}")
def _import_module(path: Path):
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise ImportError(f"Missing spec for {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def _collect_plugins():
plugins = []
for file_path in sorted(PLUGIN_DIR.glob("*.py")):
if file_path.name.startswith("_"):
continue
entry = {
"file": str(file_path.relative_to(ROOT_DIR)),
"module": file_path.stem,
"name": None,
"class": None,
"version": None,
"capabilities": [],
"prefer_source_metadata": False,
"base_url_setting": None,
"available": None,
"unavailable_reason": None,
"error": None,
}
try:
module = _import_module(file_path)
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
classes = [preferred]
else:
classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
classes.sort(key=lambda cls: cls.__name__.casefold())
if not classes:
entry["error"] = "No plugin classes found"
plugins.append(entry)
continue
cls = classes[0]
instance = cls()
entry["class"] = cls.__name__
entry["name"] = str(getattr(instance, "name", "") or "") or None
entry["version"] = str(getattr(instance, "version", "0.0.0") or "0.0.0")
entry["prefer_source_metadata"] = bool(getattr(instance, "prefer_source_metadata", False))
entry["available"] = bool(getattr(instance, "is_available", True))
entry["unavailable_reason"] = getattr(instance, "unavailable_reason", None)
try:
caps = instance.capabilities() # type: ignore[call-arg]
entry["capabilities"] = sorted([str(c) for c in caps]) if caps else []
except Exception:
entry["capabilities"] = []
entry["base_url_setting"] = getattr(module, "SETTING_BASE_URL", None)
except Exception as exc: # pragma: no cover
entry["error"] = str(exc)
plugins.append(entry)
plugins.sort(key=lambda item: (item.get("name") or item["module"]).casefold())
return plugins
def main() -> int:
if not PLUGIN_DIR.exists():
raise SystemExit("Plugin directory missing")
manifest = {
"schema_version": 1,
"plugins": _collect_plugins(),
}
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
print(str(OUTPUT_PATH))
return 0
if __name__ == "__main__":
raise SystemExit(main())

193
scripts/publish_gitea_release.sh Executable file
View File

@@ -0,0 +1,193 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ADDON_XML="${ROOT_DIR}/addon/addon.xml"
DEFAULT_NOTES="Automatischer Release-Upload aus ViewIT Build."
TAG=""
ASSET_PATH=""
TITLE=""
NOTES="${DEFAULT_NOTES}"
DRY_RUN="0"
while [[ $# -gt 0 ]]; do
case "$1" in
--tag)
TAG="${2:-}"
shift 2
;;
--asset)
ASSET_PATH="${2:-}"
shift 2
;;
--title)
TITLE="${2:-}"
shift 2
;;
--notes)
NOTES="${2:-}"
shift 2
;;
--dry-run)
DRY_RUN="1"
shift
;;
*)
echo "Unbekanntes Argument: $1" >&2
exit 1
;;
esac
done
if [[ ! -f "${ADDON_XML}" ]]; then
echo "Missing: ${ADDON_XML}" >&2
exit 1
fi
read -r ADDON_ID ADDON_VERSION < <(python3 - "${ADDON_XML}" <<'PY'
import sys
import xml.etree.ElementTree as ET
root = ET.parse(sys.argv[1]).getroot()
print(root.attrib.get("id", "plugin.video.viewit"), root.attrib.get("version", "0.0.0"))
PY
)
if [[ -z "${TAG}" ]]; then
TAG="v${ADDON_VERSION}"
fi
if [[ -z "${ASSET_PATH}" ]]; then
ASSET_PATH="${ROOT_DIR}/dist/${ADDON_ID}-${ADDON_VERSION}.zip"
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset nicht gefunden, baue ZIP: ${ASSET_PATH}"
"${ROOT_DIR}/scripts/build_kodi_zip.sh" >/dev/null
fi
if [[ ! -f "${ASSET_PATH}" ]]; then
echo "Asset fehlt nach Build: ${ASSET_PATH}" >&2
exit 1
fi
if [[ -z "${TITLE}" ]]; then
TITLE="ViewIT ${TAG}"
fi
REMOTE_URL="$(git -C "${ROOT_DIR}" remote get-url origin)"
read -r BASE_URL OWNER REPO < <(python3 - "${REMOTE_URL}" <<'PY'
import re
import sys
u = sys.argv[1].strip()
m = re.match(r"^https?://([^/]+)/([^/]+)/([^/.]+)(?:\.git)?/?$", u)
if not m:
raise SystemExit("Origin-URL muss https://host/owner/repo(.git) sein.")
host, owner, repo = m.group(1), m.group(2), m.group(3)
print(f"https://{host}", owner, repo)
PY
)
API_BASE="${BASE_URL}/api/v1/repos/${OWNER}/${REPO}"
ASSET_NAME="$(basename "${ASSET_PATH}")"
if [[ "${DRY_RUN}" == "1" ]]; then
echo "[DRY-RUN] API: ${API_BASE}"
echo "[DRY-RUN] Tag: ${TAG}"
echo "[DRY-RUN] Asset: ${ASSET_PATH}"
exit 0
fi
if [[ -z "${GITEA_TOKEN:-}" ]]; then
echo "Bitte GITEA_TOKEN setzen." >&2
exit 1
fi
tmp_json="$(mktemp)"
tmp_http="$(mktemp)"
trap 'rm -f "${tmp_json}" "${tmp_http}"' EXIT
urlenc() {
python3 - "$1" <<'PY'
import sys
from urllib.parse import quote
print(quote(sys.argv[1], safe=""))
PY
}
tag_enc="$(urlenc "${TAG}")"
auth_header="Authorization: token ${GITEA_TOKEN}"
http_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/tags/${tag_enc}")"
if [[ "${http_code}" == "200" ]]; then
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
elif [[ "${http_code}" == "404" ]]; then
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" <<'PY'
import json,sys
print(json.dumps({
"tag_name": sys.argv[1],
"name": sys.argv[2],
"body": sys.argv[3],
"draft": False,
"prerelease": False
}))
PY
)"
http_code_create="$(curl -sS -X POST -H "${auth_header}" -H "Content-Type: application/json" -d "${payload}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases")"
if [[ "${http_code_create}" != "201" ]]; then
echo "Release konnte nicht erstellt werden (HTTP ${http_code_create})." >&2
cat "${tmp_json}" >&2
exit 1
fi
RELEASE_ID="$(python3 - "${tmp_json}" <<'PY'
import json,sys
print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
else
echo "Release-Abfrage fehlgeschlagen (HTTP ${http_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
assets_code="$(curl -sS -H "${auth_header}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets")"
if [[ "${assets_code}" == "200" ]]; then
EXISTING_ASSET_ID="$(python3 - "${tmp_json}" "${ASSET_NAME}" <<'PY'
import json,sys
assets=json.load(open(sys.argv[1], encoding="utf-8"))
name=sys.argv[2]
for a in assets:
if a.get("name")==name:
print(a.get("id"))
break
PY
)"
if [[ -n "${EXISTING_ASSET_ID}" ]]; then
del_code="$(curl -sS -X DELETE -H "${auth_header}" -o "${tmp_http}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets/${EXISTING_ASSET_ID}")"
if [[ "${del_code}" != "204" ]]; then
echo "Altes Asset konnte nicht geloescht werden (HTTP ${del_code})." >&2
cat "${tmp_http}" >&2
exit 1
fi
fi
fi
asset_name_enc="$(urlenc "${ASSET_NAME}")"
upload_code="$(curl -sS -X POST -H "${auth_header}" -F "attachment=@${ASSET_PATH}" -o "${tmp_json}" -w "%{http_code}" "${API_BASE}/releases/${RELEASE_ID}/assets?name=${asset_name_enc}")"
if [[ "${upload_code}" != "201" ]]; then
echo "Asset-Upload fehlgeschlagen (HTTP ${upload_code})." >&2
cat "${tmp_json}" >&2
exit 1
fi
echo "Release-Asset hochgeladen:"
echo " Repo: ${OWNER}/${REPO}"
echo " Tag: ${TAG}"
echo " Asset: ${ASSET_NAME}"
echo " URL: ${BASE_URL}/${OWNER}/${REPO}/releases/tag/${TAG}"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
DIST_DIR="${ROOT_DIR}/dist"
HOST="${HOST:-127.0.0.1}"
PORT="${PORT:-8080}"
if [[ ! -f "${DIST_DIR}/repo/addons.xml" ]]; then
echo "Missing ${DIST_DIR}/repo/addons.xml" >&2
echo "Run ./scripts/build_local_kodi_repo.sh first." >&2
exit 1
fi
echo "Serving local Kodi repo from ${DIST_DIR}"
echo "Repository URL: http://${HOST}:${PORT}/repo/addons.xml"
(cd "${DIST_DIR}" && python3 -m http.server "${PORT}" --bind "${HOST}")

73
scripts/zip_deterministic.py Executable file
View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""Create deterministic zip archives.
Usage:
zip_deterministic.py <zip_path> <root_dir>
The archive will include the root directory itself and all files under it.
"""
from __future__ import annotations
import os
import sys
import time
import zipfile
from pathlib import Path
def _timestamp() -> tuple[int, int, int, int, int, int]:
epoch = os.environ.get("SOURCE_DATE_EPOCH")
if epoch:
try:
value = int(epoch)
return time.gmtime(value)[:6]
except Exception:
pass
return (2000, 1, 1, 0, 0, 0)
def _iter_files(root: Path):
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = sorted([d for d in dirnames if d != "__pycache__"])
for filename in sorted(filenames):
if filename.endswith(".pyc"):
continue
yield Path(dirpath) / filename
def _add_file(zf: zipfile.ZipFile, file_path: Path, arcname: str) -> None:
info = zipfile.ZipInfo(arcname, date_time=_timestamp())
info.compress_type = zipfile.ZIP_DEFLATED
info.external_attr = (0o644 & 0xFFFF) << 16
with file_path.open("rb") as handle:
data = handle.read()
zf.writestr(info, data, compress_type=zipfile.ZIP_DEFLATED)
def main() -> int:
if len(sys.argv) != 3:
print("Usage: zip_deterministic.py <zip_path> <root_dir>")
return 2
zip_path = Path(sys.argv[1]).resolve()
root = Path(sys.argv[2]).resolve()
if not root.exists() or not root.is_dir():
print(f"Missing root dir: {root}")
return 2
base = root.parent
zip_path.parent.mkdir(parents=True, exist_ok=True)
if zip_path.exists():
zip_path.unlink()
with zipfile.ZipFile(zip_path, "w") as zf:
for file_path in sorted(_iter_files(root)):
arcname = str(file_path.relative_to(base)).replace(os.sep, "/")
_add_file(zf, file_path, arcname)
print(str(zip_path))
return 0
if __name__ == "__main__":
raise SystemExit(main())