Files
ViewIT/addon/plugins/youtube_plugin.py

271 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""YouTube Plugin fuer ViewIT.
Suche und Wiedergabe von YouTube-Videos via HTML-Scraping und yt-dlp.
Benoetigt script.module.yt-dlp (optional).
Video-Eintraege werden als "Titel||VIDEO_ID" kodiert.
"""
from __future__ import annotations
import json
import re
from typing import Any, Callable, Dict, List, Optional, Set
try:
import requests
except ImportError:
requests = None # type: ignore
from plugin_interface import BasisPlugin
from plugin_helpers import log_error
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
DEFAULT_TIMEOUT = 20
_SEP = "||" # Trennzeichen zwischen Titel und Video-ID
BASE_URL = "https://www.youtube.com"
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
),
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _encode(title: str, video_id: str) -> str:
return f"{title}{_SEP}{video_id}"
def _decode_id(entry: str) -> Optional[str]:
"""Extrahiert Video-ID aus einem kodierten Eintrag."""
if _SEP in entry:
return entry.split(_SEP, 1)[1].strip()
# Fallback: 11-Zeichen YouTube-ID am Ende
m = re.search(r"([A-Za-z0-9_-]{11})$", entry)
return m.group(1) if m else None
def _decode_title(entry: str) -> str:
if _SEP in entry:
return entry.split(_SEP, 1)[0].strip()
return entry
def _get_session() -> Any:
try:
from http_session_pool import get_requests_session
return get_requests_session("youtube", headers=HEADERS)
except Exception:
if requests:
s = requests.Session()
s.headers.update(HEADERS)
return s
return None
def _extract_yt_initial_data(html: str) -> Optional[dict]:
"""Extrahiert ytInitialData JSON aus dem HTML-Source."""
m = re.search(r"var ytInitialData\s*=\s*(\{.*?\});\s*(?:var |</script>)", html, re.DOTALL)
if not m:
# Alternativer Pattern
m = re.search(r"ytInitialData\s*=\s*(\{.+?\})\s*;", html, re.DOTALL)
if not m:
return None
try:
return json.loads(m.group(1))
except Exception:
return None
def _videos_from_search_data(data: dict) -> List[str]:
"""Extrahiert Video-Eintraege aus ytInitialData (Suchergebnisse)."""
results: List[str] = []
try:
contents = (
data
.get("contents", {})
.get("twoColumnSearchResultsRenderer", {})
.get("primaryContents", {})
.get("sectionListRenderer", {})
.get("contents", [])
)
for section in contents:
items = (
section
.get("itemSectionRenderer", {})
.get("contents", [])
)
for item in items:
vr = item.get("videoRenderer") or item.get("compactVideoRenderer")
if not vr:
continue
video_id = vr.get("videoId", "").strip()
if not video_id:
continue
title_runs = vr.get("title", {}).get("runs", [])
title = "".join(r.get("text", "") for r in title_runs).strip()
if not title:
title = vr.get("title", {}).get("simpleText", "").strip()
if title and video_id:
results.append(_encode(title, video_id))
except Exception as exc:
log_error(f"[YouTube] _videos_from_search_data Fehler: {exc}")
return results
def _search_with_ytdlp(query: str, count: int = 20) -> List[str]:
"""Sucht YouTube-Videos via yt-dlp ytsearch-Extraktor."""
try:
from yt_dlp import YoutubeDL # type: ignore
except ImportError:
return []
ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
try:
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(f"ytsearch{count}:{query}", download=False)
if not info:
return []
return [
_encode(e["title"], e["id"])
for e in (info.get("entries") or [])
if e.get("id") and e.get("title")
]
except Exception as exc:
log_error(f"[YouTube] yt-dlp Suche Fehler: {exc}")
return []
def _fetch_search_videos(url: str) -> List[str]:
"""Holt Videos von einer YouTube-Suche via ytInitialData."""
session = _get_session()
if session is None:
return []
try:
resp = session.get(url, timeout=DEFAULT_TIMEOUT)
resp.raise_for_status()
data = _extract_yt_initial_data(resp.text)
if not data:
return []
return _videos_from_search_data(data)
except Exception as exc:
log_error(f"[YouTube] _fetch_search_videos ({url}): {exc}")
return []
def _resolve_with_ytdlp(video_id: str) -> Optional[str]:
"""Loest Video-ID via yt-dlp zu direkter Stream-URL auf."""
try:
from yt_dlp import YoutubeDL # type: ignore
except ImportError:
log_error("[YouTube] yt-dlp nicht verfuegbar (script.module.yt-dlp fehlt)")
try:
import xbmcgui
xbmcgui.Dialog().notification(
"yt-dlp fehlt",
"Bitte yt-dlp in den ViewIT-Einstellungen installieren.",
xbmcgui.NOTIFICATION_ERROR,
5000,
)
except Exception:
pass
return None
url = f"https://www.youtube.com/watch?v={video_id}"
ydl_opts: Dict[str, Any] = {
"format": "best[ext=mp4]/best",
"quiet": True,
"no_warnings": True,
"extract_flat": False,
}
try:
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
if not info:
return None
# Einzelnes Video
direct = info.get("url")
if direct:
return direct
# Formatauswahl
formats = info.get("formats", [])
if formats:
return formats[-1].get("url")
except Exception as exc:
log_error(f"[YouTube] yt-dlp Fehler fuer {video_id}: {exc}")
return None
# ---------------------------------------------------------------------------
# Plugin
# ---------------------------------------------------------------------------
class YoutubePlugin(BasisPlugin):
name = "YouTube"
# Pseudo-Staffeln: nur Suche Browse-Endpunkte erfordern Login
_SEASONS = ["Suche"]
def capabilities(self) -> Set[str]:
return set()
async def search_titles(
self,
query: str,
progress_callback: ProgressCallback = None,
) -> List[str]:
if not query.strip():
return []
# Primär: yt-dlp (robust, kein HTML-Scraping)
results = _search_with_ytdlp(query)
if results:
return results
# Fallback: HTML-Scraping
if requests is None:
return []
url = f"{BASE_URL}/results?search_query={requests.utils.quote(query)}" # type: ignore
return _fetch_search_videos(url)
def seasons_for(self, title: str) -> List[str]:
return list(self._SEASONS)
def episodes_for(self, title: str, season: str) -> List[str]:
if season == "Suche":
# Titel ist bereits ein kodierter Eintrag aus der Suche
return [title]
return []
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
video_id = _decode_id(episode) or _decode_id(title)
if not video_id:
return None
return _resolve_with_ytdlp(video_id)
def resolve_stream_link(self, link: str) -> Optional[str]:
return link # bereits direkte URL
def metadata_for(self, title: str):
"""Thumbnail aus Video-ID ableiten."""
video_id = _decode_id(title)
clean_title = _decode_title(title)
info: Dict[str, str] = {"title": clean_title}
art: Dict[str, str] = {}
if video_id:
art["thumb"] = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
art["poster"] = f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg"
return info, art, None
Plugin = YoutubePlugin