Files
ViewIT/addon/plugins/netzkino_plugin.py

255 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Netzkino plugin for ViewIT.

Uses the public Netzkino JSON API.
Movies only, no series. Direct MP4 streams - no ResolveURL needed.
Legal and free of charge.
"""
from __future__ import annotations
from typing import Any, Callable, List, Optional
try: # pragma: no cover
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Timeout passed to every HTTP request issued by this plugin.
DEFAULT_TIMEOUT = 20

# Browser-like request headers sent with every API call.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
}

# API root and the base that "<streaming id>.mp4" is appended to.
_API_BASE = "https://api.netzkino.de.simplecache.net/capi-2.0a"
_STREAM_BASE = "https://pmd.netzkino-seite.netzkino.de"

# URL templates; fill the placeholder with str.format().
_URL_SEARCH = _API_BASE + "/search?q={query}&d=www&l=de-DE"
_URL_CATEGORY = _API_BASE + "/categories/{slug}.json?d=www&l=de-DE"

# Category slug -> display name shown to the user.
CATEGORIES: dict[str, str] = {
    "highlights": "Highlights",
    "neue-filme": "Neue Filme",
    "alle-filme": "Alle Filme",
    "action": "Action",
    "animation": "Animation",
    "dokumentarfilm": "Dokumentation",
    "drama": "Drama",
    "fantasy": "Fantasy",
    "horror": "Horror",
    "komodie": "Komödie",
    "krimi-thriller": "Krimi & Thriller",
    "romantik": "Romantik",
    "sci-fi": "Science-Fiction",
}

# Optional progress hook: called with a message and an optional int value.
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Plugin class
# ---------------------------------------------------------------------------
class NetzkinoPlugin(BasisPlugin):
    """Netzkino integration for ViewIT.

    Every title is a movie (no series). Stream links are direct MP4 URLs
    built from the streaming id found in each API post. Titles seen in
    search or browse results are cached on the instance so that stream
    links and metadata can be looked up by title afterwards.
    """

    # NOTE(review): spelled differently from the class/file name ("Netzkino").
    # Left unchanged because this string is visible at runtime.
    name = "NetzkKino"

    def __init__(self) -> None:
        # title -> direct MP4 URL
        self._title_to_stream: dict[str, str] = {}
        # title -> (plot, poster, fanart)
        self._title_meta: dict[str, tuple[str, str, str]] = {}

    # ------------------------------------------------------------------
    # Availability
    # ------------------------------------------------------------------
    @property
    def is_available(self) -> bool:
        """Whether the ``requests`` import at module load succeeded."""
        return REQUESTS_AVAILABLE

    @property
    def unavailable_reason(self) -> str:
        """Return '' when available, otherwise a message with the import error."""
        if REQUESTS_AVAILABLE:
            return ""
        return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}"

    # ------------------------------------------------------------------
    # HTTP
    # ------------------------------------------------------------------
    def _get_session(self):  # type: ignore[return]
        """Return the session registered under "netzkino" in the session pool."""
        # Imported lazily, as in the rest of this module's local imports.
        from http_session_pool import get_requests_session

        return get_requests_session("netzkino", headers=HEADERS)

    def _get_json(self, url: str) -> dict | list | None:
        """GET ``url`` and return the decoded JSON body.

        Returns ``None`` on any failure (requests missing, session pool
        unavailable, network error, non-2xx status, invalid JSON), so every
        caller gets the same best-effort behaviour.
        """
        if not REQUESTS_AVAILABLE:
            return None
        response = None
        try:
            # Session acquisition sits inside the try so that a failure here
            # is reported as "no data" instead of propagating to the UI.
            session = self._get_session()
            response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception:
            # Deliberate best effort: callers treat None as "no results".
            return None
        finally:
            if response is not None:
                try:
                    response.close()
                except Exception:
                    # Closing is cleanup only; a failure must not mask the result.
                    pass

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_value(value: Any) -> str:
        """Return the first entry of a list-like API field as a stripped string.

        A bare scalar is treated as a single entry, so a plain string is not
        indexed character by character. Returns '' for missing, empty or
        non-scalar entries.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        if not value or isinstance(value, (dict, list, tuple)):
            return ""
        return str(value).strip()

    def _build_stream_url(self, streaming_id: str) -> str:
        """Build the MP4 URL for a streaming id."""
        return f"{_STREAM_BASE}/{streaming_id}.mp4"

    def _cache_post(self, post: dict) -> str:
        """Cache one API post and return its title ('' means: skip it).

        A post is skipped when it has no title or no usable streaming id in
        ``custom_fields["Streaming"]``. On success the stream URL and the
        (plot, poster, fanart) tuple are stored under the title.
        """
        title = str(post.get("title") or "").strip()
        if not title:
            return ""

        # A non-dict value cannot carry the "Streaming" key, so skip the post.
        custom = post.get("custom_fields")
        if not isinstance(custom, dict):
            return ""

        streaming_id = self._first_value(custom.get("Streaming"))
        if not streaming_id:
            return ""
        self._title_to_stream[title] = self._build_stream_url(streaming_id)

        # Metadata: plot from "content", poster from "thumbnail",
        # fanart from the first "featured_img_all" entry.
        plot = str(post.get("content") or "").strip()
        poster = str(post.get("thumbnail") or "").strip()
        fanart = self._first_value(custom.get("featured_img_all"))
        self._title_meta[title] = (plot, poster, fanart)
        return title

    def _load_posts(self, url: str) -> List[str]:
        """Fetch ``url``, cache every usable post and return the titles in order."""
        data = self._get_json(url)
        if not isinstance(data, dict):
            return []
        titles: list[str] = []
        for post in data.get("posts") or []:
            if not isinstance(post, dict):
                continue
            cached_title = self._cache_post(post)
            if cached_title:
                titles.append(cached_title)
        return titles

    # ------------------------------------------------------------------
    # Required methods
    # ------------------------------------------------------------------
    async def search_titles(
        self, query: str, progress_callback: ProgressCallback = None
    ) -> List[str]:
        """Search the API for ``query`` and return the matching titles.

        Returns an empty list for an empty query or when requests is missing.
        ``progress_callback`` is accepted but not used here. The HTTP call
        inside is synchronous even though the method is a coroutine.
        """
        query = (query or "").strip()
        if not query or not REQUESTS_AVAILABLE:
            return []
        from urllib.parse import quote_plus

        url = _URL_SEARCH.format(query=quote_plus(query))
        return self._load_posts(url)

    def seasons_for(self, title: str) -> List[str]:
        """Return the single pseudo-season; this source only has movies."""
        return ["Film"]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return the movie itself as the only episode ([] for an empty title)."""
        title = (title or "").strip()
        if not title:
            return []
        return [title]

    # ------------------------------------------------------------------
    # Stream
    # ------------------------------------------------------------------
    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Return the cached MP4 URL for ``title``, or None if it was never cached."""
        title = (title or "").strip()
        return self._title_to_stream.get(title)

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Return ``link`` stripped (None if empty); direct MP4 URLs need no resolving."""
        link = (link or "").strip()
        return link if link else None

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------
    def metadata_for(
        self, title: str
    ) -> tuple[dict[str, str], dict[str, str], list | None]:
        """Return ``(info, art, None)`` for ``title`` from the cache.

        ``info`` always has "title" and gets "plot" when one is cached.
        ``art`` gets "thumb"/"poster" and "fanart"/"landscape" when cached.
        An empty title yields ``({}, {}, None)``.
        """
        title = (title or "").strip()
        if not title:
            return {}, {}, None
        info: dict[str, str] = {"title": title}
        art: dict[str, str] = {}
        cached = self._title_meta.get(title)
        if cached:
            plot, poster, fanart = cached
            if plot:
                info["plot"] = plot
            if poster:
                art["thumb"] = poster
                art["poster"] = poster
            if fanart:
                art["fanart"] = fanart
                art["landscape"] = fanart
        return info, art, None

    # ------------------------------------------------------------------
    # Browsing
    # ------------------------------------------------------------------
    def new_titles_page(self, page: int = 1) -> List[str]:
        """Return the titles of the "neue-filme" category.

        The category URL carries no page parameter, so the whole result is
        treated as page 1. Later pages are empty instead of repeating it.
        """
        if isinstance(page, int) and page > 1:
            return []
        url = _URL_CATEGORY.format(slug="neue-filme")
        return self._load_posts(url)

    def new_titles(self) -> List[str]:
        """Return the first page of new titles."""
        return self.new_titles_page(1)

    def genres(self) -> List[str]:
        """Return the category display names, sorted."""
        return sorted(CATEGORIES.values())

    def titles_for_genre(self, genre: str) -> List[str]:
        """Return the titles of the category whose display name is ``genre``.

        Unknown or empty names yield an empty list.
        """
        genre = (genre or "").strip()
        # Resolve the display name back to its API slug.
        slug = next(
            (key for key, label in CATEGORIES.items() if label == genre), ""
        )
        if not slug:
            return []
        url = _URL_CATEGORY.format(slug=slug)
        return self._load_posts(url)

    def capabilities(self) -> set[str]:
        """Return the optional browsing features this plugin supports."""
        return {"new_titles", "genres"}