239 lines
7.3 KiB
Python
239 lines
7.3 KiB
Python
"""YouTube Plugin fuer ViewIT.
|
|
|
|
Searches for and plays YouTube videos via HTML scraping and yt-dlp.
script.module.yt-dlp is an optional dependency.

Video entries are encoded as "Title||VIDEO_ID".
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
requests = None # type: ignore
|
|
|
|
from plugin_interface import BasisPlugin
|
|
|
|
try:
|
|
import xbmc # type: ignore
|
|
def _log(msg: str) -> None:
|
|
xbmc.log(f"[ViewIt][YouTube] {msg}", xbmc.LOGWARNING)
|
|
except ImportError:
|
|
def _log(msg: str) -> None:
|
|
pass
|
|
|
|
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Passed as the `timeout=` argument of the search-page request.
DEFAULT_TIMEOUT = 20
_SEP = "||"  # separator between title and video ID

# Root URL from which the search-results address is built.
BASE_URL = "https://www.youtube.com"

# Default request headers handed to the HTTP session (browser-like
# User-Agent, German-first Accept-Language).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

# Type alias for an optional callable taking (str, Optional[int]).
# NOTE(review): presumably (message, percent) -- confirm against BasisPlugin.
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
|
|
|
|
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
|
|
|
|
def _encode(title: str, video_id: str) -> str:
    """Join a title and a video ID into one entry, separated by ``_SEP``."""
    return "{}{}{}".format(title, _SEP, video_id)
|
|
|
|
|
|
def _decode_id(entry: str) -> Optional[str]:
    """Extract the video ID from an encoded entry.

    Entries are built as ``title + _SEP + video_id``, so the ID is whatever
    follows the *last* separator. Splitting from the right keeps a title that
    itself contains the separator from leaking into the ID.

    Args:
        entry: Encoded entry, or a plain string that ends in a bare
            11-character YouTube ID.

    Returns:
        The stripped video ID, or ``None`` when no ID can be determined
        (including an entry whose part after the separator is empty).
    """
    if _SEP in entry:
        video_id = entry.rsplit(_SEP, 1)[1].strip()
        # An empty ID is useless to callers; report it as "not found".
        return video_id or None
    # Fallback: an 11-character YouTube ID at the very end of the string.
    match = re.search(r"([A-Za-z0-9_-]{11})$", entry)
    return match.group(1) if match else None
|
|
|
|
|
|
def _decode_title(entry: str) -> str:
    """Return the title part of an encoded entry.

    The video ID is always the last component, so the entry is split at the
    *last* separator. A title that itself contains the separator is therefore
    returned in full rather than cut at its first occurrence.

    Args:
        entry: Encoded entry (``title + _SEP + video_id``) or a plain title.

    Returns:
        The stripped title, or ``entry`` unchanged when it contains no
        separator.
    """
    if _SEP in entry:
        return entry.rsplit(_SEP, 1)[0].strip()
    return entry
|
|
|
|
|
|
def _get_session() -> Any:
    """Return an HTTP session preconfigured with ``HEADERS``.

    The pooled session from ``http_session_pool`` is preferred. If importing
    or calling it fails for any reason, a fresh ``requests.Session`` is built
    instead; ``None`` is returned when ``requests`` is unavailable as well.
    """
    try:
        from http_session_pool import get_requests_session

        return get_requests_session("youtube", headers=HEADERS)
    except Exception:
        # Best effort: fall through to a locally created session.
        pass

    if not requests:
        return None
    fallback = requests.Session()
    fallback.headers.update(HEADERS)
    return fallback
|
|
|
|
|
|
def _extract_yt_initial_data(html: str) -> Optional[dict]:
|
|
"""Extrahiert ytInitialData JSON aus dem HTML-Source."""
|
|
m = re.search(r"var ytInitialData\s*=\s*(\{.*?\});\s*(?:var |</script>)", html, re.DOTALL)
|
|
if not m:
|
|
# Alternativer Pattern
|
|
m = re.search(r"ytInitialData\s*=\s*(\{.+?\})\s*;", html, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
return json.loads(m.group(1))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _videos_from_search_data(data: dict) -> List[str]:
|
|
"""Extrahiert Video-Eintraege aus ytInitialData (Suchergebnisse)."""
|
|
results: List[str] = []
|
|
try:
|
|
contents = (
|
|
data
|
|
.get("contents", {})
|
|
.get("twoColumnSearchResultsRenderer", {})
|
|
.get("primaryContents", {})
|
|
.get("sectionListRenderer", {})
|
|
.get("contents", [])
|
|
)
|
|
for section in contents:
|
|
items = (
|
|
section
|
|
.get("itemSectionRenderer", {})
|
|
.get("contents", [])
|
|
)
|
|
for item in items:
|
|
vr = item.get("videoRenderer") or item.get("compactVideoRenderer")
|
|
if not vr:
|
|
continue
|
|
video_id = vr.get("videoId", "").strip()
|
|
if not video_id:
|
|
continue
|
|
title_runs = vr.get("title", {}).get("runs", [])
|
|
title = "".join(r.get("text", "") for r in title_runs).strip()
|
|
if not title:
|
|
title = vr.get("title", {}).get("simpleText", "").strip()
|
|
if title and video_id:
|
|
results.append(_encode(title, video_id))
|
|
except Exception as exc:
|
|
_log(f"[YouTube] _videos_from_search_data Fehler: {exc}")
|
|
return results
|
|
|
|
|
|
|
|
def _search_with_ytdlp(query: str, count: int = 20) -> List[str]:
    """Search YouTube through yt-dlp's ``ytsearch`` extractor.

    Returns encoded entries for every result that has both an ID and a
    title. An empty list is returned when yt-dlp is unavailable, when the
    search yields nothing, or when the extraction raises (which is logged).
    """
    if not ensure_ytdlp_in_path():
        return []
    try:
        from yt_dlp import YoutubeDL  # type: ignore
    except ImportError:
        return []

    options = {"quiet": True, "no_warnings": True, "extract_flat": True}
    try:
        with YoutubeDL(options) as ydl:
            info = ydl.extract_info(f"ytsearch{count}:{query}", download=False)
            if not info:
                return []
            encoded: List[str] = []
            for entry in info.get("entries") or []:
                if entry.get("id") and entry.get("title"):
                    encoded.append(_encode(entry["title"], entry["id"]))
            return encoded
    except Exception as exc:
        _log(f"[YouTube] yt-dlp Suche Fehler: {exc}")
        return []
|
|
|
|
|
|
def _fetch_search_videos(url: str) -> List[str]:
    """Download a YouTube search page and return its encoded video entries.

    The page is fetched with the shared session, its ``ytInitialData`` blob
    is extracted and converted to entries. Returns an empty list when no
    session is available, no data blob is found, or the request/parsing
    raises (which is logged).
    """
    session = _get_session()
    if session is None:
        return []
    try:
        response = session.get(url, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
        initial_data = _extract_yt_initial_data(response.text)
        return _videos_from_search_data(initial_data) if initial_data else []
    except Exception as exc:
        _log(f"[YouTube] _fetch_search_videos ({url}): {exc}")
        return []
|
|
|
|
|
|
from ytdlp_helper import ensure_ytdlp_in_path, resolve_youtube_url
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class YoutubePlugin(BasisPlugin):
    """Plugin that exposes YouTube search results as single-"season" titles."""

    name = "YouTube"

    # Every video is presented with exactly one pseudo season.
    _SEASONS = ["Stream"]

    def capabilities(self) -> Set[str]:
        """Return the (empty) set of optional capabilities."""
        return set()

    async def search_titles(
        self,
        query: str,
        progress_callback: ProgressCallback = None,
    ) -> List[str]:
        """Search YouTube for ``query`` and return encoded entries.

        yt-dlp is tried first; if it yields nothing, the HTML results page is
        scraped instead. A blank query, or a missing ``requests`` module in
        the fallback path, gives an empty list.
        """
        if not query.strip():
            return []

        # Primary: yt-dlp (robust, no HTML scraping).
        hits = _search_with_ytdlp(query)
        if hits:
            return hits

        # Fallback: HTML scraping.
        if requests is None:
            return []
        return _fetch_search_videos(
            f"{BASE_URL}/results?search_query={requests.utils.quote(query)}"  # type: ignore
        )

    def seasons_for(self, title: str) -> List[str]:
        """Return a fresh copy of the fixed season list."""
        return [*self._SEASONS]

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return the title itself as the only episode of season "Stream"."""
        return [title] if season == "Stream" else []

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Resolve a playable URL from the ID encoded in ``episode`` or ``title``."""
        for candidate in (episode, title):
            video_id = _decode_id(candidate)
            if video_id:
                return resolve_youtube_url(video_id)
        return None

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Return ``link`` unchanged; it is already a direct URL."""
        return link

    def metadata_for(self, title: str):
        """Derive thumbnail artwork from the video ID encoded in ``title``.

        Returns a tuple ``(info, art, None)`` where ``info`` holds the decoded
        title and ``art`` holds thumbnail URLs when an ID is available.
        """
        video_id = _decode_id(title)
        info: Dict[str, str] = {"title": _decode_title(title)}
        art: Dict[str, str] = (
            {
                "thumb": f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
                "poster": f"https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg",
            }
            if video_id
            else {}
        )
        return info, art, None


Plugin = YoutubePlugin
|