Files
ViewIT/addon/plugins/kkiste_plugin.py

427 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""KKiste Plugin für ViewIT.
Nutzt die JSON-REST-API von kkiste.eu.
Filme und Serien mit TMDB-Thumbnails kein HTML-Scraping.
Serien-Besonderheit: Auf KKiste ist jede Staffel ein eigener Eintrag
(z.B. "Breaking Bad - Staffel 1"). Die Suche liefert alle passenden
Staffel-Einträge direkt.
"""
from __future__ import annotations
import re
from typing import Any, Callable, List, Optional
from urllib.parse import quote_plus
try: # pragma: no cover
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
DOMAIN = "kkiste.eu"
BASE_URL = "https://" + DOMAIN
DEFAULT_TIMEOUT = 20
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Referer": BASE_URL + "/",
"Origin": BASE_URL,
}
# Sprache: 2=Deutsch, 3=Englisch, all=alle
_LANG = "2"
_THUMB_BASE = "https://image.tmdb.org/t/p/w300"
_URL_BROWSE = BASE_URL + "/data/browse/?lang={lang}&type={type}&order_by={order}&page={page}"
_URL_SEARCH = BASE_URL + "/data/browse/?lang={lang}&order_by=new&page=1&limit=0"
_URL_GENRE = BASE_URL + "/data/browse/?lang={lang}&type=movies&order_by=Trending&genre={genre}&page=1"
_URL_WATCH = BASE_URL + "/data/watch/?_id={id}"
GENRE_SLUGS: dict[str, str] = {
"Action": "Action",
"Animation": "Animation",
"Biographie": "Biographie",
"Dokumentation": "Dokumentation",
"Drama": "Drama",
"Familie": "Familie",
"Fantasy": "Fantasy",
"Horror": "Horror",
"Komödie": "Komödie",
"Krimi": "Krimi",
"Mystery": "Mystery",
"Romantik": "Romantik",
"Science-Fiction": "Sci-Fi",
"Thriller": "Thriller",
"Western": "Western",
}
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Plugin-Klasse
# ---------------------------------------------------------------------------
class KKistePlugin(BasisPlugin):
"""KKiste Integration für ViewIT (kkiste.eu).
Jede Staffel einer Serie ist auf KKiste ein eigenständiger API-Eintrag.
"""
name = "KKiste"
def __init__(self) -> None:
# title → watch-URL (/data/watch/?_id=X)
self._title_to_watch_url: dict[str, str] = {}
# title → (plot, poster, fanart)
self._title_meta: dict[str, tuple[str, str, str]] = {}
# title → True wenn "Staffel"/"Season" im Titel
self._is_series: dict[str, bool] = {}
# title → Staffelnummer (aus "Staffel N" extrahiert)
self._season_nr: dict[str, int] = {}
# bevorzugte Hoster für Hoster-Dialog
self._preferred_hosters: list[str] = []
# ------------------------------------------------------------------
# Verfügbarkeit
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
if REQUESTS_AVAILABLE:
return ""
return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}"
# ------------------------------------------------------------------
# HTTP
# ------------------------------------------------------------------
def _get_session(self): # type: ignore[return]
from http_session_pool import get_requests_session
return get_requests_session("kkiste", headers=HEADERS)
def _get_json(self, url: str) -> dict | list | None:
session = self._get_session()
response = None
try:
response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return response.json()
except Exception:
return None
finally:
if response is not None:
try:
response.close()
except Exception:
pass
# ------------------------------------------------------------------
# Interne Hilfsmethoden
# ------------------------------------------------------------------
def _cache_entry(self, movie: dict) -> str:
"""Cached einen API-Eintrag und gibt den Titel zurück ('' = überspringen)."""
title = str(movie.get("title") or "").strip()
if not title or "_id" not in movie:
return ""
movie_id = str(movie["_id"])
self._title_to_watch_url[title] = _URL_WATCH.format(id=movie_id)
# Serie erkennen
is_series = "Staffel" in title or "Season" in title
self._is_series[title] = is_series
if is_series:
m = re.search(r"(?:Staffel|Season)\s*(\d+)", title, re.IGNORECASE)
if m:
self._season_nr[title] = int(m.group(1))
# Metadaten
poster = ""
for key in ("poster_path_season", "poster_path"):
if movie.get(key):
poster = _THUMB_BASE + str(movie[key])
break
fanart = _THUMB_BASE + str(movie["backdrop_path"]) if movie.get("backdrop_path") else ""
plot = str(movie.get("storyline") or movie.get("overview") or "")
self._title_meta[title] = (plot, poster, fanart)
return title
def _ensure_watch_url(self, title: str) -> str:
"""Gibt die Watch-URL zurück lädt bei leerem Cache alle Titel nach."""
url = self._title_to_watch_url.get(title, "")
if url:
return url
# Fallback: alle Titel laden und exact-match suchen
search_url = _URL_SEARCH.format(lang=_LANG)
data = self._get_json(search_url)
if isinstance(data, dict):
q_lower = title.lower()
for movie in (data.get("movies") or []):
if isinstance(movie, dict):
raw = str(movie.get("title") or "").strip()
if raw.lower() == q_lower:
self._cache_entry(movie)
return self._title_to_watch_url.get(title, "")
return ""
def _browse(self, content_type: str, order: str = "Trending") -> List[str]:
url = _URL_BROWSE.format(lang=_LANG, type=content_type, order=order, page=1)
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def _hosters_for(self, title: str, season: str, episode: str) -> dict[str, str]:
"""Gibt {Hoster-Name → URL} für Titel/Staffel/Episode zurück."""
watch_url = self._ensure_watch_url(title)
if not watch_url:
return {}
data = self._get_json(watch_url)
if not isinstance(data, dict):
return {}
streams = data.get("streams") or []
hosters: dict[str, str] = {}
seen: set[str] = set()
# Film vs Serie: relevante Streams filtern
if season == "Film":
target_streams = [s for s in streams if isinstance(s, dict)]
else:
m = re.search(r"\d+", episode or "")
ep_nr = int(m.group()) if m else None
if ep_nr is None:
return {}
target_streams = [
s for s in streams
if isinstance(s, dict) and s.get("e") == ep_nr
]
for stream in target_streams:
src = str(stream.get("stream") or "").strip()
if not src:
continue
# Hoster-Name aus der Stream-URL extrahieren (nicht aus "source" das ist die Aggregator-Quelle)
try:
from urllib.parse import urlparse
host = urlparse(src).hostname or "Hoster"
# Domain-Prefix entfernen (www.)
if host.startswith("www."):
host = host[4:]
except Exception:
host = "Hoster"
name = host
base_name = name
i = 2
while name in seen:
name = f"{base_name} {i}"
i += 1
seen.add(name)
hosters[name] = src
return hosters
# ------------------------------------------------------------------
# Pflicht-Methoden
# ------------------------------------------------------------------
async def search_titles(
self, query: str, progress_callback: ProgressCallback = None
) -> List[str]:
query = (query or "").strip()
if not query or not REQUESTS_AVAILABLE:
return []
# KKiste: limit=0 lädt alle Titel, client-seitige Filterung
url = _URL_SEARCH.format(lang=_LANG)
data = self._get_json(url)
if not isinstance(data, dict):
return []
q_lower = query.lower()
titles: list[str] = []
for movie in (data.get("movies") or []):
if not isinstance(movie, dict) or "_id" not in movie:
continue
raw_title = str(movie.get("title") or "").strip()
if not raw_title or q_lower not in raw_title.lower():
continue
t = self._cache_entry(movie)
if t:
titles.append(t)
return titles
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
is_series = self._is_series.get(title)
if is_series is None:
# Cache leer (neue Instanz) nachfüllen
self._ensure_watch_url(title)
is_series = self._is_series.get(title)
if is_series:
season_nr = self._season_nr.get(title, 1)
return [f"Staffel {season_nr}"]
return ["Film"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
if season == "Film":
return [title]
# Serie: Episodenliste aus /data/watch/ laden
watch_url = self._ensure_watch_url(title)
if not watch_url:
return []
data = self._get_json(watch_url)
if not isinstance(data, dict):
return []
episode_nrs: set[int] = set()
for stream in (data.get("streams") or []):
if not isinstance(stream, dict):
continue
e = stream.get("e")
if e is not None:
try:
episode_nrs.add(int(e))
except (ValueError, TypeError):
pass
if not episode_nrs:
return [title]
return [f"Episode {nr}" for nr in sorted(episode_nrs)]
# ------------------------------------------------------------------
# Stream
# ------------------------------------------------------------------
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
return list(self._hosters_for(title, season, episode).keys())
def set_preferred_hosters(self, hosters: List[str]) -> None:
self._preferred_hosters = [h for h in hosters if h]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
hosters = self._hosters_for(title, season, episode)
if not hosters:
return None
# Bevorzugten Hoster nutzen falls gesetzt
for preferred in self._preferred_hosters:
key = preferred.casefold()
for name, url in hosters.items():
if key in name.casefold() or key in url.casefold():
return url
# Fallback: erster Hoster
return next(iter(hosters.values()))
def resolve_stream_link(self, link: str) -> Optional[str]:
link = (link or "").strip()
if not link:
return None
try:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=False)
except Exception:
return None
# ------------------------------------------------------------------
# Metadaten
# ------------------------------------------------------------------
def metadata_for(
self, title: str
) -> tuple[dict[str, str], dict[str, str], list | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
info: dict[str, str] = {"title": title}
art: dict[str, str] = {}
cached = self._title_meta.get(title)
if cached:
plot, poster, fanart = cached
if plot:
info["plot"] = plot
if poster:
art["thumb"] = poster
art["poster"] = poster
if fanart:
art["fanart"] = fanart
art["landscape"] = fanart
return info, art, None
# ------------------------------------------------------------------
# Browsing
# ------------------------------------------------------------------
def new_titles(self) -> List[str]:
return self._browse("movies", "new")
def new_titles_page(self, page: int = 1) -> List[str]:
page = max(1, int(page or 1))
url = _URL_BROWSE.format(lang=_LANG, type="movies", order="new", page=page)
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def popular_series(self) -> List[str]:
return self._browse("tvseries", "views")
def genres(self) -> List[str]:
return sorted(GENRE_SLUGS.keys())
def titles_for_genre(self, genre: str) -> List[str]:
slug = GENRE_SLUGS.get(genre, "")
if not slug:
return []
url = _URL_GENRE.format(lang=_LANG, genre=quote_plus(slug))
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def capabilities(self) -> set[str]:
return {"popular_series", "new_titles", "genres"}