dev: YouTube-Plugin: yt-dlp Suche, Bug-Fix Any-Import
This commit is contained in:
271
addon/plugins/youtube_plugin.py
Normal file
271
addon/plugins/youtube_plugin.py
Normal file
@@ -0,0 +1,271 @@
|
||||
"""YouTube Plugin fuer ViewIT.
|
||||
|
||||
Suche und Wiedergabe von YouTube-Videos via HTML-Scraping und yt-dlp.
|
||||
Benoetigt script.module.yt-dlp (optional).
|
||||
|
||||
Video-Eintraege werden als "Titel||VIDEO_ID" kodiert.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from typing import Any, Callable, Dict, List, Optional, Set
|
||||
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
requests = None # type: ignore
|
||||
|
||||
from plugin_interface import BasisPlugin
|
||||
from plugin_helpers import log_error
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Konstanten
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_TIMEOUT = 20
|
||||
_SEP = "||" # Trennzeichen zwischen Titel und Video-ID
|
||||
|
||||
BASE_URL = "https://www.youtube.com"
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||||
),
|
||||
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
}
|
||||
|
||||
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hilfsfunktionen
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _encode(title: str, video_id: str) -> str:
|
||||
return f"{title}{_SEP}{video_id}"
|
||||
|
||||
|
||||
def _decode_id(entry: str) -> Optional[str]:
|
||||
"""Extrahiert Video-ID aus einem kodierten Eintrag."""
|
||||
if _SEP in entry:
|
||||
return entry.split(_SEP, 1)[1].strip()
|
||||
# Fallback: 11-Zeichen YouTube-ID am Ende
|
||||
m = re.search(r"([A-Za-z0-9_-]{11})$", entry)
|
||||
return m.group(1) if m else None
|
||||
|
||||
|
||||
def _decode_title(entry: str) -> str:
|
||||
if _SEP in entry:
|
||||
return entry.split(_SEP, 1)[0].strip()
|
||||
return entry
|
||||
|
||||
|
||||
def _get_session() -> Any:
|
||||
try:
|
||||
from http_session_pool import get_requests_session
|
||||
return get_requests_session("youtube", headers=HEADERS)
|
||||
except Exception:
|
||||
if requests:
|
||||
s = requests.Session()
|
||||
s.headers.update(HEADERS)
|
||||
return s
|
||||
return None
|
||||
|
||||
|
||||
def _extract_yt_initial_data(html: str) -> Optional[dict]:
|
||||
"""Extrahiert ytInitialData JSON aus dem HTML-Source."""
|
||||
m = re.search(r"var ytInitialData\s*=\s*(\{.*?\});\s*(?:var |</script>)", html, re.DOTALL)
|
||||
if not m:
|
||||
# Alternativer Pattern
|
||||
m = re.search(r"ytInitialData\s*=\s*(\{.+?\})\s*;", html, re.DOTALL)
|
||||
if not m:
|
||||
return None
|
||||
try:
|
||||
return json.loads(m.group(1))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _videos_from_search_data(data: dict) -> List[str]:
|
||||
"""Extrahiert Video-Eintraege aus ytInitialData (Suchergebnisse)."""
|
||||
results: List[str] = []
|
||||
try:
|
||||
contents = (
|
||||
data
|
||||
.get("contents", {})
|
||||
.get("twoColumnSearchResultsRenderer", {})
|
||||
.get("primaryContents", {})
|
||||
.get("sectionListRenderer", {})
|
||||
.get("contents", [])
|
||||
)
|
||||
for section in contents:
|
||||
items = (
|
||||
section
|
||||
.get("itemSectionRenderer", {})
|
||||
.get("contents", [])
|
||||
)
|
||||
for item in items:
|
||||
vr = item.get("videoRenderer") or item.get("compactVideoRenderer")
|
||||
if not vr:
|
||||
continue
|
||||
video_id = vr.get("videoId", "").strip()
|
||||
if not video_id:
|
||||
continue
|
||||
title_runs = vr.get("title", {}).get("runs", [])
|
||||
title = "".join(r.get("text", "") for r in title_runs).strip()
|
||||
if not title:
|
||||
title = vr.get("title", {}).get("simpleText", "").strip()
|
||||
if title and video_id:
|
||||
results.append(_encode(title, video_id))
|
||||
except Exception as exc:
|
||||
log_error(f"[YouTube] _videos_from_search_data Fehler: {exc}")
|
||||
return results
|
||||
|
||||
|
||||
|
||||
def _search_with_ytdlp(query: str, count: int = 20) -> List[str]:
|
||||
"""Sucht YouTube-Videos via yt-dlp ytsearch-Extraktor."""
|
||||
try:
|
||||
from yt_dlp import YoutubeDL # type: ignore
|
||||
except ImportError:
|
||||
return []
|
||||
ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
|
||||
try:
|
||||
with YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(f"ytsearch{count}:{query}", download=False)
|
||||
if not info:
|
||||
return []
|
||||
return [
|
||||
_encode(e["title"], e["id"])
|
||||
for e in (info.get("entries") or [])
|
||||
if e.get("id") and e.get("title")
|
||||
]
|
||||
except Exception as exc:
|
||||
log_error(f"[YouTube] yt-dlp Suche Fehler: {exc}")
|
||||
return []
|
||||
|
||||
|
||||
def _fetch_search_videos(url: str) -> List[str]:
|
||||
"""Holt Videos von einer YouTube-Suche via ytInitialData."""
|
||||
session = _get_session()
|
||||
if session is None:
|
||||
return []
|
||||
try:
|
||||
resp = session.get(url, timeout=DEFAULT_TIMEOUT)
|
||||
resp.raise_for_status()
|
||||
data = _extract_yt_initial_data(resp.text)
|
||||
if not data:
|
||||
return []
|
||||
return _videos_from_search_data(data)
|
||||
except Exception as exc:
|
||||
log_error(f"[YouTube] _fetch_search_videos ({url}): {exc}")
|
||||
return []
|
||||
|
||||
|
||||
def _resolve_with_ytdlp(video_id: str) -> Optional[str]:
|
||||
"""Loest Video-ID via yt-dlp zu direkter Stream-URL auf."""
|
||||
try:
|
||||
from yt_dlp import YoutubeDL # type: ignore
|
||||
except ImportError:
|
||||
log_error("[YouTube] yt-dlp nicht verfuegbar (script.module.yt-dlp fehlt)")
|
||||
try:
|
||||
import xbmc
|
||||
import xbmcgui
|
||||
yes = xbmcgui.Dialog().yesno(
|
||||
"yt-dlp fehlt",
|
||||
"script.module.yt-dlp ist nicht installiert.\nJetzt installieren?",
|
||||
)
|
||||
if yes:
|
||||
xbmc.executebuiltin("RunPlugin(plugin://plugin.video.viewit/?action=install_ytdlp)")
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
url = f"https://www.youtube.com/watch?v={video_id}"
|
||||
ydl_opts: Dict[str, Any] = {
|
||||
"format": "best[ext=mp4]/best",
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"extract_flat": False,
|
||||
}
|
||||
try:
|
||||
with YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
if not info:
|
||||
return None
|
||||
# Einzelnes Video
|
||||
direct = info.get("url")
|
||||
if direct:
|
||||
return direct
|
||||
# Formatauswahl
|
||||
formats = info.get("formats", [])
|
||||
if formats:
|
||||
return formats[-1].get("url")
|
||||
except Exception as exc:
|
||||
log_error(f"[YouTube] yt-dlp Fehler fuer {video_id}: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plugin
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class YoutubePlugin(BasisPlugin):
|
||||
name = "YouTube"
|
||||
|
||||
# Pseudo-Staffeln: nur Suche – Browse-Endpunkte erfordern Login
|
||||
_SEASONS = ["Suche"]
|
||||
|
||||
def capabilities(self) -> Set[str]:
|
||||
return set()
|
||||
|
||||
async def search_titles(
|
||||
self,
|
||||
query: str,
|
||||
progress_callback: ProgressCallback = None,
|
||||
) -> List[str]:
|
||||
if not query.strip():
|
||||
return []
|
||||
# Primär: yt-dlp (robust, kein HTML-Scraping)
|
||||
results = _search_with_ytdlp(query)
|
||||
if results:
|
||||
return results
|
||||
# Fallback: HTML-Scraping
|
||||
if requests is None:
|
||||
return []
|
||||
url = f"{BASE_URL}/results?search_query={requests.utils.quote(query)}" # type: ignore
|
||||
return _fetch_search_videos(url)
|
||||
|
||||
def seasons_for(self, title: str) -> List[str]:
|
||||
return list(self._SEASONS)
|
||||
|
||||
def episodes_for(self, title: str, season: str) -> List[str]:
|
||||
if season == "Suche":
|
||||
# Titel ist bereits ein kodierter Eintrag aus der Suche
|
||||
return [title]
|
||||
return []
|
||||
|
||||
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
|
||||
video_id = _decode_id(episode) or _decode_id(title)
|
||||
if not video_id:
|
||||
return None
|
||||
return _resolve_with_ytdlp(video_id)
|
||||
|
||||
def resolve_stream_link(self, link: str) -> Optional[str]:
|
||||
return link # bereits direkte URL
|
||||
|
||||
def metadata_for(self, title: str):
|
||||
"""Thumbnail aus Video-ID ableiten."""
|
||||
video_id = _decode_id(title)
|
||||
clean_title = _decode_title(title)
|
||||
info: Dict[str, str] = {"title": clean_title}
|
||||
art: Dict[str, str] = {}
|
||||
if video_id:
|
||||
art["thumb"] = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
|
||||
art["poster"] = f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg"
|
||||
return info, art, None
|
||||
|
||||
|
||||
Plugin = YoutubePlugin
|
||||
Reference in New Issue
Block a user