"""AniWorld (aniworld.to) Integration als Downloader-Plugin. Dieses Plugin ist weitgehend kompatibel zur Serienstream-Integration: - gleiche Staffel-/Episoden-URL-Struktur (/staffel-x/episode-y) - gleiche Hoster-/Watch-Layouts (best-effort) """ from __future__ import annotations from dataclasses import dataclass import re from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias try: # pragma: no cover - optional dependency import requests from bs4 import BeautifulSoup # type: ignore[import-not-found] except ImportError as exc: # pragma: no cover - optional dependency requests = None BeautifulSoup = None REQUESTS_AVAILABLE = False REQUESTS_IMPORT_ERROR = exc else: REQUESTS_AVAILABLE = True REQUESTS_IMPORT_ERROR = None try: # pragma: no cover - optional Kodi helpers import xbmcaddon # type: ignore[import-not-found] except ImportError: # pragma: no cover - allow running outside Kodi xbmcaddon = None from plugin_interface import BasisPlugin from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_url, notify_url from http_session_pool import get_requests_session from regex_patterns import DIGITS, SEASON_EPISODE_TAG, SEASON_EPISODE_URL, STAFFEL_NUM_IN_URL if TYPE_CHECKING: # pragma: no cover from requests import Session as RequestsSession from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found] else: # pragma: no cover RequestsSession: TypeAlias = Any BeautifulSoupT: TypeAlias = Any SETTING_BASE_URL = "aniworld_base_url" DEFAULT_BASE_URL = "https://aniworld.to" DEFAULT_PREFERRED_HOSTERS = ["voe"] DEFAULT_TIMEOUT = 20 ADDON_ID = "plugin.video.viewit" GLOBAL_SETTING_LOG_URLS = "debug_log_urls" GLOBAL_SETTING_DUMP_HTML = "debug_dump_html" GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info" HEADERS = { "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Connection": "keep-alive", } @dataclass class SeriesResult: title: str description: str url: str @dataclass class EpisodeInfo: number: int title: str original_title: str url: str @dataclass class LatestEpisode: series_title: str season: int episode: int url: str airdate: str @dataclass class SeasonInfo: number: int url: str episodes: List[EpisodeInfo] def _get_base_url() -> str: base = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip() if not base: base = DEFAULT_BASE_URL return base.rstrip("/") def _anime_base_url() -> str: return f"{_get_base_url()}/anime/stream" def _popular_animes_url() -> str: return f"{_get_base_url()}/beliebte-animes" def _genres_url() -> str: return f"{_get_base_url()}/animes" def _latest_episodes_url() -> str: return f"{_get_base_url()}/neue-episoden" def _search_url(query: str) -> str: return f"{_get_base_url()}/search?q={query}" def _search_api_url() -> str: return f"{_get_base_url()}/ajax/search" def _absolute_url(href: str) -> str: return f"{_get_base_url()}{href}" if href.startswith("/") else href def _log_url(url: str, *, kind: str = "VISIT") -> None: log_url(ADDON_ID, enabled_setting_id=GLOBAL_SETTING_LOG_URLS, log_filename="aniworld_urls.log", url=url, kind=kind) def _log_visit(url: str) -> None: _log_url(url, kind="VISIT") notify_url(ADDON_ID, heading="AniWorld", url=url, enabled_setting_id=GLOBAL_SETTING_SHOW_URL_INFO) def _log_parsed_url(url: str) -> None: _log_url(url, kind="PARSE") def _log_response_html(url: str, body: str) -> None: dump_response_html( ADDON_ID, enabled_setting_id=GLOBAL_SETTING_DUMP_HTML, url=url, body=body, filename_prefix="aniworld_response", ) def _normalize_search_text(value: str) -> str: value = (value or "").casefold() value = re.sub(r"[^a-z0-9]+", " ", value) value = re.sub(r"\s+", " ", value).strip() return value def _strip_html(text: str) -> str: if not text: return "" return re.sub(r"<[^>]+>", "", text) def _matches_query(query: str, *, title: str) -> bool: normalized_query = _normalize_search_text(query) if not normalized_query: return False haystack = _normalize_search_text(title) if not haystack: return False return normalized_query in haystack def _ensure_requests() -> None: if requests is None or BeautifulSoup is None: raise RuntimeError("requests/bs4 sind nicht verfuegbar.") def _looks_like_cloudflare_challenge(body: str) -> bool: lower = body.lower() markers = ( "cf-browser-verification", "cf-challenge", "cf_chl", "challenge-platform", "attention required! | cloudflare", "just a moment...", "cloudflare ray id", ) return any(marker in lower for marker in markers) def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT: _ensure_requests() _log_visit(url) sess = session or get_requests_session("aniworld", headers=HEADERS) response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") return BeautifulSoup(response.text, "html.parser") def _get_soup_simple(url: str) -> BeautifulSoupT: _ensure_requests() _log_visit(url) sess = get_requests_session("aniworld", headers=HEADERS) response = sess.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") return BeautifulSoup(response.text, "html.parser") def _post_json(url: str, *, payload: Dict[str, str], session: Optional[RequestsSession] = None) -> Any: _ensure_requests() _log_visit(url) sess = session or get_requests_session("aniworld", headers=HEADERS) response = sess.post(url, data=payload, headers=HEADERS, timeout=DEFAULT_TIMEOUT) response.raise_for_status() if response.url and response.url != url: _log_url(response.url, kind="REDIRECT") _log_response_html(url, response.text) if _looks_like_cloudflare_challenge(response.text): raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.") try: return response.json() except Exception: return None def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str: canonical = soup.select_one('link[rel="canonical"][href]') href = (canonical.get("href") if canonical else "") or "" href = href.strip() if href.startswith("http://") or href.startswith("https://"): return href.rstrip("/") return fallback.rstrip("/") def _series_root_url(url: str) -> str: normalized = (url or "").strip().rstrip("/") normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized) normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized) return normalized.rstrip("/") def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]: season_links: List[Tuple[int, str]] = [] seen_numbers: set[int] = set() for anchor in soup.select('.hosterSiteDirectNav a[href*="/staffel-"]'): href = anchor.get("href") or "" if "/episode-" in href: continue match = re.search(STAFFEL_NUM_IN_URL, href) if match: number = int(match.group(1)) else: label = anchor.get_text(strip=True) if not label.isdigit(): continue number = int(label) if number in seen_numbers: continue seen_numbers.add(number) season_url = _absolute_url(href) if season_url: _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) return season_links def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]: tag = soup.select_one('meta[itemprop="numberOfSeasons"]') if not tag: return None content = (tag.get("content") or "").strip() if not content.isdigit(): return None count = int(content) return count if count > 0 else None def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]: episodes: List[EpisodeInfo] = [] rows = soup.select("table.seasonEpisodesList tbody tr") for index, row in enumerate(rows): cells = row.find_all("td") if not cells: continue episode_cell = cells[0] number_text = episode_cell.get_text(strip=True) digits = "".join(ch for ch in number_text if ch.isdigit()) number = int(digits) if digits else index + 1 link = episode_cell.find("a") href = link.get("href") if link else "" url = _absolute_url(href or "") if url: _log_parsed_url(url) title_tag = row.select_one(".seasonEpisodeTitle strong") original_tag = row.select_one(".seasonEpisodeTitle span") title = title_tag.get_text(strip=True) if title_tag else "" original_title = original_tag.get_text(strip=True) if original_tag else "" if url: episodes.append(EpisodeInfo(number=number, title=title, original_title=original_title, url=url)) return episodes _LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE) _LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE) def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]: episodes: List[LatestEpisode] = [] seen: set[str] = set() for anchor in soup.select(".newEpisodeList a[href]"): href = (anchor.get("href") or "").strip() if not href or "/anime/stream/" not in href: continue url = _absolute_url(href) if not url: continue title_tag = anchor.select_one("strong") series_title = (title_tag.get_text(strip=True) if title_tag else "").strip() if not series_title: continue season_number: Optional[int] = None episode_number: Optional[int] = None match = _LATEST_EPISODE_URL_RE.search(href) if match: season_number = int(match.group(1)) episode_number = int(match.group(2)) if season_number is None or episode_number is None: tag_node = ( anchor.select_one("span.listTag.bigListTag.blue2") or anchor.select_one("span.listTag.blue2") or anchor.select_one("span.blue2") ) tag_text = (tag_node.get_text(" ", strip=True) if tag_node else "").strip() match = _LATEST_EPISODE_TAG_RE.search(tag_text) if not match: continue season_number = int(match.group(1)) episode_number = int(match.group(2)) if season_number is None or episode_number is None: continue airdate_node = anchor.select_one("span.elementFloatRight") airdate = (airdate_node.get_text(" ", strip=True) if airdate_node else "").strip() key = f"{url}\t{season_number}\t{episode_number}" if key in seen: continue seen.add(key) _log_parsed_url(url) episodes.append( LatestEpisode( series_title=series_title, season=season_number, episode=episode_number, url=url, airdate=airdate, ) ) return episodes def scrape_anime_detail(anime_identifier: str, max_seasons: Optional[int] = None) -> List[SeasonInfo]: _ensure_requests() anime_url = _series_root_url(_absolute_url(anime_identifier)) _log_url(anime_url, kind="ANIME") session = get_requests_session("aniworld", headers=HEADERS) try: _get_soup(_get_base_url(), session=session) except Exception: pass soup = _get_soup(anime_url, session=session) base_anime_url = _series_root_url(_extract_canonical_url(soup, anime_url)) season_links = _extract_season_links(soup) season_count = _extract_number_of_seasons(soup) if season_count and (not season_links or len(season_links) < season_count): existing = {number for number, _ in season_links} for number in range(1, season_count + 1): if number in existing: continue season_url = f"{base_anime_url}/staffel-{number}" _log_parsed_url(season_url) season_links.append((number, season_url)) season_links.sort(key=lambda item: item[0]) if max_seasons is not None: season_links = season_links[:max_seasons] seasons: List[SeasonInfo] = [] for number, url in season_links: season_soup = _get_soup(url, session=session) episodes = _extract_episodes(season_soup) seasons.append(SeasonInfo(number=number, url=url, episodes=episodes)) seasons.sort(key=lambda s: s.number) return seasons def resolve_redirect(target_url: str) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(target_url) _log_visit(normalized_url) session = get_requests_session("aniworld", headers=HEADERS) _get_soup(_get_base_url(), session=session) response = session.get(normalized_url, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True) if response.url: _log_url(response.url, kind="RESOLVED") return response.url if response.url else None def fetch_episode_hoster_names(episode_url: str) -> List[str]: _ensure_requests() normalized_url = _absolute_url(episode_url) session = get_requests_session("aniworld", headers=HEADERS) _get_soup(_get_base_url(), session=session) soup = _get_soup(normalized_url, session=session) names: List[str] = [] seen: set[str] = set() for anchor in soup.select(".hosterSiteVideo a.watchEpisode"): title = anchor.select_one("h4") name = title.get_text(strip=True) if title else "" if not name: name = anchor.get_text(" ", strip=True) name = (name or "").strip() if name.lower().startswith("hoster "): name = name[7:].strip() href = anchor.get("href") or "" url = _absolute_url(href) if url: _log_parsed_url(url) key = name.casefold().strip() if not key or key in seen: continue seen.add(key) names.append(name) if names: _log_url(f"{normalized_url}#hosters={','.join(names)}", kind="HOSTERS") return names def fetch_episode_stream_link( episode_url: str, *, preferred_hosters: Optional[List[str]] = None, ) -> Optional[str]: _ensure_requests() normalized_url = _absolute_url(episode_url) preferred = [hoster.lower() for hoster in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)] session = get_requests_session("aniworld", headers=HEADERS) _get_soup(_get_base_url(), session=session) soup = _get_soup(normalized_url, session=session) candidates: List[Tuple[str, str]] = [] for anchor in soup.select(".hosterSiteVideo a.watchEpisode"): name_tag = anchor.select_one("h4") name = name_tag.get_text(strip=True) if name_tag else "" href = anchor.get("href") or "" url = _absolute_url(href) if url: _log_parsed_url(url) if name and url: candidates.append((name, url)) if not candidates: return None candidates.sort(key=lambda item: item[0].casefold()) selected_url = None for wanted in preferred: for name, url in candidates: if wanted in name.casefold(): selected_url = url break if selected_url: break if not selected_url: selected_url = candidates[0][1] resolved = resolve_redirect(selected_url) or selected_url return resolved def search_animes(query: str) -> List[SeriesResult]: _ensure_requests() query = (query or "").strip() if not query: return [] session = get_requests_session("aniworld", headers=HEADERS) try: session.get(_get_base_url(), headers=HEADERS, timeout=DEFAULT_TIMEOUT) except Exception: pass data = _post_json(_search_api_url(), payload={"keyword": query}, session=session) results: List[SeriesResult] = [] seen: set[str] = set() if isinstance(data, list): for entry in data: if not isinstance(entry, dict): continue title = _strip_html((entry.get("title") or "").strip()) if not title or not _matches_query(query, title=title): continue link = (entry.get("link") or "").strip() if not link.startswith("/anime/stream/"): continue if "/staffel-" in link or "/episode-" in link: continue if link.rstrip("/") == "/anime/stream": continue url = _absolute_url(link) if link else "" if url: _log_parsed_url(url) key = title.casefold().strip() if key in seen: continue seen.add(key) description = (entry.get("description") or "").strip() results.append(SeriesResult(title=title, description=description, url=url)) return results soup = _get_soup_simple(_search_url(requests.utils.quote(query))) for anchor in soup.select("a[href^='/anime/stream/'][href]"): href = (anchor.get("href") or "").strip() if not href or "/staffel-" in href or "/episode-" in href: continue url = _absolute_url(href) if url: _log_parsed_url(url) title_node = anchor.select_one("h3") or anchor.select_one("strong") title = (title_node.get_text(" ", strip=True) if title_node else anchor.get_text(" ", strip=True)).strip() if not title: continue if not _matches_query(query, title=title): continue key = title.casefold().strip() if key in seen: continue seen.add(key) results.append(SeriesResult(title=title, description="", url=url)) return results class AniworldPlugin(BasisPlugin): name = "AniWorld (aniworld.to)" def __init__(self) -> None: self._anime_results: Dict[str, SeriesResult] = {} self._season_cache: Dict[str, List[SeasonInfo]] = {} self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {} self._popular_cache: Optional[List[SeriesResult]] = None self._genre_cache: Optional[Dict[str, List[SeriesResult]]] = None self._latest_cache: Dict[int, List[LatestEpisode]] = {} self._latest_hoster_cache: Dict[str, List[str]] = {} self._requests_available = REQUESTS_AVAILABLE self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS) self._preferred_hosters: List[str] = list(self._default_preferred_hosters) self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {} self.is_available = True self.unavailable_reason: Optional[str] = None if not self._requests_available: # pragma: no cover - optional dependency self.is_available = False self.unavailable_reason = "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'." if REQUESTS_IMPORT_ERROR: print(f"AniworldPlugin Importfehler: {REQUESTS_IMPORT_ERROR}") def capabilities(self) -> set[str]: return {"popular_series", "genres", "latest_episodes"} def _find_series_by_title(self, title: str) -> Optional[SeriesResult]: title = (title or "").strip() if not title: return None direct = self._anime_results.get(title) if direct: return direct wanted = title.casefold().strip() for candidate in self._anime_results.values(): if candidate.title and candidate.title.casefold().strip() == wanted: return candidate try: for entry in self._ensure_popular(): if entry.title and entry.title.casefold().strip() == wanted: self._anime_results[entry.title] = entry return entry except Exception: pass try: for entries in self._ensure_genres().values(): for entry in entries: if entry.title and entry.title.casefold().strip() == wanted: self._anime_results[entry.title] = entry return entry except Exception: pass try: for entry in search_animes(title): if entry.title and entry.title.casefold().strip() == wanted: self._anime_results[entry.title] = entry return entry except Exception: pass return None def _ensure_popular(self) -> List[SeriesResult]: if self._popular_cache is not None: return list(self._popular_cache) soup = _get_soup_simple(_popular_animes_url()) results: List[SeriesResult] = [] seen: set[str] = set() for anchor in soup.select("div.seriesListContainer a[href^='/anime/stream/']"): href = (anchor.get("href") or "").strip() if not href or "/staffel-" in href or "/episode-" in href: continue url = _absolute_url(href) if url: _log_parsed_url(url) title_node = anchor.select_one("h3") title = (title_node.get_text(" ", strip=True) if title_node else "").strip() if not title: continue description = "" desc_node = anchor.select_one("small") if desc_node: description = desc_node.get_text(" ", strip=True).strip() key = title.casefold().strip() if key in seen: continue seen.add(key) results.append(SeriesResult(title=title, description=description, url=url)) self._popular_cache = list(results) return list(results) def popular_series(self) -> List[str]: if not self._requests_available: return [] entries = self._ensure_popular() self._anime_results.update({entry.title: entry for entry in entries if entry.title}) return [entry.title for entry in entries if entry.title] def latest_episodes(self, page: int = 1) -> List[LatestEpisode]: if not self._requests_available: return [] try: page = int(page or 1) except Exception: page = 1 page = max(1, page) cached = self._latest_cache.get(page) if cached is not None: return list(cached) url = _latest_episodes_url() if page > 1: url = f"{url}?page={page}" soup = _get_soup_simple(url) episodes = _extract_latest_episodes(soup) self._latest_cache[page] = list(episodes) return list(episodes) def _ensure_genres(self) -> Dict[str, List[SeriesResult]]: if self._genre_cache is not None: return {key: list(value) for key, value in self._genre_cache.items()} soup = _get_soup_simple(_genres_url()) results: Dict[str, List[SeriesResult]] = {} genre_blocks = soup.select("#seriesContainer div.genre") if not genre_blocks: genre_blocks = soup.select("div.genre") for genre_block in genre_blocks: name_node = genre_block.select_one(".seriesGenreList h3") genre_name = (name_node.get_text(" ", strip=True) if name_node else "").strip() if not genre_name: continue entries: List[SeriesResult] = [] seen: set[str] = set() for anchor in genre_block.select("ul li a[href]"): href = (anchor.get("href") or "").strip() if not href or "/staffel-" in href or "/episode-" in href: continue url = _absolute_url(href) if url: _log_parsed_url(url) title = (anchor.get_text(" ", strip=True) or "").strip() if not title: continue key = title.casefold().strip() if key in seen: continue seen.add(key) entries.append(SeriesResult(title=title, description="", url=url)) if entries: results[genre_name] = entries self._genre_cache = {key: list(value) for key, value in results.items()} # Für spätere Auflösung (Seasons/Episoden) die Titel->URL Zuordnung auffüllen. for entries in results.values(): for entry in entries: if not entry.title: continue if entry.title not in self._anime_results: self._anime_results[entry.title] = entry return {key: list(value) for key, value in results.items()} def genres(self) -> List[str]: if not self._requests_available: return [] genres = list(self._ensure_genres().keys()) return [g for g in genres if g] def titles_for_genre(self, genre: str) -> List[str]: genre = (genre or "").strip() if not genre or not self._requests_available: return [] mapping = self._ensure_genres() entries = mapping.get(genre) if entries is None: wanted = genre.casefold() for key, value in mapping.items(): if key.casefold() == wanted: entries = value break if not entries: return [] # Zusätzlich sicherstellen, dass die Titel im Cache sind. self._anime_results.update({entry.title: entry for entry in entries if entry.title and entry.title not in self._anime_results}) return [entry.title for entry in entries if entry.title] def _season_label(self, number: int) -> str: return f"Staffel {number}" def _parse_season_number(self, season_label: str) -> Optional[int]: match = re.search(DIGITS, season_label or "") return int(match.group(1)) if match else None def _episode_label(self, info: EpisodeInfo) -> str: title = (info.title or "").strip() if title: return f"Episode {info.number} - {title}" return f"Episode {info.number}" def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None: cache_key = (title, season_label) self._episode_label_cache[cache_key] = {self._episode_label(info): info for info in season_info.episodes} def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]: cache_key = (title, season_label) cached = self._episode_label_cache.get(cache_key) if cached: return cached.get(episode_label) seasons = self._ensure_seasons(title) number = self._parse_season_number(season_label) if number is None: return None for season_info in seasons: if season_info.number == number: self._cache_episode_labels(title, season_label, season_info) return self._episode_label_cache.get(cache_key, {}).get(episode_label) return None async def search_titles(self, query: str) -> List[str]: query = (query or "").strip() if not query: self._anime_results.clear() self._season_cache.clear() self._episode_label_cache.clear() self._popular_cache = None return [] if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 nicht suchen.") try: results = search_animes(query) except Exception as exc: # pragma: no cover self._anime_results.clear() self._season_cache.clear() self._episode_label_cache.clear() raise RuntimeError(f"AniWorld-Suche fehlgeschlagen: {exc}") from exc self._anime_results = {result.title: result for result in results} self._season_cache.clear() self._episode_label_cache.clear() return [result.title for result in results] def _ensure_seasons(self, title: str) -> List[SeasonInfo]: if title in self._season_cache: return self._season_cache[title] anime = self._find_series_by_title(title) if not anime: return [] seasons = scrape_anime_detail(anime.url) self._season_cache[title] = list(seasons) return list(seasons) def seasons_for(self, title: str) -> List[str]: seasons = self._ensure_seasons(title) return [self._season_label(season.number) for season in seasons if season.episodes] def episodes_for(self, title: str, season: str) -> List[str]: seasons = self._ensure_seasons(title) number = self._parse_season_number(season) if number is None: return [] for season_info in seasons: if season_info.number == number: labels = [self._episode_label(info) for info in season_info.episodes] self._cache_episode_labels(title, season, season_info) return labels return [] def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links liefern.") episode_info = self._lookup_episode(title, season, episode) if not episode_info: return None link = fetch_episode_stream_link(episode_info.url, preferred_hosters=self._preferred_hosters) if link: _log_url(link, kind="FOUND") return link def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]: if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.") cache_key = (title, season, episode) cached = self._hoster_cache.get(cache_key) if cached is not None: return list(cached) episode_info = self._lookup_episode(title, season, episode) if not episode_info: return [] names = fetch_episode_hoster_names(episode_info.url) self._hoster_cache[cache_key] = list(names) return list(names) def available_hosters_for_url(self, episode_url: str) -> List[str]: if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.") normalized = _absolute_url(episode_url) cached = self._latest_hoster_cache.get(normalized) if cached is not None: return list(cached) names = fetch_episode_hoster_names(normalized) self._latest_hoster_cache[normalized] = list(names) return list(names) def stream_link_for_url(self, episode_url: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links liefern.") normalized = _absolute_url(episode_url) link = fetch_episode_stream_link(normalized, preferred_hosters=self._preferred_hosters) if link: _log_url(link, kind="FOUND") return link def resolve_stream_link(self, link: str) -> Optional[str]: if not self._requests_available: raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.") resolved = resolve_redirect(link) if not resolved: return None try: from resolveurl_backend import resolve as resolve_with_resolveurl except Exception: resolve_with_resolveurl = None if callable(resolve_with_resolveurl): resolved_by_resolveurl = resolve_with_resolveurl(resolved) if resolved_by_resolveurl: _log_url("ResolveURL", kind="HOSTER_RESOLVER") _log_url(resolved_by_resolveurl, kind="MEDIA") return resolved_by_resolveurl _log_url(resolved, kind="FINAL") return resolved def set_preferred_hosters(self, hosters: List[str]) -> None: normalized = [hoster.strip().lower() for hoster in hosters if hoster.strip()] if normalized: self._preferred_hosters = normalized def reset_preferred_hosters(self) -> None: self._preferred_hosters = list(self._default_preferred_hosters) Plugin = AniworldPlugin