Files
ViewIT/addon/core/trakt.py

590 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Trakt.tv API-Integration fuer ViewIT.
Bietet OAuth-Device-Auth, Scrobbling, Watchlist, History und Calendar.
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlencode
try:
import requests
except ImportError:
requests = None
TRAKT_API_BASE = "https://api.trakt.tv"
TRAKT_API_VERSION = "2"
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class TraktToken:
access_token: str
refresh_token: str
expires_at: int # Unix-Timestamp
created_at: int
@dataclass(frozen=True)
class TraktDeviceCode:
device_code: str
user_code: str
verification_url: str
expires_in: int
interval: int
@dataclass(frozen=True)
class TraktMediaIds:
trakt: int = 0
tmdb: int = 0
imdb: str = ""
slug: str = ""
tvdb: int = 0
@dataclass(frozen=True)
class TraktItem:
title: str
year: int
media_type: str # "movie", "show" oder "episode"
ids: TraktMediaIds = field(default_factory=TraktMediaIds)
season: int = 0
episode: int = 0
watched_at: str = ""
poster: str = ""
episode_title: str = "" # Episodentitel (extended=full)
episode_overview: str = "" # Episoden-Inhaltsangabe (extended=full)
episode_thumb: str = "" # Screenshot-URL (extended=images)
show_poster: str = "" # Serien-Poster-URL (extended=images)
show_fanart: str = "" # Serien-Fanart-URL (extended=images)
@dataclass(frozen=True)
class TraktEpisodeMeta:
"""Metadaten einer einzelnen Episode (aus extended=full,images)."""
title: str
overview: str
runtime_minutes: int
thumb: str # Screenshot-URL (https://)
@dataclass(frozen=True)
class TraktCalendarItem:
"""Ein Eintrag aus dem Trakt-Kalender (anstehende Episode)."""
show_title: str
show_year: int
show_ids: TraktMediaIds
season: int
episode: int
episode_title: str
episode_overview: str # Episoden-Inhaltsangabe (extended=full)
episode_thumb: str # Screenshot-URL (https://)
show_poster: str # Poster-URL (https://)
show_fanart: str # Fanart-URL (https://)
first_aired: str # ISO-8601, z.B. "2026-03-02T02:00:00.000Z"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _trakt_image_url(raw: str) -> str:
"""Stellt https:// vor relative Trakt-Bild-URLs."""
if not raw:
return ""
raw = raw.strip()
if raw.startswith("http"):
return raw
return f"https://{raw}"
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class TraktClient:
"""Trakt API Client."""
def __init__(
self,
client_id: str,
client_secret: str,
*,
log: Callable[[str], None] | None = None,
) -> None:
self._client_id = client_id
self._client_secret = client_secret
self._log = log
def _headers(self, token: str = "") -> dict[str, str]:
h = {
"Content-Type": "application/json",
"trakt-api-version": TRAKT_API_VERSION,
"trakt-api-key": self._client_id,
}
if token:
h["Authorization"] = f"Bearer {token}"
return h
def _do_log(self, msg: str) -> None:
if callable(self._log):
self._log(f"[Trakt] {msg}")
def _post(self, path: str, body: dict, *, token: str = "", timeout: int = 15) -> tuple[int, dict | None]:
if requests is None:
return 0, None
url = f"{TRAKT_API_BASE}{path}"
self._do_log(f"POST {path}")
try:
resp = requests.post(url, json=body, headers=self._headers(token), timeout=timeout)
status = resp.status_code
try:
payload = resp.json()
except Exception:
payload = None
self._do_log(f"POST {path} -> {status}")
return status, payload
except Exception as exc:
self._do_log(f"POST {path} FEHLER: {exc}")
return 0, None
def _get(self, path: str, *, token: str = "", timeout: int = 15) -> tuple[int, Any]:
if requests is None:
return 0, None
url = f"{TRAKT_API_BASE}{path}"
self._do_log(f"GET {path}")
try:
resp = requests.get(url, headers=self._headers(token), timeout=timeout)
status = resp.status_code
try:
payload = resp.json()
except Exception:
payload = None
self._do_log(f"GET {path} -> {status}")
return status, payload
except Exception as exc:
self._do_log(f"GET {path} FEHLER: {exc}")
return 0, None
# -------------------------------------------------------------------
# OAuth Device Flow
# -------------------------------------------------------------------
def device_code_request(self) -> TraktDeviceCode | None:
"""POST /oauth/device/code generiert User-Code + Verification-URL."""
status, payload = self._post("/oauth/device/code", {"client_id": self._client_id})
if status != 200 or not isinstance(payload, dict):
return None
return TraktDeviceCode(
device_code=payload.get("device_code", ""),
user_code=payload.get("user_code", ""),
verification_url=payload.get("verification_url", "https://trakt.tv/activate"),
expires_in=int(payload.get("expires_in", 600)),
interval=int(payload.get("interval", 5)),
)
def poll_device_token(self, device_code: str, *, interval: int = 5, expires_in: int = 600) -> TraktToken | None:
"""Pollt POST /oauth/device/token bis autorisiert oder Timeout."""
body = {
"code": device_code,
"client_id": self._client_id,
"client_secret": self._client_secret,
}
start = time.time()
while time.time() - start < expires_in:
status, payload = self._post("/oauth/device/token", body)
if status == 200 and isinstance(payload, dict):
return TraktToken(
access_token=payload.get("access_token", ""),
refresh_token=payload.get("refresh_token", ""),
expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)),
created_at=int(payload.get("created_at", 0)),
)
if status == 400:
# Pending weiter warten
time.sleep(interval)
continue
if status in (404, 410, 418):
# Ungueltig, abgelaufen oder abgelehnt
self._do_log(f"Device-Auth abgebrochen: status={status}")
return None
if status == 429:
time.sleep(interval + 1)
continue
time.sleep(interval)
return None
def refresh_token(self, refresh_tok: str) -> TraktToken | None:
"""POST /oauth/token Token erneuern."""
body = {
"refresh_token": refresh_tok,
"client_id": self._client_id,
"client_secret": self._client_secret,
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
"grant_type": "refresh_token",
}
status, payload = self._post("/oauth/token", body)
if status != 200 or not isinstance(payload, dict):
return None
return TraktToken(
access_token=payload.get("access_token", ""),
refresh_token=payload.get("refresh_token", ""),
expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)),
created_at=int(payload.get("created_at", 0)),
)
# -------------------------------------------------------------------
# Scrobble
# -------------------------------------------------------------------
def _build_scrobble_body(
self,
*,
media_type: str,
title: str,
tmdb_id: int,
imdb_id: str = "",
season: int = 0,
episode: int = 0,
progress: float = 0.0,
) -> dict:
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
body: dict[str, object] = {"progress": round(progress, 1)}
if media_type == "tv" and season > 0 and episode > 0:
body["show"] = {"title": title, "ids": ids}
body["episode"] = {"season": season, "number": episode}
else:
body["movie"] = {"title": title, "ids": ids}
return body
def scrobble_start(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 0.0,
) -> bool:
"""POST /scrobble/start"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/start", body, token=token)
return status in (200, 201)
def scrobble_pause(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 50.0,
) -> bool:
"""POST /scrobble/pause"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/pause", body, token=token)
return status in (200, 201)
def scrobble_stop(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 100.0,
) -> bool:
"""POST /scrobble/stop"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/stop", body, token=token)
return status in (200, 201)
# -------------------------------------------------------------------
# Watchlist
# -------------------------------------------------------------------
def get_watchlist(self, token: str, *, media_type: str = "") -> list[TraktItem]:
"""GET /users/me/watchlist[/movies|/shows]"""
path = "/users/me/watchlist"
if media_type in ("movies", "shows"):
path = f"{path}/{media_type}"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
return self._parse_list_items(payload)
def add_to_watchlist(
self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "",
) -> bool:
"""POST /sync/watchlist"""
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
key = "movies" if media_type == "movie" else "shows"
body = {key: [{"ids": ids}]}
status, _ = self._post("/sync/watchlist", body, token=token)
return status in (200, 201)
def remove_from_watchlist(
self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "",
) -> bool:
"""POST /sync/watchlist/remove"""
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
key = "movies" if media_type == "movie" else "shows"
body = {key: [{"ids": ids}]}
status, _ = self._post("/sync/watchlist/remove", body, token=token)
return status == 200
# -------------------------------------------------------------------
# History
# -------------------------------------------------------------------
def get_history(
self, token: str, *, media_type: str = "", page: int = 1, limit: int = 20,
) -> list[TraktItem]:
"""GET /users/me/history[/movies|/shows|/episodes]"""
path = "/users/me/history"
if media_type in ("movies", "shows", "episodes"):
path = f"{path}/{media_type}"
path = f"{path}?page={page}&limit={limit}&extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
return self._parse_history_items(payload)
def get_watched_shows(self, token: str) -> list[TraktItem]:
"""GET /users/me/watched/shows alle Serien mit zuletzt gesehener Episode."""
status, payload = self._get("/users/me/watched/shows", token=token)
if status != 200 or not isinstance(payload, list):
self._do_log(f"get_watched_shows: status={status}")
return []
result: list[TraktItem] = []
for entry in payload:
if not isinstance(entry, dict):
continue
show = entry.get("show") or {}
ids = self._parse_ids((show.get("ids") or {}))
title = str(show.get("title", "") or "")
year = int(show.get("year", 0) or 0)
seasons = entry.get("seasons") or []
last_season = 0
last_episode = 0
for s in seasons:
snum = int((s.get("number") or 0))
if snum == 0: # Specials überspringen
continue
for ep in (s.get("episodes") or []):
enum = int((ep.get("number") or 0))
if snum > last_season or (snum == last_season and enum > last_episode):
last_season = snum
last_episode = enum
if title:
result.append(TraktItem(
title=title, year=year, media_type="episode",
ids=ids, season=last_season, episode=last_episode,
))
self._do_log(f"get_watched_shows: {len(result)} Serien")
return result
# -------------------------------------------------------------------
# Calendar
# -------------------------------------------------------------------
def get_calendar(self, token: str, start_date: str = "", days: int = 7) -> list[TraktCalendarItem]:
"""GET /calendars/my/shows/{start_date}/{days}
start_date: YYYY-MM-DD (leer = heute).
Liefert anstehende Episoden der eigenen Watchlist-Serien.
"""
if not start_date:
from datetime import date
start_date = date.today().strftime("%Y-%m-%d")
path = f"/calendars/my/shows/{start_date}/{days}?extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
items: list[TraktCalendarItem] = []
for entry in payload:
if not isinstance(entry, dict):
continue
show = entry.get("show") or {}
ep = entry.get("episode") or {}
show_ids = self._parse_ids(show.get("ids") or {})
ep_images = ep.get("images") or {}
show_images = show.get("images") or {}
def _first(img_dict: dict, key: str) -> str:
imgs = img_dict.get(key) or []
return _trakt_image_url(imgs[0]) if imgs else ""
items.append(TraktCalendarItem(
show_title=str(show.get("title", "") or ""),
show_year=int(show.get("year", 0) or 0),
show_ids=show_ids,
season=int(ep.get("season", 0) or 0),
episode=int(ep.get("number", 0) or 0),
episode_title=str(ep.get("title", "") or ""),
episode_overview=str(ep.get("overview", "") or ""),
episode_thumb=_first(ep_images, "screenshot"),
show_poster=_first(show_images, "poster"),
show_fanart=_first(show_images, "fanart"),
first_aired=str(entry.get("first_aired", "") or ""),
))
return items
def search_show(self, query: str) -> str:
"""GET /search/show?query=... gibt slug des ersten Treffers zurück, sonst ''."""
from urllib.parse import urlencode
path = f"/search/show?{urlencode({'query': query, 'limit': 1})}"
status, payload = self._get(path)
if status != 200 or not isinstance(payload, list) or not payload:
return ""
show = (payload[0] or {}).get("show") or {}
ids = show.get("ids") or {}
return str(ids.get("slug") or ids.get("trakt") or "")
def lookup_tv_season(
self,
show_id_or_slug: "str | int",
season_number: int,
*,
token: str = "",
) -> "dict[int, TraktEpisodeMeta] | None":
"""GET /shows/{id}/seasons/{n}/episodes?extended=full,images
Gibt episode_number -> TraktEpisodeMeta zurück, oder None bei Fehler.
"""
path = f"/shows/{show_id_or_slug}/seasons/{season_number}/episodes?extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return None
result: "dict[int, TraktEpisodeMeta]" = {}
for entry in payload:
try:
ep_no = int(entry.get("number") or 0)
except Exception:
continue
if not ep_no:
continue
images = entry.get("images") or {}
screenshots = images.get("screenshot") or []
thumb = _trakt_image_url(screenshots[0]) if screenshots else ""
result[ep_no] = TraktEpisodeMeta(
title=str(entry.get("title") or "").strip(),
overview=str(entry.get("overview") or "").strip(),
runtime_minutes=int(entry.get("runtime") or 0),
thumb=thumb,
)
return result or None
def get_episode_translation(
self,
show_id_or_slug: "str | int",
season: int,
episode: int,
language: str = "de",
) -> "tuple[str, str]":
"""GET /shows/{id}/seasons/{s}/episodes/{e}/translations/{lang}
Gibt (title, overview) in der Zielsprache zurück, oder ('', '') bei Fehler.
"""
path = f"/shows/{show_id_or_slug}/seasons/{season}/episodes/{episode}/translations/{language}"
status, payload = self._get(path)
if status != 200 or not isinstance(payload, list) or not payload:
return "", ""
first = payload[0] if payload else {}
return str(first.get("title") or ""), str(first.get("overview") or "")
# -------------------------------------------------------------------
# Parser
# -------------------------------------------------------------------
@staticmethod
def _parse_ids(ids_dict: dict) -> TraktMediaIds:
return TraktMediaIds(
trakt=int(ids_dict.get("trakt", 0) or 0),
tmdb=int(ids_dict.get("tmdb", 0) or 0),
imdb=str(ids_dict.get("imdb", "") or ""),
slug=str(ids_dict.get("slug", "") or ""),
tvdb=int(ids_dict.get("tvdb", 0) or 0),
)
def _parse_list_items(self, items: list) -> list[TraktItem]:
result: list[TraktItem] = []
for entry in items:
if not isinstance(entry, dict):
continue
item_type = entry.get("type", "")
media = entry.get(item_type) or entry.get("movie") or entry.get("show") or {}
if not isinstance(media, dict):
continue
ids = self._parse_ids(media.get("ids") or {})
result.append(TraktItem(
title=str(media.get("title", "") or ""),
year=int(media.get("year", 0) or 0),
media_type=item_type,
ids=ids,
))
return result
def _parse_history_items(self, items: list) -> list[TraktItem]:
result: list[TraktItem] = []
for entry in items:
if not isinstance(entry, dict):
continue
item_type = entry.get("type", "")
watched_at = str(entry.get("watched_at", "") or "")
if item_type == "episode":
show = entry.get("show") or {}
ep = entry.get("episode") or {}
ids = self._parse_ids((show.get("ids") or {}))
ep_images = ep.get("images") or {}
show_images = show.get("images") or {}
def _first_img(img_dict: dict, key: str) -> str:
imgs = img_dict.get(key) or []
return _trakt_image_url(imgs[0]) if imgs else ""
result.append(TraktItem(
title=str(show.get("title", "") or ""),
year=int(show.get("year", 0) or 0),
media_type="episode",
ids=ids,
season=int(ep.get("season", 0) or 0),
episode=int(ep.get("number", 0) or 0),
watched_at=watched_at,
episode_title=str(ep.get("title", "") or ""),
episode_overview=str(ep.get("overview", "") or ""),
episode_thumb=_first_img(ep_images, "screenshot"),
show_poster=_first_img(show_images, "poster"),
show_fanart=_first_img(show_images, "fanart"),
))
else:
media = entry.get("movie") or entry.get("show") or {}
ids = self._parse_ids(media.get("ids") or {})
result.append(TraktItem(
title=str(media.get("title", "") or ""),
year=int(media.get("year", 0) or 0),
media_type=item_type,
ids=ids,
watched_at=watched_at,
))
return result