"""KKiste Plugin für ViewIT. Nutzt die JSON-REST-API von kkiste.eu. Filme und Serien mit TMDB-Thumbnails – kein HTML-Scraping. Serien-Besonderheit: Auf KKiste ist jede Staffel ein eigener Eintrag (z.B. "Breaking Bad - Staffel 1"). Die Suche liefert alle passenden Staffel-Einträge direkt. """ from __future__ import annotations import re from typing import Any, Callable, List, Optional from urllib.parse import quote_plus try: # pragma: no cover import requests except ImportError as exc: # pragma: no cover requests = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None from plugin_interface import BasisPlugin # --------------------------------------------------------------------------- # Konstanten # --------------------------------------------------------------------------- DOMAIN = "kkiste.eu" BASE_URL = "https://" + DOMAIN DEFAULT_TIMEOUT = 20 HEADERS = { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Referer": BASE_URL + "/", "Origin": BASE_URL, } # Sprache: 2=Deutsch, 3=Englisch, all=alle _LANG = "2" _THUMB_BASE = "https://image.tmdb.org/t/p/w300" _URL_BROWSE = BASE_URL + "/data/browse/?lang={lang}&type={type}&order_by={order}&page={page}" _URL_SEARCH = BASE_URL + "/data/browse/?lang={lang}&order_by=new&page=1&limit=0" _URL_GENRE = BASE_URL + "/data/browse/?lang={lang}&type=movies&order_by=Trending&genre={genre}&page=1" _URL_WATCH = BASE_URL + "/data/watch/?_id={id}" GENRE_SLUGS: dict[str, str] = { "Action": "Action", "Animation": "Animation", "Biographie": "Biographie", "Dokumentation": "Dokumentation", "Drama": "Drama", "Familie": "Familie", "Fantasy": "Fantasy", "Horror": "Horror", "Komödie": "Komödie", "Krimi": "Krimi", "Mystery": "Mystery", "Romantik": "Romantik", "Science-Fiction": "Sci-Fi", "Thriller": "Thriller", "Western": "Western", } ProgressCallback = Optional[Callable[[str, Optional[int]], Any]] # --------------------------------------------------------------------------- # Plugin-Klasse # --------------------------------------------------------------------------- class KKistePlugin(BasisPlugin): """KKiste Integration für ViewIT (kkiste.eu). Jede Staffel einer Serie ist auf KKiste ein eigenständiger API-Eintrag. """ name = "KKiste" def __init__(self) -> None: # title → watch-URL (/data/watch/?_id=X) self._title_to_watch_url: dict[str, str] = {} # title → (plot, poster, fanart) self._title_meta: dict[str, tuple[str, str, str]] = {} # title → True wenn "Staffel"/"Season" im Titel self._is_series: dict[str, bool] = {} # title → Staffelnummer (aus "Staffel N" extrahiert) self._season_nr: dict[str, int] = {} # bevorzugte Hoster für Hoster-Dialog self._preferred_hosters: list[str] = [] # ------------------------------------------------------------------ # Verfügbarkeit # ------------------------------------------------------------------ @property def is_available(self) -> bool: return REQUESTS_AVAILABLE @property def unavailable_reason(self) -> str: if REQUESTS_AVAILABLE: return "" return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}" # ------------------------------------------------------------------ # HTTP # ------------------------------------------------------------------ def _get_session(self): # type: ignore[return] from http_session_pool import get_requests_session return get_requests_session("kkiste", headers=HEADERS) def _get_json(self, url: str) -> dict | list | None: session = self._get_session() response = None try: response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() return response.json() except Exception: return None finally: if response is not None: try: response.close() except Exception: pass # ------------------------------------------------------------------ # Interne Hilfsmethoden # ------------------------------------------------------------------ def _cache_entry(self, movie: dict) -> str: """Cached einen API-Eintrag und gibt den Titel zurück ('' = überspringen).""" title = str(movie.get("title") or "").strip() if not title or "_id" not in movie: return "" movie_id = str(movie["_id"]) self._title_to_watch_url[title] = _URL_WATCH.format(id=movie_id) # Serie erkennen is_series = "Staffel" in title or "Season" in title self._is_series[title] = is_series if is_series: m = re.search(r"(?:Staffel|Season)\s*(\d+)", title, re.IGNORECASE) if m: self._season_nr[title] = int(m.group(1)) # Metadaten poster = "" for key in ("poster_path_season", "poster_path"): if movie.get(key): poster = _THUMB_BASE + str(movie[key]) break fanart = _THUMB_BASE + str(movie["backdrop_path"]) if movie.get("backdrop_path") else "" plot = str(movie.get("storyline") or movie.get("overview") or "") self._title_meta[title] = (plot, poster, fanart) return title def _ensure_watch_url(self, title: str) -> str: """Gibt die Watch-URL zurück – lädt bei leerem Cache alle Titel nach.""" url = self._title_to_watch_url.get(title, "") if url: return url # Fallback: alle Titel laden und exact-match suchen search_url = _URL_SEARCH.format(lang=_LANG) data = self._get_json(search_url) if isinstance(data, dict): q_lower = title.lower() for movie in (data.get("movies") or []): if isinstance(movie, dict): raw = str(movie.get("title") or "").strip() if raw.lower() == q_lower: self._cache_entry(movie) return self._title_to_watch_url.get(title, "") return "" def _browse(self, content_type: str, order: str = "Trending") -> List[str]: url = _URL_BROWSE.format(lang=_LANG, type=content_type, order=order, page=1) data = self._get_json(url) if not isinstance(data, dict): return [] return [ t for movie in (data.get("movies") or []) if isinstance(movie, dict) and (t := self._cache_entry(movie)) ] def _hosters_for(self, title: str, season: str, episode: str) -> dict[str, str]: """Gibt {Hoster-Name → URL} für Titel/Staffel/Episode zurück.""" watch_url = self._ensure_watch_url(title) if not watch_url: return {} data = self._get_json(watch_url) if not isinstance(data, dict): return {} streams = data.get("streams") or [] hosters: dict[str, str] = {} seen: set[str] = set() # Film vs Serie: relevante Streams filtern if season == "Film": target_streams = [s for s in streams if isinstance(s, dict)] else: m = re.search(r"\d+", episode or "") ep_nr = int(m.group()) if m else None if ep_nr is None: return {} target_streams = [ s for s in streams if isinstance(s, dict) and s.get("e") == ep_nr ] for stream in target_streams: src = str(stream.get("stream") or "").strip() if not src: continue # Hoster-Name aus der Stream-URL extrahieren (nicht aus "source" – das ist die Aggregator-Quelle) try: from urllib.parse import urlparse host = urlparse(src).hostname or "Hoster" # Domain-Prefix entfernen (www.) if host.startswith("www."): host = host[4:] except Exception: host = "Hoster" name = host base_name = name i = 2 while name in seen: name = f"{base_name} {i}" i += 1 seen.add(name) hosters[name] = src return hosters # ------------------------------------------------------------------ # Pflicht-Methoden # ------------------------------------------------------------------ async def search_titles( self, query: str, progress_callback: ProgressCallback = None ) -> List[str]: query = (query or "").strip() if not query or not REQUESTS_AVAILABLE: return [] # KKiste: limit=0 lädt alle Titel, client-seitige Filterung url = _URL_SEARCH.format(lang=_LANG) data = self._get_json(url) if not isinstance(data, dict): return [] q_lower = query.lower() titles: list[str] = [] for movie in (data.get("movies") or []): if not isinstance(movie, dict) or "_id" not in movie: continue raw_title = str(movie.get("title") or "").strip() if not raw_title or q_lower not in raw_title.lower(): continue t = self._cache_entry(movie) if t: titles.append(t) return titles def seasons_for(self, title: str) -> List[str]: title = (title or "").strip() if not title: return [] is_series = self._is_series.get(title) if is_series is None: # Cache leer (neue Instanz) – nachfüllen self._ensure_watch_url(title) is_series = self._is_series.get(title) if is_series: season_nr = self._season_nr.get(title, 1) return [f"Staffel {season_nr}"] return ["Film"] def episodes_for(self, title: str, season: str) -> List[str]: title = (title or "").strip() if not title: return [] if season == "Film": return [title] # Serie: Episodenliste aus /data/watch/ laden watch_url = self._ensure_watch_url(title) if not watch_url: return [] data = self._get_json(watch_url) if not isinstance(data, dict): return [] episode_nrs: set[int] = set() for stream in (data.get("streams") or []): if not isinstance(stream, dict): continue e = stream.get("e") if e is not None: try: episode_nrs.add(int(e)) except (ValueError, TypeError): pass if not episode_nrs: return [title] return [f"Episode {nr}" for nr in sorted(episode_nrs)] # ------------------------------------------------------------------ # Stream # ------------------------------------------------------------------ def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]: return list(self._hosters_for(title, season, episode).keys()) def set_preferred_hosters(self, hosters: List[str]) -> None: self._preferred_hosters = [h for h in hosters if h] def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: title = (title or "").strip() hosters = self._hosters_for(title, season, episode) if not hosters: return None # Bevorzugten Hoster nutzen falls gesetzt for preferred in self._preferred_hosters: key = preferred.casefold() for name, url in hosters.items(): if key in name.casefold() or key in url.casefold(): return url # Fallback: erster Hoster return next(iter(hosters.values())) def resolve_stream_link(self, link: str) -> Optional[str]: link = (link or "").strip() if not link: return None try: from plugin_helpers import resolve_via_resolveurl return resolve_via_resolveurl(link, fallback_to_link=False) except Exception: return None # ------------------------------------------------------------------ # Metadaten # ------------------------------------------------------------------ def metadata_for( self, title: str ) -> tuple[dict[str, str], dict[str, str], list | None]: title = (title or "").strip() if not title: return {}, {}, None info: dict[str, str] = {"title": title} art: dict[str, str] = {} cached = self._title_meta.get(title) if cached: plot, poster, fanart = cached if plot: info["plot"] = plot if poster: art["thumb"] = poster art["poster"] = poster if fanart: art["fanart"] = fanart art["landscape"] = fanart return info, art, None # ------------------------------------------------------------------ # Browsing # ------------------------------------------------------------------ def new_titles(self) -> List[str]: return self._browse("movies", "new") def new_titles_page(self, page: int = 1) -> List[str]: page = max(1, int(page or 1)) url = _URL_BROWSE.format(lang=_LANG, type="movies", order="new", page=page) data = self._get_json(url) if not isinstance(data, dict): return [] return [ t for movie in (data.get("movies") or []) if isinstance(movie, dict) and (t := self._cache_entry(movie)) ] def popular_series(self) -> List[str]: return self._browse("tvseries", "views") def genres(self) -> List[str]: return sorted(GENRE_SLUGS.keys()) def titles_for_genre(self, genre: str) -> List[str]: slug = GENRE_SLUGS.get(genre, "") if not slug: return [] url = _URL_GENRE.format(lang=_LANG, genre=quote_plus(slug)) data = self._get_json(url) if not isinstance(data, dict): return [] return [ t for movie in (data.get("movies") or []) if isinstance(movie, dict) and (t := self._cache_entry(movie)) ] def capabilities(self) -> set[str]: return {"popular_series", "new_titles", "genres"}