"""AniWorld (aniworld.to) Integration als Downloader-Plugin.
Dieses Plugin ist weitgehend kompatibel zur Serienstream-Integration:
- gleiche Staffel-/Episoden-URL-Struktur (/staffel-x/episode-y)
- gleiche Hoster-/Watch-Layouts (best-effort)
"""
from __future__ import annotations
from dataclasses import dataclass
from html import unescape
import hashlib
import json
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeAlias
try: # pragma: no cover - optional dependency
import requests
from bs4 import BeautifulSoup # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover - optional dependency
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
try: # pragma: no cover - optional Kodi helpers
import xbmcaddon # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow running outside Kodi
xbmcaddon = None
xbmcgui = None
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
from regex_patterns import DIGITS, SEASON_EPISODE_TAG, SEASON_EPISODE_URL, STAFFEL_NUM_IN_URL
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
from bs4 import BeautifulSoup as BeautifulSoupT # type: ignore[import-not-found]
else: # pragma: no cover
RequestsSession: TypeAlias = Any
BeautifulSoupT: TypeAlias = Any
# Add-on setting that overrides the site base URL, and the fallback used when it is empty.
SETTING_BASE_URL = "aniworld_base_url"
DEFAULT_BASE_URL = "https://aniworld.to"
# Hoster name fragments tried first when a stream link is selected.
DEFAULT_PREFERRED_HOSTERS = ["voe"]
# Value passed as `timeout=` to every HTTP request in this module.
DEFAULT_TIMEOUT = 20
# Add-on id handed to the settings/logging helpers.
ADDON_ID = "plugin.video.viewit"
# Setting ids passed to the logging helpers as `enabled_setting_id`.
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
GLOBAL_SETTING_SHOW_URL_INFO = "debug_show_url_info"
GLOBAL_SETTING_LOG_ERRORS = "debug_log_errors"
# Setting ids passed to the logging helpers as `plugin_setting_id`.
SETTING_LOG_URLS = "log_urls_aniworld"
SETTING_DUMP_HTML = "dump_html_aniworld"
SETTING_SHOW_URL_INFO = "show_url_info_aniworld"
SETTING_LOG_ERRORS = "log_errors_aniworld"
# Headers sent with every request issued by this module.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
}
# Session cache (Kodi window properties): default entry lifetime in seconds,
# property-name prefix, and the cap on stored title -> URL pairs.
SESSION_CACHE_TTL_SECONDS = 300
SESSION_CACHE_PREFIX = "viewit.aniworld"
SESSION_CACHE_MAX_TITLE_URLS = 800
@dataclass
class SeriesResult:
    """A series entry as produced by the search, popular and genre listings."""

    # Display title of the series.
    title: str
    # Description text; empty when the listing provides none.
    description: str
    # URL of the series page.
    url: str
@dataclass
class EpisodeInfo:
    """One episode row parsed from a season's episode table."""

    # Episode number within the season.
    number: int
    # Title text taken from the `.seasonEpisodeTitle strong` node.
    title: str
    # Text taken from the `.seasonEpisodeTitle span` node.
    original_title: str
    # URL of the episode page.
    url: str
@dataclass
class LatestEpisode:
    """One entry of the "new episodes" listing."""

    # Title of the series the episode belongs to.
    series_title: str
    # Season number parsed from the link or its tag text.
    season: int
    # Episode number parsed from the link or its tag text.
    episode: int
    # URL the listing entry links to.
    url: str
    # Text of the entry's `span.elementFloatRight` node (may be empty).
    airdate: str
@dataclass
class SeasonInfo:
    """A season of a series together with its (possibly not yet loaded) episodes."""

    # Season number.
    number: int
    # URL of the season page.
    url: str
    # Episodes of the season; empty when they have not been loaded.
    episodes: List[EpisodeInfo]
def _get_base_url() -> str:
    """Return the configured site base URL without a trailing slash.

    Reads the ``SETTING_BASE_URL`` add-on setting and falls back to
    ``DEFAULT_BASE_URL`` when the stripped setting is empty.
    """
    configured = get_setting_string(ADDON_ID, SETTING_BASE_URL, default=DEFAULT_BASE_URL).strip()
    return (configured or DEFAULT_BASE_URL).rstrip("/")
def _anime_base_url() -> str:
    """Return the URL prefix under which series pages live."""
    return _get_base_url() + "/anime/stream"
def _popular_animes_url() -> str:
    """Return the URL of the popular-series listing."""
    return _get_base_url() + "/beliebte-animes"
def _genres_url() -> str:
    """Return the URL of the page that lists series grouped by genre."""
    return _get_base_url() + "/animes"
def _latest_episodes_url() -> str:
    """Return the URL of the new-episodes listing."""
    return _get_base_url() + "/neue-episoden"
def _search_url(query: str) -> str:
    """Return the HTML search URL for *query*.

    The query is inserted verbatim, so callers must URL-encode it first.
    """
    return "{}/search?q={}".format(_get_base_url(), query)
def _search_api_url() -> str:
    """Return the URL of the AJAX search endpoint."""
    return _get_base_url() + "/ajax/search"
def _absolute_url(href: str) -> str:
    """Turn a site-relative *href* into an absolute URL.

    Args:
        href: Link target as found in the page. ``None`` is treated as "".

    Returns:
        * ``//host/path`` (protocol-relative) -> the base URL's scheme is
          prepended instead of the whole base URL.
        * ``/path`` -> the configured base URL is prepended.
        * anything else (absolute URLs, empty string) -> returned unchanged.
    """
    href = href or ""
    if href.startswith("//"):
        # A network-path reference already names its host; gluing it onto the
        # base URL would produce "https://site//host/path".
        base = _get_base_url()
        scheme = base.split("://", 1)[0] if "://" in base else "https"
        return f"{scheme}:{href}"
    if href.startswith("/"):
        return f"{_get_base_url()}{href}"
    return href
def _session_window() -> Any:
    """Return Kodi window 10000, or ``None`` outside Kodi or on any error."""
    if xbmcgui is not None:
        try:
            return xbmcgui.Window(10000)
        except Exception:
            pass
    return None
def _session_cache_key(name: str) -> str:
    """Build the window-property name for cache entry *name*.

    The key embeds the first 12 hex digits of the SHA-1 of the current base
    URL, so entries stored for a different base URL are never read back.
    """
    digest = hashlib.sha1(_get_base_url().encode("utf-8")).hexdigest()
    scope = digest[:12]
    return f"{SESSION_CACHE_PREFIX}.{scope}.{name}"
