"""Trakt.tv API-Integration fuer ViewIT. Bietet OAuth-Device-Auth, Scrobbling, Watchlist, History und Calendar. """ from __future__ import annotations import json import time from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional from urllib.parse import urlencode try: import requests except ImportError: requests = None TRAKT_API_BASE = "https://api.trakt.tv" TRAKT_API_VERSION = "2" # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class TraktToken: access_token: str refresh_token: str expires_at: int # Unix-Timestamp created_at: int @dataclass(frozen=True) class TraktDeviceCode: device_code: str user_code: str verification_url: str expires_in: int interval: int @dataclass(frozen=True) class TraktMediaIds: trakt: int = 0 tmdb: int = 0 imdb: str = "" slug: str = "" tvdb: int = 0 @dataclass(frozen=True) class TraktItem: title: str year: int media_type: str # "movie" oder "show" ids: TraktMediaIds = field(default_factory=TraktMediaIds) season: int = 0 episode: int = 0 watched_at: str = "" poster: str = "" @dataclass(frozen=True) class TraktCalendarItem: """Ein Eintrag aus dem Trakt-Kalender (anstehende Episode).""" show_title: str show_year: int show_ids: TraktMediaIds season: int episode: int episode_title: str first_aired: str # ISO-8601, z.B. "2026-03-02T02:00:00.000Z" # --------------------------------------------------------------------------- # Client # --------------------------------------------------------------------------- class TraktClient: """Trakt API Client.""" def __init__( self, client_id: str, client_secret: str, *, log: Callable[[str], None] | None = None, ) -> None: self._client_id = client_id self._client_secret = client_secret self._log = log def _headers(self, token: str = "") -> dict[str, str]: h = { "Content-Type": "application/json", "trakt-api-version": TRAKT_API_VERSION, "trakt-api-key": self._client_id, } if token: h["Authorization"] = f"Bearer {token}" return h def _do_log(self, msg: str) -> None: if callable(self._log): self._log(f"[Trakt] {msg}") def _post(self, path: str, body: dict, *, token: str = "", timeout: int = 15) -> tuple[int, dict | None]: if requests is None: return 0, None url = f"{TRAKT_API_BASE}{path}" self._do_log(f"POST {path}") try: resp = requests.post(url, json=body, headers=self._headers(token), timeout=timeout) status = resp.status_code try: payload = resp.json() except Exception: payload = None self._do_log(f"POST {path} -> {status}") return status, payload except Exception as exc: self._do_log(f"POST {path} FEHLER: {exc}") return 0, None def _get(self, path: str, *, token: str = "", timeout: int = 15) -> tuple[int, Any]: if requests is None: return 0, None url = f"{TRAKT_API_BASE}{path}" self._do_log(f"GET {path}") try: resp = requests.get(url, headers=self._headers(token), timeout=timeout) status = resp.status_code try: payload = resp.json() except Exception: payload = None self._do_log(f"GET {path} -> {status}") return status, payload except Exception as exc: self._do_log(f"GET {path} FEHLER: {exc}") return 0, None # ------------------------------------------------------------------- # OAuth Device Flow # ------------------------------------------------------------------- def device_code_request(self) -> TraktDeviceCode | None: """POST /oauth/device/code – generiert User-Code + Verification-URL.""" status, payload = self._post("/oauth/device/code", {"client_id": self._client_id}) if status != 200 or not isinstance(payload, dict): return None return TraktDeviceCode( device_code=payload.get("device_code", ""), user_code=payload.get("user_code", ""), verification_url=payload.get("verification_url", "https://trakt.tv/activate"), expires_in=int(payload.get("expires_in", 600)), interval=int(payload.get("interval", 5)), ) def poll_device_token(self, device_code: str, *, interval: int = 5, expires_in: int = 600) -> TraktToken | None: """Pollt POST /oauth/device/token bis autorisiert oder Timeout.""" body = { "code": device_code, "client_id": self._client_id, "client_secret": self._client_secret, } start = time.time() while time.time() - start < expires_in: status, payload = self._post("/oauth/device/token", body) if status == 200 and isinstance(payload, dict): return TraktToken( access_token=payload.get("access_token", ""), refresh_token=payload.get("refresh_token", ""), expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)), created_at=int(payload.get("created_at", 0)), ) if status == 400: # Pending – weiter warten time.sleep(interval) continue if status in (404, 410, 418): # Ungueltig, abgelaufen oder abgelehnt self._do_log(f"Device-Auth abgebrochen: status={status}") return None if status == 429: time.sleep(interval + 1) continue time.sleep(interval) return None def refresh_token(self, refresh_tok: str) -> TraktToken | None: """POST /oauth/token – Token erneuern.""" body = { "refresh_token": refresh_tok, "client_id": self._client_id, "client_secret": self._client_secret, "redirect_uri": "urn:ietf:wg:oauth:2.0:oob", "grant_type": "refresh_token", } status, payload = self._post("/oauth/token", body) if status != 200 or not isinstance(payload, dict): return None return TraktToken( access_token=payload.get("access_token", ""), refresh_token=payload.get("refresh_token", ""), expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)), created_at=int(payload.get("created_at", 0)), ) # ------------------------------------------------------------------- # Scrobble # ------------------------------------------------------------------- def _build_scrobble_body( self, *, media_type: str, title: str, tmdb_id: int, imdb_id: str = "", season: int = 0, episode: int = 0, progress: float = 0.0, ) -> dict: ids: dict[str, object] = {} if tmdb_id: ids["tmdb"] = tmdb_id if imdb_id: ids["imdb"] = imdb_id body: dict[str, object] = {"progress": round(progress, 1)} if media_type == "tv" and season > 0 and episode > 0: body["show"] = {"title": title, "ids": ids} body["episode"] = {"season": season, "number": episode} else: body["movie"] = {"title": title, "ids": ids} return body def scrobble_start( self, token: str, *, media_type: str, title: str, tmdb_id: int, imdb_id: str = "", season: int = 0, episode: int = 0, progress: float = 0.0, ) -> bool: """POST /scrobble/start""" body = self._build_scrobble_body( media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id, season=season, episode=episode, progress=progress, ) status, _ = self._post("/scrobble/start", body, token=token) return status in (200, 201) def scrobble_pause( self, token: str, *, media_type: str, title: str, tmdb_id: int, imdb_id: str = "", season: int = 0, episode: int = 0, progress: float = 50.0, ) -> bool: """POST /scrobble/pause""" body = self._build_scrobble_body( media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id, season=season, episode=episode, progress=progress, ) status, _ = self._post("/scrobble/pause", body, token=token) return status in (200, 201) def scrobble_stop( self, token: str, *, media_type: str, title: str, tmdb_id: int, imdb_id: str = "", season: int = 0, episode: int = 0, progress: float = 100.0, ) -> bool: """POST /scrobble/stop""" body = self._build_scrobble_body( media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id, season=season, episode=episode, progress=progress, ) status, _ = self._post("/scrobble/stop", body, token=token) return status in (200, 201) # ------------------------------------------------------------------- # Watchlist # ------------------------------------------------------------------- def get_watchlist(self, token: str, *, media_type: str = "") -> list[TraktItem]: """GET /users/me/watchlist[/movies|/shows]""" path = "/users/me/watchlist" if media_type in ("movies", "shows"): path = f"{path}/{media_type}" status, payload = self._get(path, token=token) if status != 200 or not isinstance(payload, list): return [] return self._parse_list_items(payload) def add_to_watchlist( self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "", ) -> bool: """POST /sync/watchlist""" ids: dict[str, object] = {} if tmdb_id: ids["tmdb"] = tmdb_id if imdb_id: ids["imdb"] = imdb_id key = "movies" if media_type == "movie" else "shows" body = {key: [{"ids": ids}]} status, _ = self._post("/sync/watchlist", body, token=token) return status in (200, 201) def remove_from_watchlist( self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "", ) -> bool: """POST /sync/watchlist/remove""" ids: dict[str, object] = {} if tmdb_id: ids["tmdb"] = tmdb_id if imdb_id: ids["imdb"] = imdb_id key = "movies" if media_type == "movie" else "shows" body = {key: [{"ids": ids}]} status, _ = self._post("/sync/watchlist/remove", body, token=token) return status == 200 # ------------------------------------------------------------------- # History # ------------------------------------------------------------------- def get_history( self, token: str, *, media_type: str = "", page: int = 1, limit: int = 20, ) -> list[TraktItem]: """GET /users/me/history[/movies|/shows|/episodes]""" path = "/users/me/history" if media_type in ("movies", "shows", "episodes"): path = f"{path}/{media_type}" path = f"{path}?page={page}&limit={limit}" status, payload = self._get(path, token=token) if status != 200 or not isinstance(payload, list): return [] return self._parse_history_items(payload) # ------------------------------------------------------------------- # Calendar # ------------------------------------------------------------------- def get_calendar(self, token: str, start_date: str = "", days: int = 7) -> list[TraktCalendarItem]: """GET /calendars/my/shows/{start_date}/{days} start_date: YYYY-MM-DD (leer = heute). Liefert anstehende Episoden der eigenen Watchlist-Serien. """ if not start_date: from datetime import date start_date = date.today().strftime("%Y-%m-%d") path = f"/calendars/my/shows/{start_date}/{days}" status, payload = self._get(path, token=token) if status != 200 or not isinstance(payload, list): return [] items: list[TraktCalendarItem] = [] for entry in payload: if not isinstance(entry, dict): continue show = entry.get("show") or {} ep = entry.get("episode") or {} show_ids = self._parse_ids(show.get("ids") or {}) items.append(TraktCalendarItem( show_title=str(show.get("title", "") or ""), show_year=int(show.get("year", 0) or 0), show_ids=show_ids, season=int(ep.get("season", 0) or 0), episode=int(ep.get("number", 0) or 0), episode_title=str(ep.get("title", "") or ""), first_aired=str(entry.get("first_aired", "") or ""), )) return items # ------------------------------------------------------------------- # Parser # ------------------------------------------------------------------- @staticmethod def _parse_ids(ids_dict: dict) -> TraktMediaIds: return TraktMediaIds( trakt=int(ids_dict.get("trakt", 0) or 0), tmdb=int(ids_dict.get("tmdb", 0) or 0), imdb=str(ids_dict.get("imdb", "") or ""), slug=str(ids_dict.get("slug", "") or ""), tvdb=int(ids_dict.get("tvdb", 0) or 0), ) def _parse_list_items(self, items: list) -> list[TraktItem]: result: list[TraktItem] = [] for entry in items: if not isinstance(entry, dict): continue item_type = entry.get("type", "") media = entry.get(item_type) or entry.get("movie") or entry.get("show") or {} if not isinstance(media, dict): continue ids = self._parse_ids(media.get("ids") or {}) result.append(TraktItem( title=str(media.get("title", "") or ""), year=int(media.get("year", 0) or 0), media_type=item_type, ids=ids, )) return result def _parse_history_items(self, items: list) -> list[TraktItem]: result: list[TraktItem] = [] for entry in items: if not isinstance(entry, dict): continue item_type = entry.get("type", "") watched_at = str(entry.get("watched_at", "") or "") if item_type == "episode": show = entry.get("show") or {} ep = entry.get("episode") or {} ids = self._parse_ids((show.get("ids") or {})) result.append(TraktItem( title=str(show.get("title", "") or ""), year=int(show.get("year", 0) or 0), media_type="episode", ids=ids, season=int(ep.get("season", 0) or 0), episode=int(ep.get("number", 0) or 0), watched_at=watched_at, )) else: media = entry.get("movie") or entry.get("show") or {} ids = self._parse_ids(media.get("ids") or {}) result.append(TraktItem( title=str(media.get("title", "") or ""), year=int(media.get("year", 0) or 0), media_type=item_type, ids=ids, watched_at=watched_at, )) return result