# Listing metadata: 327 lines, 10 KiB, Python
"""YouTube plugin for ViewIT.

Searches and plays YouTube videos via HTML scraping and yt-dlp.
Requires script.module.yt-dlp (optional).

Video entries are encoded as "Title||VIDEO_ID".
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
requests = None # type: ignore
|
|
|
|
from plugin_interface import BasisPlugin
|
|
|
|
try:
|
|
import xbmc # type: ignore
|
|
def _log(msg: str) -> None:
|
|
xbmc.log(f"[ViewIt][YouTube] {msg}", xbmc.LOGWARNING)
|
|
except ImportError:
|
|
def _log(msg: str) -> None:
|
|
pass
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Konstanten
|
|
# ---------------------------------------------------------------------------
|
|
|
|
DEFAULT_TIMEOUT = 20
|
|
_SEP = "||" # Trennzeichen zwischen Titel und Video-ID
|
|
|
|
BASE_URL = "https://www.youtube.com"
|
|
|
|
HEADERS = {
|
|
"User-Agent": (
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
|
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
|
),
|
|
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
|
}
|
|
|
|
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hilfsfunktionen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _encode(title: str, video_id: str) -> str:
    """Build one entry string: the title, then ``_SEP``, then the video ID."""
    return "{}{}{}".format(title, _SEP, video_id)
|
|
|
|
|
|
def _decode_id(entry: str) -> Optional[str]:
    """Extract the video ID from an encoded entry.

    Args:
        entry: Either a string of the form ``<title><_SEP><video_id>`` or a
            plain string that ends in an 11-character YouTube ID.

    Returns:
        The video ID, or ``None`` when no ID can be found.
    """
    if _SEP in entry:
        # The ID is always the LAST field. Splitting from the right keeps
        # titles that themselves contain the separator from corrupting the ID.
        video_id = entry.rsplit(_SEP, 1)[1].strip()
        # An entry ending in the separator carries no ID at all.
        return video_id or None
    # Fallback: 11-character YouTube ID at the end of the string.
    match = re.search(r"([A-Za-z0-9_-]{11})$", entry)
    return match.group(1) if match else None
|
|
|
|
|
|
def _decode_title(entry: str) -> str:
    """Extract the human-readable title from an encoded entry.

    Args:
        entry: A string of the form ``<title><_SEP><video_id>``, or a plain
            title without separator.

    Returns:
        The title with surrounding whitespace removed, or ``entry`` unchanged
        when it contains no separator.
    """
    if _SEP in entry:
        # Only the LAST separator delimits the ID; splitting from the right
        # preserves titles that contain the separator themselves.
        return entry.rsplit(_SEP, 1)[0].strip()
    return entry
|
|
|
|
|
|
def _get_session() -> Any:
    """Return an HTTP session carrying ``HEADERS``, or ``None`` if none can be built.

    The pooled session from ``http_session_pool`` is tried first; if that
    fails for any reason, a fresh ``requests.Session`` is created when the
    ``requests`` module is available.
    """
    try:
        from http_session_pool import get_requests_session

        return get_requests_session("youtube", headers=HEADERS)
    except Exception:
        # Best effort: any failure of the pool falls through to plain requests.
        pass

    if not requests:
        return None

    session = requests.Session()
    session.headers.update(HEADERS)
    return session
|
|
|
|
|
|
def _extract_yt_initial_data(html: str) -> Optional[dict]:
|
|
"""Extrahiert ytInitialData JSON aus dem HTML-Source."""
|
|
m = re.search(r"var ytInitialData\s*=\s*(\{.*?\});\s*(?:var |</script>)", html, re.DOTALL)
|
|
if not m:
|
|
# Alternativer Pattern
|
|
m = re.search(r"ytInitialData\s*=\s*(\{.+?\})\s*;", html, re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
return json.loads(m.group(1))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _videos_from_search_data(data: dict) -> List[str]:
|
|
"""Extrahiert Video-Eintraege aus ytInitialData (Suchergebnisse)."""
|
|
results: List[str] = []
|
|
try:
|
|
contents = (
|
|
data
|
|
.get("contents", {})
|
|
.get("twoColumnSearchResultsRenderer", {})
|
|
.get("primaryContents", {})
|
|
.get("sectionListRenderer", {})
|
|
.get("contents", [])
|
|
)
|
|
for section in contents:
|
|
items = (
|
|
section
|
|
.get("itemSectionRenderer", {})
|
|
.get("contents", [])
|
|
)
|
|
for item in items:
|
|
vr = item.get("videoRenderer") or item.get("compactVideoRenderer")
|
|
if not vr:
|
|
continue
|
|
video_id = vr.get("videoId", "").strip()
|
|
if not video_id:
|
|
continue
|
|
title_runs = vr.get("title", {}).get("runs", [])
|
|
title = "".join(r.get("text", "") for r in title_runs).strip()
|
|
if not title:
|
|
title = vr.get("title", {}).get("simpleText", "").strip()
|
|
if title and video_id:
|
|
results.append(_encode(title, video_id))
|
|
except Exception as exc:
|
|
_log(f"[YouTube] _videos_from_search_data Fehler: {exc}")
|
|
return results
|
|
|
|
|
|
|
|
def _search_with_ytdlp(query: str, count: int = 20) -> List[str]:
    """Search YouTube through yt-dlp's ``ytsearch`` pseudo-URL.

    Args:
        query: Search text.
        count: Number placed after ``ytsearch`` in the request string.

    Returns:
        Encoded entries for every result that has both an ``id`` and a
        ``title``; an empty list when yt-dlp is unavailable, returns nothing,
        or raises.
    """
    if not _ensure_ytdlp_in_path():
        return []
    try:
        from yt_dlp import YoutubeDL  # type: ignore
    except ImportError:
        return []

    options = {"quiet": True, "no_warnings": True, "extract_flat": True}
    search_spec = f"ytsearch{count}:{query}"
    try:
        with YoutubeDL(options) as ydl:
            info = ydl.extract_info(search_spec, download=False)
            found: List[str] = []
            if info:
                for hit in info.get("entries") or []:
                    if hit.get("id") and hit.get("title"):
                        found.append(_encode(hit["title"], hit["id"]))
            return found
    except Exception as exc:
        _log(f"[YouTube] yt-dlp Suche Fehler: {exc}")
        return []
|
|
|
|
|
|
def _fetch_search_videos(url: str) -> List[str]:
    """Download a YouTube search page and return its videos as encoded entries.

    Args:
        url: Full URL of the search-results page.

    Returns:
        Entries parsed from the page's ``ytInitialData``; an empty list when
        no session is available, the data block is missing, or the request or
        parsing raises (the error is logged).
    """
    http = _get_session()
    if http is None:
        return []
    try:
        response = http.get(url, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
        initial_data = _extract_yt_initial_data(response.text)
        return _videos_from_search_data(initial_data) if initial_data else []
    except Exception as exc:
        _log(f"[YouTube] _fetch_search_videos ({url}): {exc}")
        return []
|
|
|
|
|
|
def _fix_strptime() -> None:
|
|
"""Kodi-Workaround: datetime.strptime ist manchmal None."""
|
|
import datetime as _dt
|
|
import time as _time
|
|
if not callable(getattr(_dt.datetime, "strptime", None)):
|
|
_dt.datetime.strptime = lambda s, f: _dt.datetime(*(_time.strptime(s, f)[0:6]))
|
|
|
|
|
|
def _ensure_ytdlp_in_path() -> bool:
    """Make ``yt_dlp`` importable, adding script.module.yt-dlp/lib to ``sys.path`` if needed.

    Returns:
        ``True`` when ``yt_dlp`` can be imported (directly or after the path
        was extended), ``False`` otherwise.
    """
    _fix_strptime()

    try:
        import yt_dlp  # type: ignore  # noqa: F401
    except ImportError:
        pass
    else:
        return True

    try:
        import os
        import sys

        import xbmcvfs  # type: ignore

        lib_path = xbmcvfs.translatePath("special://home/addons/script.module.yt-dlp/lib")
        if lib_path and os.path.isdir(lib_path) and lib_path not in sys.path:
            sys.path.insert(0, lib_path)
        import yt_dlp  # type: ignore  # noqa: F401
    except Exception:
        # Any failure here (xbmcvfs or yt_dlp import, path lookup) means "not available".
        return False
    return True
|
|
|
|
|
|
def _get_quality_format() -> str:
|
|
"""Liest YouTube-Qualitaet aus den Addon-Einstellungen."""
|
|
_QUALITY_MAP = {
|
|
"0": "best[ext=mp4]/best",
|
|
"1": "bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]/best",
|
|
"2": "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best",
|
|
"3": "bestvideo[height<=480][ext=mp4]+bestaudio[ext=m4a]/best[height<=480][ext=mp4]/best",
|
|
"4": "bestvideo[height<=360][ext=mp4]+bestaudio[ext=m4a]/best[height<=360][ext=mp4]/best",
|
|
}
|
|
try:
|
|
import xbmcaddon # type: ignore
|
|
val = xbmcaddon.Addon().getSetting("youtube_quality") or "0"
|
|
return _QUALITY_MAP.get(val, _QUALITY_MAP["0"])
|
|
except Exception:
|
|
return _QUALITY_MAP["0"]
|
|
|
|
|
|
def _resolve_with_ytdlp(video_id: str) -> Optional[str]:
    """Resolve a video ID to a direct stream URL using yt-dlp.

    Args:
        video_id: YouTube video ID, inserted into the watch URL.

    Returns:
        The ``url`` field of the extracted info if present, otherwise the
        ``url`` of the last entry in ``formats``; ``None`` when yt-dlp is
        unavailable, nothing is extracted, or extraction raises (logged).
    """
    if not _ensure_ytdlp_in_path():
        _log("[YouTube] yt-dlp nicht verfuegbar (script.module.yt-dlp fehlt)")
        try:
            import xbmcgui

            dialog = xbmcgui.Dialog()
            dialog.notification(
                "yt-dlp fehlt",
                "Bitte yt-dlp in den ViewIT-Einstellungen installieren.",
                xbmcgui.NOTIFICATION_ERROR,
                5000,
            )
        except Exception:
            # The notification is best effort; any failure is ignored.
            pass
        return None

    try:
        from yt_dlp import YoutubeDL  # type: ignore
    except ImportError:
        return None

    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    options: Dict[str, Any] = dict(
        format=_get_quality_format(),
        quiet=True,
        no_warnings=True,
        extract_flat=False,
    )
    try:
        with YoutubeDL(options) as ydl:
            info = ydl.extract_info(watch_url, download=False)
            if info:
                # Single video: a top-level URL is used directly.
                stream_url = info.get("url")
                if stream_url:
                    return stream_url
                # NOTE(review): this fallback takes the last entry of
                # "formats" regardless of the selected quality; confirm it is
                # the intended choice when no top-level URL is set.
                candidates = info.get("formats", [])
                if candidates:
                    return candidates[-1].get("url")
    except Exception as exc:
        _log(f"[YouTube] yt-dlp Fehler fuer {video_id}: {exc}")
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class YoutubePlugin(BasisPlugin):
    """Plugin that searches YouTube and resolves hits to stream URLs.

    Titles handled by this class are encoded entries combining the title and
    the video ID (see ``_encode``). Every title exposes the single season
    "Stream", whose only episode is the title itself.
    """

    name = "YouTube"

    # The only season offered for every title.
    _SEASONS = ["Stream"]

    def capabilities(self) -> Set[str]:
        """Return an empty capability set."""
        return set()

    async def search_titles(
        self,
        query: str,
        progress_callback: ProgressCallback = None,
    ) -> List[str]:
        """Search YouTube for ``query`` and return encoded entries.

        yt-dlp is tried first; when it yields nothing, the HTML results page
        is scraped instead, provided ``requests`` is available.
        ``progress_callback`` is accepted but not used here.
        """
        if not query.strip():
            return []

        # Primary: yt-dlp (robust, no HTML scraping).
        ytdlp_hits = _search_with_ytdlp(query)
        if ytdlp_hits:
            return ytdlp_hits

        # Fallback: HTML scraping, which needs the requests module.
        if requests is None:
            return []
        search_url = f"{BASE_URL}/results?search_query={requests.utils.quote(query)}"  # type: ignore
        return _fetch_search_videos(search_url)

    def seasons_for(self, title: str) -> List[str]:
        """Return a fresh copy of the season list."""
        return list(self._SEASONS)

    def episodes_for(self, title: str, season: str) -> List[str]:
        """Return ``[title]`` for the "Stream" season and nothing for any other."""
        return [title] if season == "Stream" else []

    def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
        """Resolve the stream URL, taking the video ID from ``episode`` or else ``title``."""
        vid = _decode_id(episode) or _decode_id(title)
        if vid:
            return _resolve_with_ytdlp(vid)
        return None

    def resolve_stream_link(self, link: str) -> Optional[str]:
        """Return ``link`` unchanged; it is already a direct URL."""
        return link

    def metadata_for(self, title: str):
        """Derive metadata and thumbnail URLs from an encoded title.

        Returns:
            A tuple ``(info, art, None)`` where ``info`` holds the decoded
            title and ``art`` holds "thumb" and "poster" image URLs when a
            video ID could be decoded (otherwise ``art`` is empty).
        """
        vid = _decode_id(title)
        info: Dict[str, str] = {"title": _decode_title(title)}
        art: Dict[str, str] = {}
        if vid:
            image_base = f"https://i.ytimg.com/vi/{vid}"
            art.update(
                thumb=f"{image_base}/hqdefault.jpg",
                poster=f"{image_base}/maxresdefault.jpg",
            )
        return info, art, None
|
|
|
|
|
|
# Module-level alias for the plugin class.
# NOTE(review): presumably the name the plugin loader looks up — confirm.
Plugin = YoutubePlugin
|