def _session_cache_get(name: str) -> Any:
    """Read cache entry *name* from the session window.

    Returns:
        The stored ``data`` value, or ``None`` when no window is available,
        the property is empty or unreadable, the payload is not a JSON
        object, or the entry has expired.
    """
    window = _session_window()
    if window is None:
        return None
    try:
        serialized = window.getProperty(_session_cache_key(name)) or ""
    except Exception:
        return None
    if not serialized:
        return None
    try:
        entry = json.loads(serialized)
    except Exception:
        return None
    if not isinstance(entry, dict):
        return None
    try:
        expired = float(entry.get("expires_at") or 0) <= time.time()
    except Exception:
        return None
    return None if expired else entry.get("data")
def _session_cache_set(name: str, data: Any, *, ttl_seconds: int = SESSION_CACHE_TTL_SECONDS) -> None:
    """Store *data* under cache entry *name* with an expiry timestamp.

    The entry is silently dropped when no window is available, the payload
    cannot be serialised to JSON, the serialised form exceeds 240,000
    characters, or writing the property fails.

    Args:
        name: Cache entry name.
        data: JSON-serialisable value.
        ttl_seconds: Lifetime of the entry; values below 1 are raised to 1.
    """
    window = _session_window()
    if window is None:
        return
    lifetime = max(1, int(ttl_seconds))
    envelope = {"expires_at": float(time.time() + lifetime), "data": data}
    try:
        serialized = json.dumps(envelope, ensure_ascii=False, separators=(",", ":"))
    except Exception:
        return
    if len(serialized) <= 240_000:
        try:
            window.setProperty(_session_cache_key(name), serialized)
        except Exception:
            return
def _log_url(url: str, *, kind: str = "VISIT") -> None:
    """Forward *url* to the shared URL logger, tagged with *kind*."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_URLS,
        "plugin_setting_id": SETTING_LOG_URLS,
        "log_filename": "aniworld_urls.log",
        "url": url,
        "kind": kind,
    }
    log_url(ADDON_ID, **options)
def _log_visit(url: str) -> None:
    """Log *url* as a "VISIT" and pass it to the URL notification helper."""
    _log_url(url, kind="VISIT")
    options = {
        "heading": "AniWorld",
        "url": url,
        "enabled_setting_id": GLOBAL_SETTING_SHOW_URL_INFO,
        "plugin_setting_id": SETTING_SHOW_URL_INFO,
    }
    notify_url(ADDON_ID, **options)
def _log_parsed_url(url: str) -> None:
    """Log a URL that was extracted from a page (kind "PARSE")."""
    kind = "PARSE"
    _log_url(url, kind=kind)
def _log_response_html(url: str, body: str) -> None:
    """Forward a response body to the shared HTML dump helper."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_DUMP_HTML,
        "plugin_setting_id": SETTING_DUMP_HTML,
        "url": url,
        "body": body,
        "filename_prefix": "aniworld_response",
    }
    dump_response_html(ADDON_ID, **options)
def _log_error(message: str) -> None:
    """Forward *message* to the shared error logger."""
    options = {
        "enabled_setting_id": GLOBAL_SETTING_LOG_ERRORS,
        "plugin_setting_id": SETTING_LOG_ERRORS,
        "log_filename": "aniworld_errors.log",
        "message": message,
    }
    log_error(ADDON_ID, **options)
def _normalize_search_text(value: str) -> str:
value = (value or "").casefold()
value = re.sub(r"[^a-z0-9]+", " ", value)
value = re.sub(r"\s+", " ", value).strip()
return value
def _strip_html(text: str) -> str:
if not text:
return ""
return re.sub(r"<[^>]+>", "", text)
def _matches_query(query: str, *, title: str) -> bool:
    """Tell whether the normalised *query* occurs in *title* as whole words.

    Both strings are normalised and padded with spaces, so the query only
    matches on word boundaries. An empty normalised query never matches.
    """
    needle = _normalize_search_text(query)
    if needle:
        padded_title = " " + _normalize_search_text(title) + " "
        return (" " + needle + " ") in padded_title
    return False
def _ensure_requests() -> None:
    """Raise ``RuntimeError`` unless both requests and BeautifulSoup were imported."""
    if requests is not None and BeautifulSoup is not None:
        return
    raise RuntimeError("requests/bs4 sind nicht verfuegbar.")
def _looks_like_cloudflare_challenge(body: str) -> bool:
lower = body.lower()
markers = (
"cf-browser-verification",
"cf-challenge",
"cf_chl",
"challenge-platform",
"attention required! | cloudflare",
"just a moment...",
"cloudflare ray id",
)
return any(marker in lower for marker in markers)
def _get_soup(url: str, *, session: Optional[RequestsSession] = None) -> BeautifulSoupT:
    """Fetch *url* and return the parsed document.

    Args:
        url: Page to request.
        session: Session to use; the shared "aniworld" session is used when
            omitted.

    Returns:
        The response body parsed with the "html.parser" backend.

    Raises:
        RuntimeError: If requests/bs4 are missing or the body looks like a
            Cloudflare challenge.
        Exception: Any request or HTTP status error, re-raised after logging.
    """
    _ensure_requests()
    _log_visit(url)
    http = session if session else get_requests_session("aniworld", headers=HEADERS)
    try:
        reply = http.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        reply.raise_for_status()
    except Exception as exc:
        _log_error(f"GET {url} failed: {exc}")
        raise
    final_url = reply.url
    if final_url and final_url != url:
        _log_url(final_url, kind="REDIRECT")
    _log_response_html(url, reply.text)
    if _looks_like_cloudflare_challenge(reply.text):
        raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
    return BeautifulSoup(reply.text, "html.parser")
def _get_html_simple(url: str) -> str:
    """Fetch *url* with the shared "aniworld" session and return the raw body.

    Raises:
        RuntimeError: If requests/bs4 are missing or the body looks like a
            Cloudflare challenge.
        Exception: Any request or HTTP status error, re-raised after logging.
    """
    _ensure_requests()
    _log_visit(url)
    http = get_requests_session("aniworld", headers=HEADERS)
    try:
        reply = http.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        reply.raise_for_status()
    except Exception as exc:
        _log_error(f"GET {url} failed: {exc}")
        raise
    final_url = reply.url
    if final_url and final_url != url:
        _log_url(final_url, kind="REDIRECT")
    html = reply.text
    _log_response_html(url, html)
    if _looks_like_cloudflare_challenge(html):
        raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
    return html
def _get_soup_simple(url: str) -> BeautifulSoupT:
    """Fetch *url* via ``_get_html_simple`` and parse it with "html.parser"."""
    return BeautifulSoup(_get_html_simple(url), "html.parser")
