186 lines
6.7 KiB
Python
186 lines
6.7 KiB
Python
"""Gemeinsame yt-dlp Hilfsfunktionen fuer YouTube-Wiedergabe.
|
||
|
||
Wird von youtube_plugin und dokustreams_plugin genutzt.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from typing import Any, Dict, Optional
|
||
|
||
try:
|
||
import xbmc # type: ignore
|
||
def _log(msg: str) -> None:
|
||
xbmc.log(f"[ViewIt][yt-dlp] {msg}", xbmc.LOGWARNING)
|
||
except ImportError:
|
||
def _log(msg: str) -> None:
|
||
pass
|
||
|
||
|
||
_YT_ID_RE = re.compile(
|
||
r"(?:youtube(?:-nocookie)?\.com/(?:embed/|v/|watch\?.*?v=)|youtu\.be/)"
|
||
r"([A-Za-z0-9_-]{11})"
|
||
)
|
||
|
||
|
||
def extract_youtube_id(url: str) -> Optional[str]:
|
||
"""Extrahiert eine YouTube Video-ID aus verschiedenen URL-Formaten."""
|
||
if not url:
|
||
return None
|
||
m = _YT_ID_RE.search(url)
|
||
return m.group(1) if m else None
|
||
|
||
|
||
def _fix_strptime() -> None:
|
||
"""Kodi-Workaround: datetime.strptime Race Condition vermeiden.
|
||
|
||
Kodi's eingebetteter Python kann in Multi-Thread-Umgebungen dazu fuehren
|
||
dass der lazy _strptime-Import fehlschlaegt. Wir importieren das Modul
|
||
direkt, damit es beim yt-dlp Aufruf bereits geladen ist.
|
||
"""
|
||
try:
|
||
import _strptime # noqa: F401 – erzwingt den internen Import
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def ensure_ytdlp_in_path() -> bool:
|
||
"""Fuegt script.module.yt-dlp/lib zum sys.path hinzu falls noetig."""
|
||
_fix_strptime()
|
||
try:
|
||
import yt_dlp # type: ignore # noqa: F401
|
||
return True
|
||
except ImportError:
|
||
pass
|
||
try:
|
||
import sys, os
|
||
import xbmcvfs # type: ignore
|
||
lib_path = xbmcvfs.translatePath("special://home/addons/script.module.yt-dlp/lib")
|
||
if lib_path and os.path.isdir(lib_path) and lib_path not in sys.path:
|
||
sys.path.insert(0, lib_path)
|
||
import yt_dlp # type: ignore # noqa: F401
|
||
return True
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
|
||
def get_quality_format() -> str:
|
||
"""Liest YouTube-Qualitaet aus den Addon-Einstellungen."""
|
||
_QUALITY_MAP = {
|
||
"0": "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
|
||
"1": "bestvideo[height<=1080][ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]/best",
|
||
"2": "bestvideo[height<=720][ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best",
|
||
"3": "bestvideo[height<=480][ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[height<=480][ext=mp4]+bestaudio[ext=m4a]/best[height<=480][ext=mp4]/best",
|
||
"4": "bestvideo[height<=360][ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/bestvideo[height<=360][ext=mp4]+bestaudio[ext=m4a]/best[height<=360][ext=mp4]/best",
|
||
}
|
||
try:
|
||
import xbmcaddon # type: ignore
|
||
val = xbmcaddon.Addon().getSetting("youtube_quality") or "0"
|
||
return _QUALITY_MAP.get(val, _QUALITY_MAP["0"])
|
||
except Exception:
|
||
return _QUALITY_MAP["0"]
|
||
|
||
|
||
_AUDIO_SEP = "||AUDIO||"
|
||
_META_SEP = "||META||"
|
||
|
||
|
||
def resolve_youtube_url(video_id: str) -> Optional[str]:
|
||
"""Loest eine YouTube Video-ID via yt-dlp zu einer direkten Stream-URL auf.
|
||
|
||
Bei getrennten Video+Audio-Streams wird der Rueckgabestring im Format
|
||
``video_url||AUDIO||audio_url||META||key=val,key=val,...`` kodiert.
|
||
Der Aufrufer kann mit ``split_video_audio()`` alle Teile trennen.
|
||
"""
|
||
if not ensure_ytdlp_in_path():
|
||
_log("yt-dlp nicht verfuegbar (script.module.yt-dlp fehlt)")
|
||
try:
|
||
import xbmcgui # type: ignore
|
||
xbmcgui.Dialog().notification(
|
||
"yt-dlp fehlt",
|
||
"Bitte yt-dlp in den ViewIT-Einstellungen installieren.",
|
||
xbmcgui.NOTIFICATION_ERROR,
|
||
5000,
|
||
)
|
||
except Exception:
|
||
pass
|
||
return None
|
||
try:
|
||
from yt_dlp import YoutubeDL # type: ignore
|
||
except ImportError:
|
||
return None
|
||
url = f"https://www.youtube.com/watch?v={video_id}"
|
||
fmt = get_quality_format()
|
||
ydl_opts: Dict[str, Any] = {
|
||
"format": fmt,
|
||
"quiet": True,
|
||
"no_warnings": True,
|
||
"extract_flat": False,
|
||
}
|
||
try:
|
||
with YoutubeDL(ydl_opts) as ydl:
|
||
info = ydl.extract_info(url, download=False)
|
||
if not info:
|
||
return None
|
||
duration = int(info.get("duration") or 0)
|
||
# Einzelne URL (kombinierter Stream)
|
||
direct = info.get("url")
|
||
if direct:
|
||
return direct
|
||
# Getrennte Video+Audio-Streams (hoehere Qualitaet)
|
||
rf = info.get("requested_formats")
|
||
if rf and len(rf) >= 2:
|
||
vf, af = rf[0], rf[1]
|
||
video_url = vf.get("url")
|
||
audio_url = af.get("url")
|
||
if video_url and audio_url:
|
||
vcodec = vf.get("vcodec") or "avc1.640028"
|
||
acodec = af.get("acodec") or "mp4a.40.2"
|
||
w = int(vf.get("width") or 1920)
|
||
h = int(vf.get("height") or 1080)
|
||
fps = int(vf.get("fps") or 25)
|
||
vbr = int((vf.get("tbr") or 5000) * 1000)
|
||
abr = int((af.get("tbr") or 128) * 1000)
|
||
asr = int(af.get("asr") or 44100)
|
||
ach = int(af.get("audio_channels") or 2)
|
||
meta = (
|
||
f"vc={vcodec},ac={acodec},"
|
||
f"w={w},h={h},fps={fps},"
|
||
f"vbr={vbr},abr={abr},"
|
||
f"asr={asr},ach={ach},dur={duration}"
|
||
)
|
||
_log(f"Getrennte Streams: {h}p {vcodec} + {acodec}")
|
||
return f"{video_url}{_AUDIO_SEP}{audio_url}{_META_SEP}{meta}"
|
||
if video_url:
|
||
return video_url
|
||
# Fallback: letztes Format
|
||
formats = info.get("formats", [])
|
||
if formats:
|
||
return formats[-1].get("url")
|
||
except Exception as exc:
|
||
_log(f"yt-dlp Fehler fuer {video_id}: {exc}")
|
||
return None
|
||
|
||
|
||
def split_video_audio(url: str) -> tuple:
|
||
"""Trennt eine URL in (video_url, audio_url, meta_dict).
|
||
|
||
Falls kein Audio-Teil vorhanden: (url, None, {}).
|
||
meta_dict enthaelt Keys: vc, ac, w, h, fps, vbr, abr, asr, ach, dur
|
||
"""
|
||
if _AUDIO_SEP not in url:
|
||
return url, None, {}
|
||
parts = url.split(_AUDIO_SEP, 1)
|
||
video_url = parts[0]
|
||
rest = parts[1]
|
||
meta: Dict[str, str] = {}
|
||
audio_url = rest
|
||
if _META_SEP in rest:
|
||
audio_url, meta_str = rest.split(_META_SEP, 1)
|
||
for pair in meta_str.split(","):
|
||
if "=" in pair:
|
||
k, v = pair.split("=", 1)
|
||
meta[k] = v
|
||
return video_url, audio_url, meta
|