def _extract_genre_names_from_html(body: str) -> List[str]:
names: List[str] = []
seen: set[str] = set()
pattern = re.compile(
r"
]*class=[\"'][^\"']*seriesGenreList[^\"']*[\"'][^>]*>.*?
]*>(.*?)
",
re.IGNORECASE | re.DOTALL,
)
for match in pattern.finditer(body or ""):
text = re.sub(r"<[^>]+>", " ", match.group(1) or "")
text = unescape(re.sub(r"\s+", " ", text)).strip()
if not text:
continue
key = text.casefold()
if key in seen:
continue
seen.add(key)
names.append(text)
return names
def _post_json(url: str, *, payload: Dict[str, str], session: Optional[RequestsSession] = None) -> Any:
    """POST form data to *url* and return the decoded JSON response.

    Args:
        url: Endpoint to call.
        payload: Form fields sent as the request body.
        session: Session to use; the shared "aniworld" session is used when
            omitted.

    Returns:
        The decoded JSON value, or ``None`` when the body is not valid JSON.

    Raises:
        RuntimeError: If requests/bs4 are missing or the body looks like a
            Cloudflare challenge.
        Exception: Any request or HTTP status error, re-raised after logging.
    """
    _ensure_requests()
    _log_visit(url)
    sess = session or get_requests_session("aniworld", headers=HEADERS)
    try:
        response = sess.post(url, data=payload, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except Exception as exc:
        # Log failures the same way the GET helpers do before propagating.
        _log_error(f"POST {url} failed: {exc}")
        raise
    if response.url and response.url != url:
        _log_url(response.url, kind="REDIRECT")
    _log_response_html(url, response.text)
    if _looks_like_cloudflare_challenge(response.text):
        raise RuntimeError("Cloudflare-Schutz erkannt. requests reicht ggf. nicht aus.")
    try:
        return response.json()
    except Exception as exc:
        # A non-JSON body is reported as "no data" so callers can fall back.
        _log_error(f"POST {url} returned no valid JSON: {exc}")
        return None
def _extract_canonical_url(soup: BeautifulSoupT, fallback: str) -> str:
    """Return the page's canonical URL, or *fallback*, without trailing slash.

    The canonical link is only used when its href is an absolute http(s) URL.
    """
    node = soup.select_one('link[rel="canonical"][href]')
    target = ((node.get("href") if node else "") or "").strip()
    if target.startswith(("http://", "https://")):
        chosen = target
    else:
        chosen = fallback
    return chosen.rstrip("/")
def _series_root_url(url: str) -> str:
normalized = (url or "").strip().rstrip("/")
normalized = re.sub(r"/staffel-\d+(?:/.*)?$", "", normalized)
normalized = re.sub(r"/episode-\d+(?:/.*)?$", "", normalized)
return normalized.rstrip("/")
def _extract_season_links(soup: BeautifulSoupT) -> List[Tuple[int, str]]:
    """Collect ``(season number, season URL)`` pairs from the season navigation.

    Episode links are skipped. The number comes from the href when
    ``STAFFEL_NUM_IN_URL`` matches, otherwise from a purely numeric link
    label. Only the first link per season number is kept.

    Returns:
        The pairs sorted by season number.
    """
    links: List[Tuple[int, str]] = []
    taken: set[int] = set()
    for link in soup.select('.hosterSiteDirectNav a[href*="/staffel-"]'):
        target = link.get("href") or ""
        if "/episode-" in target:
            continue
        url_match = re.search(STAFFEL_NUM_IN_URL, target)
        if url_match:
            season_no = int(url_match.group(1))
        else:
            caption = link.get_text(strip=True)
            if not caption.isdigit():
                continue
            season_no = int(caption)
        if season_no in taken:
            continue
        taken.add(season_no)
        absolute = _absolute_url(target)
        if not absolute:
            continue
        _log_parsed_url(absolute)
        links.append((season_no, absolute))
    return sorted(links, key=lambda pair: pair[0])
def _extract_number_of_seasons(soup: BeautifulSoupT) -> Optional[int]:
    """Read the season count from the ``numberOfSeasons`` meta tag.

    Returns:
        The positive count, or ``None`` when the tag is missing, its content
        is not made of digits, or the count is zero.
    """
    meta = soup.select_one('meta[itemprop="numberOfSeasons"]')
    if not meta:
        return None
    raw = (meta.get("content") or "").strip()
    if raw.isdigit():
        total = int(raw)
        if total > 0:
            return total
    return None
def _extract_episodes(soup: BeautifulSoupT) -> List[EpisodeInfo]:
    """Parse the episode table of a season page.

    The episode number is built from the digits of the first cell's text and
    falls back to the 1-based row position. Rows without cells or without an
    episode link are skipped.
    """
    collected: List[EpisodeInfo] = []
    table_rows = soup.select("table.seasonEpisodesList tbody tr")
    for position, row in enumerate(table_rows, start=1):
        columns = row.find_all("td")
        if not columns:
            continue
        first_cell = columns[0]
        numeric = "".join(filter(str.isdigit, first_cell.get_text(strip=True)))
        episode_no = int(numeric) if numeric else position
        anchor = first_cell.find("a")
        target = _absolute_url((anchor.get("href") if anchor else "") or "")
        if not target:
            continue
        _log_parsed_url(target)
        main_node = row.select_one(".seasonEpisodeTitle strong")
        alt_node = row.select_one(".seasonEpisodeTitle span")
        collected.append(
            EpisodeInfo(
                number=episode_no,
                title=main_node.get_text(strip=True) if main_node else "",
                original_title=alt_node.get_text(strip=True) if alt_node else "",
                url=target,
            )
        )
    return collected
# Case-insensitive matchers used by _extract_latest_episodes: the URL pattern is
# applied to the link href, the tag pattern to the entry's tag text. Both are
# read through groups 1 (season) and 2 (episode).
_LATEST_EPISODE_TAG_RE = re.compile(SEASON_EPISODE_TAG, re.IGNORECASE)
_LATEST_EPISODE_URL_RE = re.compile(SEASON_EPISODE_URL, re.IGNORECASE)
def _extract_latest_episodes(soup: BeautifulSoupT) -> List[LatestEpisode]:
    """Parse the entries of the new-episodes listing.

    Season and episode numbers are taken from the link href; when the href
    does not match, the entry's tag text is parsed instead. Entries without a
    series title or without parsable numbers are skipped, as are repeated
    (URL, season, episode) combinations.
    """
    collected: List[LatestEpisode] = []
    known: set[Tuple[str, int, int]] = set()
    tag_selectors = ("span.listTag.bigListTag.blue2", "span.listTag.blue2", "span.blue2")
    for link in soup.select(".newEpisodeList a[href]"):
        target = (link.get("href") or "").strip()
        if not target or "/anime/stream/" not in target:
            continue
        absolute = _absolute_url(target)
        if not absolute:
            continue
        heading = link.select_one("strong")
        show = (heading.get_text(strip=True) if heading else "").strip()
        if not show:
            continue
        numbers = _LATEST_EPISODE_URL_RE.search(target)
        if numbers is None:
            # The href carries no season/episode pair; try the tag text.
            badge = None
            for selector in tag_selectors:
                badge = link.select_one(selector)
                if badge is not None:
                    break
            badge_text = (badge.get_text(" ", strip=True) if badge else "").strip()
            numbers = _LATEST_EPISODE_TAG_RE.search(badge_text)
            if numbers is None:
                continue
        season_no = int(numbers.group(1))
        episode_no = int(numbers.group(2))
        date_node = link.select_one("span.elementFloatRight")
        date_text = (date_node.get_text(" ", strip=True) if date_node else "").strip()
        identity = (absolute, season_no, episode_no)
        if identity in known:
            continue
        known.add(identity)
        _log_parsed_url(absolute)
        collected.append(
            LatestEpisode(
                series_title=show,
                season=season_no,
                episode=episode_no,
                url=absolute,
                airdate=date_text,
            )
        )
    return collected
def scrape_anime_detail(
    anime_identifier: str,
    max_seasons: Optional[int] = None,
    *,
    load_episodes: bool = True,
) -> List[SeasonInfo]:
    """Scrape the seasons (and optionally the episodes) of one series.

    Season links come from the series page. When the page declares more
    seasons than it links, the missing season URLs are built from the
    canonical series URL.

    Args:
        anime_identifier: Series URL or site-relative path.
        max_seasons: Keep only the first *max_seasons* seasons when given.
        load_episodes: Fetch every season page and parse its episodes; when
            false, seasons are returned with empty episode lists.

    Returns:
        The seasons sorted by number.
    """
    _ensure_requests()
    anime_url = _series_root_url(_absolute_url(anime_identifier))
    _log_url(anime_url, kind="ANIME")
    http = get_requests_session("aniworld", headers=HEADERS)
    landing = _get_soup(anime_url, session=http)
    canonical_root = _series_root_url(_extract_canonical_url(landing, anime_url))
    links = _extract_season_links(landing)
    declared = _extract_number_of_seasons(landing)
    if declared and len(links) < declared:
        linked_numbers = {season_no for season_no, _ in links}
        for season_no in range(1, declared + 1):
            if season_no not in linked_numbers:
                guessed = f"{canonical_root}/staffel-{season_no}"
                _log_parsed_url(guessed)
                links.append((season_no, guessed))
        links.sort(key=lambda pair: pair[0])
    if max_seasons is not None:
        links = links[:max_seasons]
    result: List[SeasonInfo] = []
    for season_no, season_url in links:
        if load_episodes:
            parsed = _extract_episodes(_get_soup(season_url, session=http))
        else:
            parsed = []
        result.append(SeasonInfo(number=season_no, url=season_url, episodes=parsed))
    return sorted(result, key=lambda season: season.number)
def resolve_redirect(target_url: str) -> Optional[str]:
    """Follow the redirects of *target_url* and return the final URL.

    The start page is requested first on the same session before the target
    is fetched.

    Returns:
        The final response URL, or ``None`` when the response has none.
    """
    _ensure_requests()
    absolute = _absolute_url(target_url)
    _log_visit(absolute)
    http = get_requests_session("aniworld", headers=HEADERS)
    _get_soup(_get_base_url(), session=http)
    reply = http.get(absolute, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
    final_url = reply.url
    if not final_url:
        return None
    _log_url(final_url, kind="RESOLVED")
    return final_url
def fetch_episode_hoster_names(episode_url: str) -> List[str]:
    """Return the hoster names offered on an episode page.

    The name is the link's ``h4`` text, falling back to the whole link text;
    a leading "Hoster " is removed. Names are de-duplicated
    case-insensitively and kept in page order.
    """
    _ensure_requests()
    page_url = _absolute_url(episode_url)
    http = get_requests_session("aniworld", headers=HEADERS)
    _get_soup(_get_base_url(), session=http)
    page = _get_soup(page_url, session=http)
    by_key: Dict[str, str] = {}
    for link in page.select(".hosterSiteVideo a.watchEpisode"):
        heading = link.select_one("h4")
        label = heading.get_text(strip=True) if heading else ""
        label = (label or link.get_text(" ", strip=True) or "").strip()
        if label.lower().startswith("hoster "):
            label = label[7:].strip()
        target = _absolute_url(link.get("href") or "")
        if target:
            _log_parsed_url(target)
        folded = label.casefold().strip()
        if folded and folded not in by_key:
            by_key[folded] = label
    hosters = list(by_key.values())
    if hosters:
        _log_url(f"{page_url}#hosters={','.join(hosters)}", kind="HOSTERS")
    return hosters
def fetch_episode_stream_link(
    episode_url: str,
    *,
    preferred_hosters: Optional[List[str]] = None,
) -> Optional[str]:
    """Pick a hoster link on an episode page and resolve its redirect.

    Candidates are sorted by case-folded hoster name. The first candidate
    whose name contains a preferred hoster (in preference order) wins;
    otherwise the first candidate is used.

    Args:
        episode_url: Episode URL or site-relative path.
        preferred_hosters: Hoster name fragments in order of preference;
            ``DEFAULT_PREFERRED_HOSTERS`` when omitted or empty.

    Returns:
        The resolved URL, the unresolved hoster link when resolving yields
        nothing, or ``None`` when the page offers no hoster link.
    """
    _ensure_requests()
    page_url = _absolute_url(episode_url)
    wanted_hosters = [entry.lower() for entry in (preferred_hosters or DEFAULT_PREFERRED_HOSTERS)]
    http = get_requests_session("aniworld", headers=HEADERS)
    _get_soup(_get_base_url(), session=http)
    page = _get_soup(page_url, session=http)
    offers: List[Tuple[str, str]] = []
    for link in page.select(".hosterSiteVideo a.watchEpisode"):
        heading = link.select_one("h4")
        hoster = heading.get_text(strip=True) if heading else ""
        target = _absolute_url(link.get("href") or "")
        if target:
            _log_parsed_url(target)
            if hoster:
                offers.append((hoster, target))
    if not offers:
        return None
    offers = sorted(offers, key=lambda offer: offer[0].casefold())
    chosen = next(
        (
            target
            for wanted in wanted_hosters
            for hoster, target in offers
            if wanted in hoster.casefold()
        ),
        None,
    )
    if not chosen:
        chosen = offers[0][1]
    return resolve_redirect(chosen) or chosen
def search_animes(query: str) -> List[SeriesResult]:
    """Search for series whose title contains *query* as whole words.

    The AJAX search endpoint is tried first. When it fails or does not
    return a JSON list, the HTML search page is scraped instead.

    Args:
        query: Free-text search term; surrounding whitespace is ignored.

    Returns:
        Matching series, de-duplicated by case-folded title. Empty for an
        empty query.

    Raises:
        RuntimeError: If requests/bs4 are missing, or the HTML fallback hits
            a Cloudflare challenge.
        Exception: Any request error raised by the HTML fallback.
    """
    _ensure_requests()
    query = (query or "").strip()
    if not query:
        return []
    session = get_requests_session("aniworld", headers=HEADERS)
    try:
        # Best-effort request to the start page before calling the API.
        session.get(_get_base_url(), headers=HEADERS, timeout=DEFAULT_TIMEOUT)
    except Exception as exc:
        _log_error(f"AniWorld search warm-up request failed: {exc}")
    try:
        data = _post_json(_search_api_url(), payload={"keyword": query}, session=session)
    except Exception as exc:
        # An API failure must not abort the search; use the HTML fallback.
        _log_error(f"AniWorld search API failed, falling back to HTML search: {exc}")
        data = None
    results: List[SeriesResult] = []
    seen: set[str] = set()
    if isinstance(data, list):
        for entry in data:
            if not isinstance(entry, dict):
                continue
            # str() guards against non-string JSON values in the API response.
            title = _strip_html(str(entry.get("title") or "").strip())
            if not title or not _matches_query(query, title=title):
                continue
            link = str(entry.get("link") or "").strip()
            if not link.startswith("/anime/stream/"):
                continue
            if "/staffel-" in link or "/episode-" in link:
                continue
            if link.rstrip("/") == "/anime/stream":
                continue
            url = _absolute_url(link)
            if url:
                _log_parsed_url(url)
            key = title.casefold().strip()
            if key in seen:
                continue
            seen.add(key)
            description = str(entry.get("description") or "").strip()
            results.append(SeriesResult(title=title, description=description, url=url))
        return results
    soup = _get_soup_simple(_search_url(requests.utils.quote(query)))
    for anchor in soup.select("a[href^='/anime/stream/'][href]"):
        href = (anchor.get("href") or "").strip()
        if not href or "/staffel-" in href or "/episode-" in href:
            continue
        url = _absolute_url(href)
        if url:
            _log_parsed_url(url)
        title_node = anchor.select_one("h3") or anchor.select_one("strong")
        title = (title_node.get_text(" ", strip=True) if title_node else anchor.get_text(" ", strip=True)).strip()
        if not title:
            continue
        if not _matches_query(query, title=title):
            continue
        key = title.casefold().strip()
        if key in seen:
            continue
        seen.add(key)
        results.append(SeriesResult(title=title, description="", url=url))
    return results
class AniworldPlugin(BasisPlugin):
name = "Aniworld"
version = "1.0.0"
def __init__(self) -> None:
    """Set up the in-memory caches and the availability flags.

    The title -> URL cache is seeded from the session cache. When
    requests/bs4 could not be imported, the plugin marks itself as
    unavailable and prints the import error.
    """
    available = REQUESTS_AVAILABLE

    # Availability.
    self._requests_available = available
    self.is_available = available
    self.unavailable_reason: Optional[str] = (
        None if available else "requests/bs4 fehlen. Installiere 'requests' und 'beautifulsoup4'."
    )

    # Hoster preferences.
    self._default_preferred_hosters: List[str] = list(DEFAULT_PREFERRED_HOSTERS)
    self._preferred_hosters: List[str] = list(self._default_preferred_hosters)
    self._hoster_cache: Dict[Tuple[str, str, str], List[str]] = {}

    # Series, season and episode caches.
    self._anime_results: Dict[str, SeriesResult] = {}
    self._title_url_cache: Dict[str, str] = self._load_title_url_cache()
    self._season_cache: Dict[str, List[SeasonInfo]] = {}
    self._season_links_cache: Dict[str, List[SeasonInfo]] = {}
    self._episode_label_cache: Dict[Tuple[str, str], Dict[str, EpisodeInfo]] = {}

    # Listing caches.
    self._genre_names_cache: Optional[List[str]] = None
    self._popular_cache: Optional[List[SeriesResult]] = None
    self._genre_cache: Optional[Dict[str, List[SeriesResult]]] = None
    self._latest_cache: Dict[int, List[LatestEpisode]] = {}
    self._latest_hoster_cache: Dict[str, List[str]] = {}

    if not available and REQUESTS_IMPORT_ERROR:  # pragma: no cover - optional dependency
        print(f"AniworldPlugin Importfehler: {REQUESTS_IMPORT_ERROR}")
def _load_title_url_cache(self) -> Dict[str, str]:
    """Load the title -> URL mapping from the session cache.

    Keys are stripped and case-folded, URLs stripped; pairs with an empty
    key or URL are dropped. Anything but a stored dict yields an empty
    mapping.
    """
    stored = _session_cache_get("title_urls")
    if not isinstance(stored, dict):
        return {}
    cleaned_pairs = (
        (str(raw_key or "").strip().casefold(), str(raw_url or "").strip())
        for raw_key, raw_url in stored.items()
    )
    return {key: url for key, url in cleaned_pairs if key and url}
def _save_title_url_cache(self) -> None:
    """Write the title -> URL mapping to the session cache.

    Nothing is written for an empty mapping. When the mapping exceeds
    ``SESSION_CACHE_MAX_TITLE_URLS`` entries, the oldest insertions are
    removed first.
    """
    cache = self._title_url_cache
    if not cache:
        return
    overflow = len(cache) - SESSION_CACHE_MAX_TITLE_URLS
    if overflow > 0:
        for stale_key in list(cache)[:overflow]:
            del cache[stale_key]
    _session_cache_set("title_urls", cache)
def _remember_anime_result(
self,
title: str,
url: str,
description: str = "",
*,
persist: bool = True,
) -> bool:
title = (title or "").strip()
url = (url or "").strip()
if not title:
return False
changed = False
current = self._anime_results.get(title)
if current is None or (url and current.url != url) or (description and current.description != description):
self._anime_results[title] = SeriesResult(title=title, description=description, url=url)
changed = True
if url:
key = title.casefold()
if self._title_url_cache.get(key) != url:
self._title_url_cache[key] = url
changed = True
if changed and persist:
self._save_title_url_cache()
return changed
@staticmethod
def _season_links_cache_name(series_url: str) -> str:
digest = hashlib.sha1((series_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_links.{digest}"
@staticmethod
def _season_episodes_cache_name(season_url: str) -> str:
digest = hashlib.sha1((season_url or "").encode("utf-8")).hexdigest()[:20]
return f"season_episodes.{digest}"
def _load_session_season_links(self, series_url: str) -> Optional[List[SeasonInfo]]:
    """Restore a series' season links from the session cache.

    Entries that are not dicts, have no integer number, a number below 1 or
    an empty URL are ignored.

    Returns:
        Seasons (without episodes) sorted by number, or ``None`` when
        nothing usable is stored.
    """
    stored = _session_cache_get(self._season_links_cache_name(series_url))
    if not isinstance(stored, list):
        return None
    restored: List[SeasonInfo] = []
    for record in (entry for entry in stored if isinstance(entry, dict)):
        try:
            season_no = int(record.get("number"))
        except Exception:
            continue
        season_url = str(record.get("url") or "").strip()
        if season_no > 0 and season_url:
            restored.append(SeasonInfo(number=season_no, url=season_url, episodes=[]))
    if not restored:
        return None
    return sorted(restored, key=lambda season: season.number)
def _save_session_season_links(self, series_url: str, seasons: List[SeasonInfo]) -> None:
    """Store the number and URL of every season that has a URL.

    Nothing is written when no season qualifies.
    """
    serializable = []
    for season in seasons:
        if season.url:
            serializable.append({"number": int(season.number), "url": season.url})
    if not serializable:
        return
    _session_cache_set(self._season_links_cache_name(series_url), serializable)
def _load_session_season_episodes(self, season_url: str) -> Optional[List[EpisodeInfo]]:
    """Restore a season's episodes from the session cache.

    Entries that are not dicts, have no integer number or a number below 1
    are ignored. A missing title is replaced by "Episode <number>".

    Returns:
        Episodes sorted by number, or ``None`` when nothing usable is stored.
    """
    stored = _session_cache_get(self._season_episodes_cache_name(season_url))
    if not isinstance(stored, list):
        return None
    restored: List[EpisodeInfo] = []
    for record in (entry for entry in stored if isinstance(entry, dict)):
        try:
            episode_no = int(record.get("number"))
        except Exception:
            continue
        name = str(record.get("title") or "").strip()
        alt_name = str(record.get("original_title") or "").strip()
        episode_url = str(record.get("url") or "").strip()
        if episode_no > 0:
            restored.append(
                EpisodeInfo(
                    number=episode_no,
                    title=name or f"Episode {episode_no}",
                    original_title=alt_name,
                    url=episode_url,
                )
            )
    if not restored:
        return None
    return sorted(restored, key=lambda episode: episode.number)
def _save_session_season_episodes(self, season_url: str, episodes: List[EpisodeInfo]) -> None:
    """Store a season's episodes in the session cache.

    Nothing is written for an empty episode list.
    """
    serializable = [
        {
            "number": int(episode.number),
            "title": episode.title,
            "original_title": episode.original_title,
            "url": episode.url,
        }
        for episode in episodes
    ]
    if not serializable:
        return
    _session_cache_set(self._season_episodes_cache_name(season_url), serializable)
def capabilities(self) -> set[str]:
    """Return the names of the optional features this plugin provides."""
    features = ("popular_series", "genres", "latest_episodes")
    return set(features)
def _find_series_by_title(self, title: str) -> Optional[SeriesResult]:
title = (title or "").strip()
if not title:
return None
direct = self._anime_results.get(title)
if direct:
return direct
wanted = title.casefold().strip()
cached_url = self._title_url_cache.get(wanted, "")
if cached_url:
result = SeriesResult(title=title, description="", url=cached_url)
self._anime_results[title] = result
return result
for candidate in self._anime_results.values():
if candidate.title and candidate.title.casefold().strip() == wanted:
return candidate
try:
for entry in self._ensure_popular():
if entry.title and entry.title.casefold().strip() == wanted:
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
try:
for entries in self._ensure_genres().values():
for entry in entries:
if entry.title and entry.title.casefold().strip() == wanted:
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
try:
for entry in search_animes(title):
if entry.title and entry.title.casefold().strip() == wanted:
self._remember_anime_result(entry.title, entry.url, entry.description)
return entry
except Exception:
pass
return None
def _ensure_popular(self) -> List[SeriesResult]:
if self._popular_cache is not None:
return list(self._popular_cache)
soup = _get_soup_simple(_popular_animes_url())
results: List[SeriesResult] = []
cache_dirty = False
seen: set[str] = set()
for anchor in soup.select("div.seriesListContainer a[href^='/anime/stream/']"):
href = (anchor.get("href") or "").strip()
if not href or "/staffel-" in href or "/episode-" in href:
continue
url = _absolute_url(href)
if url:
_log_parsed_url(url)
title_node = anchor.select_one("h3")
title = (title_node.get_text(" ", strip=True) if title_node else "").strip()
if not title:
continue
description = ""
desc_node = anchor.select_one("small")
if desc_node:
description = desc_node.get_text(" ", strip=True).strip()
key = title.casefold().strip()
if key in seen:
continue
seen.add(key)
results.append(SeriesResult(title=title, description=description, url=url))
cache_dirty = self._remember_anime_result(title, url, description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
self._popular_cache = list(results)
return list(results)
def popular_series(self) -> List[str]:
if not self._requests_available:
return []
entries = self._ensure_popular()
cache_dirty = False
for entry in entries:
cache_dirty = self._remember_anime_result(entry.title, entry.url, entry.description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
return [entry.title for entry in entries if entry.title]
def latest_episodes(self, page: int = 1) -> List[LatestEpisode]:
if not self._requests_available:
return []
try:
page = int(page or 1)
except Exception:
page = 1
page = max(1, page)
cached = self._latest_cache.get(page)
if cached is not None:
return list(cached)
url = _latest_episodes_url()
if page > 1:
url = f"{url}?page={page}"
soup = _get_soup_simple(url)
episodes = _extract_latest_episodes(soup)
self._latest_cache[page] = list(episodes)
return list(episodes)
def _ensure_genres(self) -> Dict[str, List[SeriesResult]]:
if self._genre_cache is not None:
return {key: list(value) for key, value in self._genre_cache.items()}
soup = _get_soup_simple(_genres_url())
results: Dict[str, List[SeriesResult]] = {}
cache_dirty = False
genre_blocks = soup.select("#seriesContainer div.genre")
if not genre_blocks:
genre_blocks = soup.select("div.genre")
for genre_block in genre_blocks:
name_node = genre_block.select_one(".seriesGenreList h3")
genre_name = (name_node.get_text(" ", strip=True) if name_node else "").strip()
if not genre_name:
continue
entries: List[SeriesResult] = []
seen: set[str] = set()
for anchor in genre_block.select("ul li a[href]"):
href = (anchor.get("href") or "").strip()
if not href or "/staffel-" in href or "/episode-" in href:
continue
url = _absolute_url(href)
if url:
_log_parsed_url(url)
title = (anchor.get_text(" ", strip=True) or "").strip()
if not title:
continue
key = title.casefold().strip()
if key in seen:
continue
seen.add(key)
entries.append(SeriesResult(title=title, description="", url=url))
cache_dirty = self._remember_anime_result(title, url, persist=False) or cache_dirty
if entries:
results[genre_name] = entries
if cache_dirty:
self._save_title_url_cache()
self._genre_cache = {key: list(value) for key, value in results.items()}
self._genre_names_cache = sorted(self._genre_cache.keys(), key=str.casefold)
_session_cache_set("genres", self._genre_names_cache)
# Für spätere Auflösung (Seasons/Episoden) die Titel->URL Zuordnung auffüllen.
for entries in results.values():
for entry in entries:
if not entry.title:
continue
if entry.title not in self._anime_results:
self._anime_results[entry.title] = entry
return {key: list(value) for key, value in results.items()}
def _ensure_genre_names(self) -> List[str]:
if self._genre_names_cache is not None:
return list(self._genre_names_cache)
cached = _session_cache_get("genres")
if isinstance(cached, list):
names = [str(value).strip() for value in cached if str(value).strip()]
if names:
self._genre_names_cache = sorted(set(names), key=str.casefold)
return list(self._genre_names_cache)
try:
body = _get_html_simple(_genres_url())
names = _extract_genre_names_from_html(body)
except Exception:
names = []
if not names:
mapping = self._ensure_genres()
names = list(mapping.keys())
self._genre_names_cache = sorted({name for name in names if name}, key=str.casefold)
_session_cache_set("genres", self._genre_names_cache)
return list(self._genre_names_cache)
def genres(self) -> List[str]:
if not self._requests_available:
return []
return self._ensure_genre_names()
def titles_for_genre(self, genre: str) -> List[str]:
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
mapping = self._ensure_genres()
entries = mapping.get(genre)
if entries is None:
wanted = genre.casefold()
for key, value in mapping.items():
if key.casefold() == wanted:
entries = value
break
if not entries:
return []
# Zusätzlich sicherstellen, dass die Titel im Cache sind.
cache_dirty = False
for entry in entries:
cache_dirty = self._remember_anime_result(entry.title, entry.url, entry.description, persist=False) or cache_dirty
if cache_dirty:
self._save_title_url_cache()
return [entry.title for entry in entries if entry.title]
def _season_label(self, number: int) -> str:
return f"Staffel {number}"
def _parse_season_number(self, season_label: str) -> Optional[int]:
    """Return group 1 of the first ``DIGITS`` match in *season_label* as int.

    Returns ``None`` when the label is empty or contains no match.
    """
    found = re.search(DIGITS, season_label or "")
    if not found:
        return None
    return int(found.group(1))
def _episode_label(self, info: EpisodeInfo) -> str:
title = (info.title or "").strip()
if title:
return f"Episode {info.number} - {title}"
return f"Episode {info.number}"
def _cache_episode_labels(self, title: str, season_label: str, season_info: SeasonInfo) -> None:
cache_key = (title, season_label)
self._episode_label_cache[cache_key] = {self._episode_label(info): info for info in season_info.episodes}
def remember_series_url(self, title: str, series_url: str) -> None:
title = (title or "").strip()
series_url = (series_url or "").strip()
if not title or not series_url:
return
self._remember_anime_result(title, series_url)
def series_url_for_title(self, title: str) -> str:
title = (title or "").strip()
if not title:
return ""
direct = self._anime_results.get(title)
if direct and direct.url:
return direct.url
wanted = title.casefold().strip()
cached_url = self._title_url_cache.get(wanted, "")
if cached_url:
return cached_url
for candidate in self._anime_results.values():
if candidate.title and candidate.title.casefold().strip() == wanted and candidate.url:
return candidate.url
return ""
def _ensure_season_links(self, title: str) -> List[SeasonInfo]:
cached = self._season_links_cache.get(title)
if cached is not None:
return list(cached)
anime = self._find_series_by_title(title)
if not anime:
return []
session_links = self._load_session_season_links(anime.url)
if session_links:
self._season_links_cache[title] = list(session_links)
return list(session_links)
seasons = scrape_anime_detail(anime.url, load_episodes=False)
self._season_links_cache[title] = list(seasons)
self._save_session_season_links(anime.url, seasons)
return list(seasons)
def _ensure_season_episodes(self, title: str, season_number: int) -> Optional[SeasonInfo]:
seasons = self._season_cache.get(title) or []
for season in seasons:
if season.number == season_number and season.episodes:
return season
links = self._ensure_season_links(title)
target = next((season for season in links if season.number == season_number), None)
if not target:
return None
cached_episodes = self._load_session_season_episodes(target.url)
if cached_episodes:
season_info = SeasonInfo(number=target.number, url=target.url, episodes=list(cached_episodes))
updated = [season for season in seasons if season.number != season_number]
updated.append(season_info)
updated.sort(key=lambda item: item.number)
self._season_cache[title] = updated
return season_info
season_soup = _get_soup(target.url, session=get_requests_session("aniworld", headers=HEADERS))
season_info = SeasonInfo(number=target.number, url=target.url, episodes=_extract_episodes(season_soup))
updated = [season for season in seasons if season.number != season_number]
updated.append(season_info)
updated.sort(key=lambda item: item.number)
self._season_cache[title] = updated
self._save_session_season_episodes(target.url, season_info.episodes)
return season_info
def _lookup_episode(self, title: str, season_label: str, episode_label: str) -> Optional[EpisodeInfo]:
cache_key = (title, season_label)
cached = self._episode_label_cache.get(cache_key)
if cached:
return cached.get(episode_label)
number = self._parse_season_number(season_label)
if number is None:
return None
season_info = self._ensure_season_episodes(title, number)
if season_info:
self._cache_episode_labels(title, season_label, season_info)
return self._episode_label_cache.get(cache_key, {}).get(episode_label)
return None
async def search_titles(self, query: str) -> List[str]:
    """Search for series titles and refresh the per-search caches.

    An empty (or whitespace-only) query resets the result, season, season
    link, episode label and popular caches and returns ``[]``.

    Otherwise ``search_animes`` is called, every result is recorded through
    ``self._remember_anime_result`` (persisting the title/URL cache once if
    any call reported a change), and the season, season link and episode
    label caches are cleared.

    Returns:
        The titles of the search results, in result order.

    Raises:
        RuntimeError: when requests/bs4 are unavailable, or when the search
            itself fails (the original exception is chained).
    """
    query = (query or "").strip()
    if not query:
        self._anime_results.clear()
        self._season_cache.clear()
        self._season_links_cache.clear()
        self._episode_label_cache.clear()
        self._popular_cache = None
        return []
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 nicht suchen.")
    try:
        results = search_animes(query)
    except Exception as exc:  # pragma: no cover
        # Drop everything tied to the previous search, including the season
        # links, so no stale data outlives a failed search.
        self._anime_results.clear()
        self._season_cache.clear()
        self._season_links_cache.clear()
        self._episode_label_cache.clear()
        raise RuntimeError(f"AniWorld-Suche fehlgeschlagen: {exc}") from exc
    self._anime_results = {}
    cache_dirty = False
    for result in results:
        cache_dirty = self._remember_anime_result(result.title, result.url, result.description, persist=False) or cache_dirty
    if cache_dirty:
        # Persist once after the loop instead of once per result.
        self._save_title_url_cache()
    self._season_cache.clear()
    self._season_links_cache.clear()
    self._episode_label_cache.clear()
    return [result.title for result in results]
def _ensure_seasons(self, title: str) -> List[SeasonInfo]:
if title in self._season_cache:
return self._season_cache[title]
seasons = self._ensure_season_links(title)
self._season_cache[title] = list(seasons)
return list(seasons)
def seasons_for(self, title: str) -> List[str]:
    """Return the season labels for a title, in the order of ``self._ensure_seasons``."""
    numbers = (entry.number for entry in self._ensure_seasons(title))
    return [self._season_label(number) for number in numbers]
def episodes_for(self, title: str, season: str) -> List[str]:
    """Return the episode labels of one season.

    The season number is parsed from the ``season`` label and the season is
    loaded through ``self._ensure_season_episodes``. The label -> episode
    mapping is cached as a side effect so later lookups by label work.

    Returns:
        The labels in episode order, or ``[]`` when the label has no number
        or the season cannot be loaded.
    """
    season_number = self._parse_season_number(season)
    if season_number is None:
        return []

    loaded = self._ensure_season_episodes(title, season_number)
    if not loaded:
        return []

    names = [self._episode_label(episode) for episode in loaded.episodes]
    self._cache_episode_labels(title, season, loaded)
    return names
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
    """Return the stream link for an episode identified by its labels.

    The episode is resolved via ``self._lookup_episode`` and the link is
    fetched with ``fetch_episode_stream_link`` using the currently preferred
    hosters. A found link is logged with kind ``"FOUND"``.

    Returns:
        Whatever ``fetch_episode_stream_link`` returns, or ``None`` when the
        episode is unknown.

    Raises:
        RuntimeError: when requests/bs4 are unavailable.
    """
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links liefern.")

    found = self._lookup_episode(title, season, episode)
    if not found:
        return None

    stream_url = fetch_episode_stream_link(found.url, preferred_hosters=self._preferred_hosters)
    if stream_url:
        _log_url(stream_url, kind="FOUND")
    return stream_url
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
    """Return the hoster names available for an episode identified by labels.

    Results are cached in ``self._hoster_cache`` per
    ``(title, season, episode)``; an unknown episode yields ``[]`` and is
    not cached.

    Returns:
        A new list on every call.

    Raises:
        RuntimeError: when requests/bs4 are unavailable.
    """
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.")

    key = (title, season, episode)
    known = self._hoster_cache.get(key)
    if known is not None:
        return list(known)

    found = self._lookup_episode(title, season, episode)
    if not found:
        return []

    hoster_names = fetch_episode_hoster_names(found.url)
    self._hoster_cache[key] = list(hoster_names)
    return list(hoster_names)
def available_hosters_for_url(self, episode_url: str) -> List[str]:
    """Return the hoster names available for an episode URL.

    The URL is passed through ``_absolute_url`` first; results are cached in
    ``self._latest_hoster_cache`` under that normalized URL.

    Returns:
        A new list on every call.

    Raises:
        RuntimeError: when requests/bs4 are unavailable.
    """
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Hoster laden.")

    url = _absolute_url(episode_url)
    known = self._latest_hoster_cache.get(url)
    if known is not None:
        return list(known)

    hoster_names = fetch_episode_hoster_names(url)
    self._latest_hoster_cache[url] = list(hoster_names)
    return list(hoster_names)
def stream_link_for_url(self, episode_url: str) -> Optional[str]:
    """Return the stream link for an episode URL.

    The URL is passed through ``_absolute_url`` and the link is fetched with
    ``fetch_episode_stream_link`` using the currently preferred hosters. A
    found link is logged with kind ``"FOUND"``.

    Raises:
        RuntimeError: when requests/bs4 are unavailable.
    """
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links liefern.")

    url = _absolute_url(episode_url)
    stream_url = fetch_episode_stream_link(url, preferred_hosters=self._preferred_hosters)
    if stream_url:
        _log_url(stream_url, kind="FOUND")
    return stream_url
def resolve_stream_link(self, link: str) -> Optional[str]:
    """Resolve a stream link to the final URL.

    The link is first followed with ``resolve_redirect``. If the optional
    ``resolveurl_backend`` module can be imported and its ``resolve``
    returns a value, that value is returned (logged as ``"MEDIA"``);
    otherwise the redirect target is returned (logged as ``"FINAL"``).

    Returns:
        The resolved URL, or ``None`` when ``resolve_redirect`` yields
        nothing.

    Raises:
        RuntimeError: when requests/bs4 are unavailable.
    """
    if not self._requests_available:
        raise RuntimeError("AniworldPlugin kann ohne requests/bs4 keine Stream-Links aufloesen.")

    target = resolve_redirect(link)
    if not target:
        return None

    # The resolver backend is optional; any failure while importing it means
    # the redirect target is used as-is.
    try:
        from resolveurl_backend import resolve as backend_resolve
    except Exception:
        backend_resolve = None

    media_url = backend_resolve(target) if callable(backend_resolve) else None
    if media_url:
        _log_url("ResolveURL", kind="HOSTER_RESOLVER")
        _log_url(media_url, kind="MEDIA")
        return media_url

    _log_url(target, kind="FINAL")
    return target
def set_preferred_hosters(self, hosters: List[str]) -> None:
    """Replace the preferred hosters with the given names.

    Names are whitespace-trimmed and lower-cased; blank entries are dropped.
    If nothing remains, the current preference is left untouched.
    """
    cleaned = []
    for name in hosters:
        trimmed = name.strip()
        if trimmed:
            cleaned.append(trimmed.lower())
    if cleaned:
        self._preferred_hosters = cleaned
def reset_preferred_hosters(self) -> None:
    """Restore the preferred hosters to a fresh copy of the defaults."""
    defaults = self._default_preferred_hosters
    self._preferred_hosters = [*defaults]
# Module-level alias that exposes the plugin class under the generic name
# ``Plugin`` (presumably the name a plugin loader looks up — verify against the loader).
Plugin = AniworldPlugin