Compare commits

..

45 Commits

Author SHA1 Message Date
caa4a4a0e2 dev: per-plugin setting for max entries per page 2026-03-10 22:50:43 +01:00
5ccda44623 dev: only show changelog dialog when an entry exists 2026-03-10 22:07:51 +01:00
1dbc93afd1 dev: fixed version filter for 4-part version numbers 2026-03-10 16:28:15 +01:00
649929ca5d dev: bump to 0.1.76.0-dev – older versions in the update dialog, release branch mapping, README reworked 2026-03-10 14:28:41 +01:00
5564851d35 dev: bump to 0.1.74-dev – BurningSeries removed, paging fix for latest titles 2026-03-10 10:41:37 +01:00
6e7b4c3d39 dev: bump to 0.1.72-dev – autoplay setting, Moflix hoster dialog, update notice in the main menu 2026-03-06 21:05:53 +01:00
957a5a1aea dev: bump to 0.1.72-dev – HDFilme rewrite (BeautifulSoup, correct selectors, genres, metadata) 2026-03-04 23:07:44 +01:00
58da715723 dev: bump to 0.1.71-dev – new plugins (Moflix, KKiste, HDFilme, Netzkino), SerienStream A-Z, VidHide fix 2026-03-04 22:29:49 +01:00
ff30548811 dev: bump to 0.1.71-dev – play Trakt history directly, metadata + plugin bug fixes
- Trakt history: episodes start directly (no more season dialog)
- Trakt history: episode title, plot and artwork already shown in the overview
- TraktItem extended with episode_title, episode_overview, episode_thumb, show_poster, show_fanart
- get_history() now uses ?extended=full,images
- Added slash commands /check and /deploy
- build_install_addon.sh now also deploys to ~/.kodi/addons/
- filmpalast_plugin: fixed return-tuple bug (return "", "", "")
- dokustreams_plugin: fixed regex escaping for clean_name()
- aniworld_plugin: added raise_for_status() in resolve_redirect()
- serienstream_plugin: removed dead code and an unnecessary regex backslash
2026-03-01 22:56:51 +01:00
95e14583e0 dev: bump to 0.1.71-dev – episode titles and TMDB API key fix 2026-03-01 19:45:45 +01:00
3c0891b638 dev: bump to 0.1.71-dev – complete Trakt scrobbling with stop monitor 2026-03-01 19:17:58 +01:00
7243c5353b dev: fix SyntaxError, global declarations before first use 2026-03-01 18:53:59 +01:00
d12853dd26 dev: bump to 0.1.70-dev – grouped search and SyntaxError fix 2026-03-01 18:51:03 +01:00
e28e2c9791 dev: fix SyntaxError, global declaration in _trakt_find_in_plugins
global _TRAKT_PLUGIN_MATCH_CACHE_TS must come before the first use
2026-03-01 18:50:23 +01:00
1f0e627721 dev: grouped search results with source selection (Issue #2)
- _show_search_results() groups hits across all plugins by title
- title found in one plugin: go straight to the season view (no plugin suffix)
- title found in several plugins: intermediate 'choose source' step
- new function _show_choose_source() and route 'choose_source'
2026-03-01 18:49:18 +01:00
7b60b00c8b dev: extensive refactoring, Trakt integration and code-review fixes (0.1.69-dev)
Core & architecture:
- New directory addon/core/ with router.py, trakt.py, metadata.py,
  gui.py, playstate.py, plugin_manager.py, updater.py
- Added a tests directory (24 tests, pytest + coverage)

Trakt integration:
- OAuth device flow, scrobbling, watchlist, history, calendar
- Upcoming episodes, continue watching
- Watched status in episode lists
- _trakt_find_in_plugins() with a 5-minute cache

Serienstream search:
- API results are always supplemented from the catalog cache (server-side limit of 10 hits)
- Catalog cache is pre-warmed in a daemon thread on add-on start
- Notification after cache load via xbmc.executebuiltin() (thread-safe)

Bug fixes (code review):
- Race condition on _TRAKT_WATCHED_CACHE: added _TRAKT_WATCHED_CACHE_LOCK
- GUI dialog from a daemon thread: xbmcgui -> xbmc.executebuiltin()
- Guarded against ValueError in the Trakt watchlist routes
- Fixed token expires_at==0 check
- Cleaned up get_setting_bool() control flow in gui.py
- topstreamfilm_plugin: try-finally around xbmcvfs.File.close()

Cleanup:
- Removed default.py.bak and refactor_router.py
- .gitignore: removed /tests/ entry
- Unified type hints (Dict/List/Tuple -> dict/list/tuple)
2026-03-01 18:39:05 +01:00
73f07d20b4 dev: bump to 0.1.66 and harden resolveurl + serienstream 2026-02-25 16:35:16 +01:00
74d15cb25e dev: roll back resolver routing changes and publish 0.1.65-dev 2026-02-24 21:36:04 +01:00
482e0b0cc6 dev: revert resolveurl dependency path injection 2026-02-24 21:00:21 +01:00
39ec975afa dev: add kodi-six and dependency paths for resolveurl import 2026-02-24 20:53:04 +01:00
72aa5de166 dev: add direct resolveurl fallback and resolver status logs 2026-02-24 20:50:55 +01:00
ce4b97e19f dev: avoid VOE-first fallback and harden unresolved hoster handling 2026-02-24 20:46:49 +01:00
16e0c77162 dev: import resolveurl from addon path when dependency is soft 2026-02-24 20:41:08 +01:00
6bdd4659bb dev: block unresolved embed links and retry resolver install 2026-02-24 20:36:09 +01:00
f438ff88fa dev: add explicit dev update channel and repo addon 1.0.1 2026-02-24 20:17:10 +01:00
e5bc67eef8 dev: clean up genre labels from filter keys 2026-02-24 18:51:36 +01:00
76b04ddaf2 dev: normalize filter.genre_* labels in genre parsing 2026-02-24 18:50:31 +01:00
16e4b5f261 dev: harden resolver bootstrap and simplify update settings 2026-02-24 16:18:44 +01:00
99b67a24f8 dev: show full series info already in title selection 2026-02-24 14:04:47 +01:00
45d447cdb3 dev: load full metadata for currently opened genre page 2026-02-24 14:00:19 +01:00
b9687ea127 dev: split changelog files and use dev changelog for -dev versions 2026-02-24 13:56:40 +01:00
f1f9d8f5d8 dev: include plot text in Serienstream genre list entries 2026-02-24 13:54:33 +01:00
358cfb1967 dev: switch Serienstream genres to strict page-on-demand flow 2026-02-24 13:33:35 +01:00
0d10219ccb dev: add on-demand Serienstream genre paging and minimal list parser 2026-02-24 13:32:12 +01:00
aab7613304 nightly: bump 0.1.61 and fix install/cancel selection flow 2026-02-23 20:59:15 +01:00
896398721c updates: fix install dialog labels and use InstallAddon flow 2026-02-23 20:55:19 +01:00
d1b22da9cd updates: read installed version from addon.xml on disk 2026-02-23 20:52:55 +01:00
305a58c8bd updates: filter versions by channel semver pattern 2026-02-23 20:50:06 +01:00
75a7df8361 updates: apply channel now installs latest version from selected channel 2026-02-23 20:47:18 +01:00
d876d5b84c updates: add version picker with changelog and install/cancel flow 2026-02-23 20:44:33 +01:00
59728875e9 updates: show installed/available versions and apply channel explicitly 2026-02-23 20:42:09 +01:00
db5748e012 docs: add release flow for nightly and main 2026-02-23 20:36:43 +01:00
ef531ea0aa nightly: bump to 0.1.60 and finalize menu, resolver, settings cleanup 2026-02-23 20:21:44 +01:00
7ba24532ad Bump nightly to 0.1.59-nightly and default update channel to nightly 2026-02-23 19:54:40 +01:00
3f799aa170 Unify menu labels, centralize hoster URL normalization, and add auto-update toggle 2026-02-23 19:54:17 +01:00
61 changed files with 9738 additions and 1061 deletions

3
.gitignore vendored

@@ -7,8 +7,7 @@
# Build outputs
/dist/
# Local tests (not committed)
/tests/
# Lokale Test-Artefakte
/TESTING/
/.pytest_cache/
/pytest.ini


@@ -4,4 +4,4 @@
 ],
 "python.testing.unittestEnabled": false,
 "python.testing.pytestEnabled": true
-}
+}

199
CHANGELOG-DEV.md Normal file

@@ -0,0 +1,199 @@
## 0.1.77.0-dev - 2026-03-10
- dev: only show changelog dialog when an entry exists
## 0.1.76.5-dev - 2026-03-10
- dev: fixed version filter for 4-part version numbers
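The fix itself is not shown in this hunk. A minimal sketch of version parsing that tolerates both three-part and four-part version numbers could look like this (the helper name `parse_version` is illustrative, not the add-on's actual function):

```python
import re

def parse_version(version: str) -> tuple:
    """Parse '0.1.76.5-dev' into ((0, 1, 76, 5), 'dev').

    Accepts both three-part (0.1.74) and four-part (0.1.76.0)
    version numbers, with an optional channel suffix.
    """
    match = re.fullmatch(r"(\d+(?:\.\d+){2,3})(?:-([a-z]+))?", version)
    if not match:
        raise ValueError(f"unsupported version format: {version}")
    numbers = tuple(int(part) for part in match.group(1).split("."))
    # Pad three-part versions so 0.1.74 compares correctly with 0.1.76.0
    if len(numbers) == 3:
        numbers += (0,)
    return numbers, match.group(2) or ""
```

Padding the shorter form keeps tuple comparison meaningful across both formats.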
## 0.1.76.0-dev - 2026-03-10
- dev: bump to 0.1.76.0-dev – older versions in the update dialog, release branch mapping, README reworked
## 0.1.74-dev - 2026-03-10
- dev: bump to 0.1.74-dev – BurningSeries removed, paging fix for latest titles
## 0.1.73-dev - 2026-03-07
- dev: bump to 0.1.73-dev – autoplay setting, Moflix hoster dialog, update notice in the main menu
# Changelog (Dev)
## 0.1.71-dev - 2026-03-01
- Trakt scrobbling complete: scrobble/stop is sent after playback ends, with the computed progress.
- New functions `_trakt_scrobble_stop_async()` and `_trakt_monitor_playback()`.
- The monitor blocks the plugin process until playback ends, so the progress is computed correctly.
- As a result, Trakt only marks items "as watched" from ≥ 80% progress onwards.
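`_trakt_monitor_playback()` itself is not shown in this diff. The blocking-monitor idea can be sketched stand-alone like this; `player` is a hypothetical stand-in for `xbmc.Player`, and `compute_progress`/`monitor_playback` are illustrative names:

```python
import time

WATCHED_THRESHOLD = 80.0  # Trakt treats >= 80% progress as "watched"

def compute_progress(elapsed_seconds: float, total_seconds: float) -> float:
    """Playback progress in percent, clamped to 0..100."""
    if total_seconds <= 0:
        return 0.0
    return max(0.0, min(100.0, elapsed_seconds / total_seconds * 100.0))

def monitor_playback(player, poll_interval: float = 1.0) -> float:
    """Block until playback ends, then return the final progress percent.

    `player` is assumed to expose is_playing(), elapsed() and duration();
    in the add-on this role is played by Kodi's player API.
    """
    last_elapsed, duration = 0.0, 0.0
    while player.is_playing():
        # Remember the latest position so we still have it after playback stops.
        last_elapsed = player.elapsed()
        duration = player.duration()
        time.sleep(poll_interval)
    return compute_progress(last_elapsed, duration)
```

The returned percentage is then sent with scrobble/stop; whether the item counts as watched depends on crossing `WATCHED_THRESHOLD`.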
## 0.1.70-dev - 2026-03-01
- Search results are grouped by title across all plugins.
- Title found in one plugin: straight to the season view (no plugin suffix).
- Title found in several plugins: intermediate "choose source" step with a plugin list.
- New route `choose_source` and function `_show_choose_source()`.
- Fix: SyntaxError, `global _TRAKT_PLUGIN_MATCH_CACHE_TS` was declared after its first use.
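The grouping step described above can be sketched independently of Kodi; `group_results_by_title` is an illustrative simplification, not the real `_show_search_results()`:

```python
from collections import defaultdict

def group_results_by_title(results):
    """Group (plugin_name, title, url) hits by normalized title.

    Returns {normalized_title: [(plugin_name, url), ...]}. Titles found
    in exactly one plugin can be opened directly; titles found in
    several plugins go through a 'choose source' step first.
    """
    grouped = defaultdict(list)
    for plugin_name, title, url in results:
        grouped[title.strip().casefold()].append((plugin_name, url))
    return dict(grouped)

hits = [
    ("serienstream", "Dark", "/serie/dark"),
    ("aniworld", "dark ", "/anime/dark"),
    ("filmpalast", "Oppenheimer", "/film/oppenheimer"),
]
grouped = group_results_by_title(hits)
# "dark" appears in two plugins, so it needs a source-selection step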
## 0.1.69-dev - 2026-03-01
### Trakt: new features
- **Continue watching:** New main-menu entry shows the next unwatched episode per show,
  based on the Trakt history. Auto-matching looks the title up automatically in all
  active plugins and links straight to the right season.
- **Trakt upcoming:** New main-menu entry shows upcoming episodes of the watchlist shows
  (Trakt calendar, 14 days ahead) with air date and TMDB poster.
  Auto-matching as in "Continue watching".
- **Watched status in episode lists:** Episodes already marked as watched on Trakt
  get a check mark (Kodi `playcount=1`) in season views.
  The result is cached for 5 minutes to minimize API calls.
- **`core/trakt.py`:** New `TraktCalendarItem` dataclass and `get_calendar()` method
  (Trakt calendar API: `/calendars/my/shows/{start}/{days}`).
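The 5-minute cache for the watched status can be sketched as a small TTL cache; `get_watched` and its `fetch_fn` callback are illustrative names, not the add-on's actual API:

```python
import time

CACHE_TTL = 300  # seconds; matches the 5-minute window described above
_watched_cache = {}  # show_id -> (timestamp, set of watched episode ids)

def get_watched(show_id, fetch_fn, now=time.time):
    """Return watched episodes, hitting the API at most once per TTL window.

    `fetch_fn(show_id)` performs the actual (slow) API call; `now` is
    injectable so the cache can be tested with a fake clock.
    """
    entry = _watched_cache.get(show_id)
    if entry is not None and now() - entry[0] < CACHE_TTL:
        return entry[1]
    watched = fetch_fn(show_id)
    _watched_cache[show_id] = (now(), watched)
    return watched
```

In the add-on the cached value feeds the `playcount=1` check marks, so repeated season views within five minutes do not trigger new Trakt requests.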
### Python 3.8 compatibility (Windows/Kodi)
- **`from __future__ import annotations`** added to all modules that did not yet use
  deferred evaluation (`core/router.py`, `core/metadata.py`, `core/playstate.py`,
  `core/gui.py`, `regex_patterns.py`). Fixes `TypeError: 'type' object is not subscriptable`
  on Kodi installations running Python 3.8 (Windows).
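The reason the import helps: with deferred evaluation (PEP 563), annotations are stored as strings instead of being evaluated at definition time, so subscripting builtin types, which only became legal in Python 3.9 (PEP 585), never actually happens on 3.8. A minimal illustration:

```python
from __future__ import annotations  # defer annotation evaluation (PEP 563)

# Without the import above, defining this function raises
# "TypeError: 'type' object is not subscriptable" on Python 3.8,
# because dict[str, int] in the signature would be evaluated eagerly.
def count_titles(titles: list[str]) -> dict[str, int]:
    counts = {}
    for title in titles:
        counts[title] = counts.get(title, 0) + 1
    return counts
```

With the future import, the annotations survive as plain strings in `__annotations__` and are only evaluated if something (e.g. `typing.get_type_hints`) explicitly asks for them.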
### Bug fixes
- **`dokustreams_plugin.py`:** Regex `r"(\\d+)"` corrected to `r"(\d+)"`: the page
  navigation never found page numbers (wrongly escaped in a raw string).
- **`serienstream_plugin.py`:** The dedup key used `\\t` (backslash plus t) instead of a real
  tab character, inconsistent with `aniworld_plugin.py`.
- **Menu cleanup:** Removed the duplicate menu entry "Neue Titel" (`new_plugin_titles`),
  identical to "Neuste Titel" (`latest_titles`).
---
## 0.1.67-dev - 2026-02-27
### Stability and security fixes
- **Security (updater.py):** ZIP extraction now checks every entry for path traversal.
  Malicious paths (e.g. `../../`) are rejected and the update is aborted
  (previously: silently skipped with `continue`).
- **Silent-failure fix (metadata.py):** New `_initialized` flag and `_require_init()` guard:
  if `init()` was not called, a warning now appears in the log instead of a silent no-op.
- **Thread safety (default.py):** New locks `_PLUGIN_CACHE_LOCK` and `_GENRE_TITLES_CACHE_LOCK`
  protect all cache accesses to plugin instances and genre title caches.
- **Memory-leak fix (default.py):** All internal caches now have a size limit
  (`_CACHE_MAXSIZE = 500`) with an LRU-like eviction mechanism.
- **Code quality (default.py):** Replaced ~300 lines of duplicate code with a
  `_show_paged_title_list()` helper: all paginated title lists (genres, categories, A-Z)
  now share the same logic.
- **Syntax fix (default.py):** Removed stray text at line 3517 that caused a latent
  syntax error.
### New plugin interface
New optional methods in `BasisPlugin` (plugin_interface.py):

| Method | Description |
|--------|-------------|
| `latest_titles(page)` | New releases / recently added titles |
| `years_available()` | Release years available for filtering |
| `titles_for_year(year, page)` | Titles filtered by year |
| `countries_available()` | Countries available for filtering |
| `titles_for_country(country, page)` | Titles filtered by country of origin |
| `collections()` | Film series / collections |
| `titles_for_collection(collection, page)` | Titles of a collection |
| `tags()` | Keywords / tags |
| `titles_for_tag(tag, page)` | Titles by keyword |
| `random_title()` | Random title |

New capability strings: `latest_titles`, `year_filter`, `country_filter`, `collections`, `tags`, `random`
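A reduced sketch of how a plugin advertises these capabilities and how the menu can be derived from them; `BasisPlugin` here is a minimal stand-in (the real base class lives in plugin_interface.py), and `DemoPlugin`/`menu_entries` are hypothetical:

```python
class BasisPlugin:
    """Minimal stand-in for the add-on's plugin base class."""
    def capabilities(self):
        return []

class DemoPlugin(BasisPlugin):
    """Implements two of the optional methods from the table above."""
    def capabilities(self):
        return ["latest_titles", "year_filter"]

    def latest_titles(self, page):
        # Real plugins scrape their source here; static data for the sketch.
        return [f"New title {page}-{i}" for i in range(1, 4)]

    def years_available(self):
        return list(range(2026, 1979, -1))

    def titles_for_year(self, year, page):
        return [f"Title from {year} (page {page})"]

def menu_entries(plugin):
    """Capability-driven menu wiring as described in the next section."""
    labels = {"latest_titles": "Neue Titel", "year_filter": "Nach Jahr"}
    return [labels[c] for c in plugin.capabilities() if c in labels]
```

Because the methods are optional, the router only offers a menu entry when the plugin explicitly reports the matching capability string.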
### New menu entries in default.py
Plugins that report the new capabilities automatically get the following menu entries:
- **"Neue Titel"** for plugins with `latest_titles`
- **"Nach Jahr"** for plugins with `year_filter`
- **"Nach Land"** for plugins with `country_filter`
- **"Sammlungen"** for plugins with `collections`
- **"Schlagworte"** for plugins with `tags`
- **"Zufälliger Titel"** for plugins with `random`
### Plugin extensions
**Filmpalast** (`filmpalast_plugin.py`):
- `popular_series()` scrapes top films from `/movies/top`
- `latest_titles(page)` scrapes new releases from `/movies/new` with pagination
- IMDb rating integrated into `metadata_for()` (`info_labels["rating"]`)
- New capabilities: `popular_series`, `latest_titles`
**Doku-Streams** (`dokustreams_plugin.py`):
- `tags()` scrapes all keywords from the start page
- `titles_for_tag(tag, page)`: title list per keyword, with pagination
- `random_title()` follows the redirect from `/zufaellige-doku/`
- `resolve_stream_link(link)`: ResolveURL plus HTTP redirect fallback
- New capabilities: `tags`, `random`
**AniWorld** (`aniworld_plugin.py`):
- `titles_for_genre_page(genre, page)`: paginated genre title lists via `/genre/[slug]?page=[n]`
- `genre_page_count(genre)`: extracts the last page number from the pagination
- `latest_titles(page)`: new anime releases via `/animekalender`
- New capability: `latest_titles`
**SerienStream** (`serienstream_plugin.py`):
- `latest_titles(page)`: new shows via the JSON calendar API (`/api/calendar`) with week-by-week backwards pagination
- `genre_page_count(genre)`: total number of genre pages extracted from the pagination links
- `alpha_index()` returns the letters A-Z plus `0-9`
- `titles_for_alpha_page(letter, page)`: fetch shows alphabetically via `/serien/alle?buchstabe={letter}`
- New capabilities: `latest_titles`, `alpha`
**TopStreamFilm** (`topstreamfilm_plugin.py`):
- `years_available()`: static list from the current year back to 1980
- `titles_for_year(year, page)`: titles by release year via `/xfsearch/{year}/page/{n}/`
- `latest_titles(page)`: new films via `/neueste-filme/page/{n}/`
- New capabilities: `year_filter`, `latest_titles`
**Einschalten** (`einschalten_plugin.py`):
- `popular_series()`: top 50 films sorted by `voteAverage` (descending)
- `latest_titles(page)`: new films (alias for `new_titles_page(page)`)
- New capabilities: `popular_series`, `latest_titles`
- Note: ratings (`voteAverage`, `voteCount`) were already included in `metadata_for()`
---
## 0.1.66-dev - 2026-02-25
- Made Serienstream HTTP fetches more robust: retry on brief connection drops, including a session reset.
- Hardened the ResolveURL import in the Kodi add-on: fallback via add-on `lib` paths (`resolveurl`, `kodi-six`, `six`).
- Extended resolver debugging: input/output visible in the Kodi log for every ResolveURL call.
- The playback guard for unresolved hoster links (e.g. `voe.sx/e/...`) stays active and gives clear error messages.
- Added `script.module.resolveurl` as an add-on dependency in `addon.xml`.
## 0.1.65-dev - 2026-02-24
- Simplified the resolver diagnostics path again (fallback/embed block removed from the router).
- Serienstream and AniWorld again use VOE as the preferred hoster in the default priority.
## 0.1.64-dev - 2026-02-24
- Extended the update channels in the settings: `Dev` is now its own channel.
- New setting URL `update_repo_url_dev` for direct dev repo selection.
- Repository add-on bumped to `1.0.1`.
- `repository.viewit` now additionally contains the `smrzips` feed so that ResolveURL updates are directly available.
## 0.1.63-dev - 2026-02-24
- ResolveURL is now a soft dependency: ViewIt installs even without ResolveURL preinstalled.
- New settings action: `ResolveURL installieren/reparieren`.
- Optional auto-bootstrap: ResolveURL can be installed automatically on start.
- If ResolveURL is missing, playback attempts one silent reinstall and then resolves again.
- Cleaned up the update settings: focus on the installed version, channel status and the available version in the chosen channel.
- Added a repo validation script (`scripts/verify_repo_artifacts.py`) and wired it into the local repo build.
## 0.1.62-dev - 2026-02-24
- New dev build for genre performance (Serienstream).
- Genre lists strictly load only the requested page (on demand, max. 20 titles).
- Further pages are only loaded on `Naechste Seite`.
- List parser reduced to title, show URL and cover.
- The plot is carried over from the cards and shown in the list, if available.
- Metadata is loaded and displayed completely for the currently opened page.
- Show info (incl. plot/art) is already visible in the title selection, not only in the season view.

29
CHANGELOG-NIGHTLY.md Normal file

@@ -0,0 +1,29 @@
# Changelog (Nightly)
## 0.1.61-nightly - 2026-02-23
- Update dialog: fixed choice with `Installieren` / `Abbrechen` (no more swapped yes/no dialog).
- Versions in the update dialog are filtered by channel:
  - Main: only `x.y.z`
  - Nightly: only `x.y.z-nightly`
- The installed version is read directly from `addon.xml`.
- On channel switch, the latest version from the chosen channel is installed directly.
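The channel filter above can be expressed with two patterns; the exact regexes and the `versions_for_channel` helper are assumptions for this sketch, not the add-on's actual code:

```python
import re

# Assumed patterns matching the channel rules listed above.
CHANNEL_PATTERNS = {
    "main": re.compile(r"^\d+\.\d+\.\d+$"),
    "nightly": re.compile(r"^\d+\.\d+\.\d+-nightly$"),
}

def versions_for_channel(versions, channel):
    """Keep only the versions that belong to the given update channel."""
    pattern = CHANNEL_PATTERNS[channel]
    return [v for v in versions if pattern.match(v)]

available = ["0.1.58", "0.1.59-nightly", "0.1.61-nightly", "0.1.62-dev"]
```

Anchored patterns ensure that a `-dev` build never leaks into the main or nightly pickers.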
## 0.1.59-nightly - 2026-02-23
- Contains all changes from `0.1.58`.
- Update channel defaults to `Nightly`.
- Nightly repo URL set as the default.
- Settings menu reordered:
  - Quellen
  - Metadaten
  - TMDB Erweitert
  - Updates
  - Debug Global
  - Debug Quellen
- Page size in lists set to 20.
- `topstream_genre_max_pages` removed.
## Note
- Nightly is for testing and can change at short notice.

15
CHANGELOG.md Normal file

@@ -0,0 +1,15 @@
# Changelog (Stable)
> Stabile Releases werden aus dem `dev`-Branch nach `main` übertragen.
> Den vollständigen Entwicklungs-Changelog findet man in [CHANGELOG-DEV.md](CHANGELOG-DEV.md).
## 0.1.58 - 2026-02-23
- Menuebezeichnungen vereinheitlicht (`Haeufig gesehen`, `Neuste Titel`).
- `Neue Titel` und `Neueste Folgen` im Menue zu `Neuste Titel` zusammengelegt.
- Hoster-Header-Anpassung zentral nach `resolve_stream_link` eingebaut.
- Hinweis bei Cloudflare-Block.
- Update-Einstellungen erweitert (Kanal, manueller Check, optionaler Auto-Check).
- Metadaten in AniWorld und Filmpalast
- Topstreamfilm-Suche: fehlender `urlencode`-Import behoben.
- Einige ungenutzte Funktionen entfernt.

117
README.md

@@ -2,51 +2,86 @@
<img src="addon/resources/logo.png" alt="ViewIT Logo" width="220" />
ViewIT is a Kodi add-on.
It searches providers and starts streams.
**ViewIT** is a Kodi add-on for streaming films and series from various sources, all in one place.
## Project structure
- `addon/` Kodi add-on source code
- `scripts/` build scripts
- `dist/` build outputs
- `docs/` documentation
- `tests/` tests
## What can ViewIT do?
## Build and release
- Build the add-on folder: `./scripts/build_install_addon.sh`
- Build the Kodi ZIP: `./scripts/build_kodi_zip.sh`
- Maintain the version in `addon/addon.xml`
- Reproducible ZIP: optionally set `SOURCE_DATE_EPOCH`
- **Films and series**: search and play from several streaming sources
- **Trakt integration**: watchlist, continue watching, watch history and automatic scrobbling
- **TMDB metadata**: posters, descriptions, ratings and fanart loaded automatically
- **Autoplay**: set a preferred hoster and play directly
- **Automatic updates**: new versions are detected and can be installed directly
## Local Kodi repository
- Build the repository: `./scripts/build_local_kodi_repo.sh`
- Serve the repository: `./scripts/serve_local_kodi_repo.sh`
- Default URL: `http://127.0.0.1:8080/repo/addons.xml`
- Custom URL at build time: `REPO_BASE_URL=http://<host>:<port>/repo ./scripts/build_local_kodi_repo.sh`
## Installation
## Development
- Router: `addon/default.py`
- Plugins: `addon/plugins/*_plugin.py`
- Settings: `addon/resources/settings.xml`
1. Download the latest ZIP file (see [Releases](https://gitea.it-drui.de/viewit/ViewIT/releases))
2. In Kodi: **Settings** > **Add-ons** > **Install from ZIP file**
3. Select the downloaded ZIP file
4. ViewIT appears under **Video add-ons**
## Setting up a TMDB API key
- Create a TMDB account and an API key (v3): `https://www.themoviedb.org/settings/api`
- Open the ViewIT add-on in Kodi: `Einstellungen -> TMDB`
- Enable `TMDB aktivieren`
- Enter the `TMDB API Key`
- Optionally set `TMDB Sprache` (e.g. `de-DE`)
- Optionally enable/disable the display options:
  - `TMDB Beschreibung anzeigen`
  - `TMDB Poster und Vorschaubild anzeigen`
  - `TMDB Fanart/Backdrop anzeigen`
  - `TMDB Bewertung anzeigen`
  - `TMDB Stimmen anzeigen`
  - `TMDB Besetzung anzeigen`
### Updates
## Tests
- Install the dev packages: `./.venv/bin/pip install -r requirements-dev.txt`
- Run the tests: `./.venv/bin/pytest`
- XML report: `./.venv/bin/pytest --cov-report=xml`
ViewIT can update itself. Choose the desired channel in the add-on settings under **Updates**:
## Documentation
See `docs/`.
- **Main**: stable versions
- **Nightly**: current development versions
- **Dev**: latest changes (may be unstable)
## Settings
The most important settings in the add-on:
- **Quellen**: base URLs of the individual streaming sites (in case domains change)
- **Metadaten**: enable/disable TMDB, set the language
- **Wiedergabe**: configure autoplay and the preferred hoster
- **Trakt**: connect an account for watchlist and scrobbling
## Requirements
- Kodi 19 (Matrix) or newer
- **ResolveURL** is installed automatically on first start (can also be installed manually via the settings)
## For developers
<details>
<summary>Build and project structure</summary>
### Project structure
```
addon/        Kodi add-on source code
  default.py  main file and router
  plugins/    streaming plugins (one per source)
  core/       core modules (Trakt, metadata, updates, ...)
  resources/  settings, icons, language files
scripts/      build and deploy scripts
tests/        automated tests
dist/         build outputs
```
### Building the add-on
```bash
# Create the add-on folder
bash scripts/build_install_addon.sh
# Create the ZIP for Kodi installation
bash scripts/build_kodi_zip.sh
```
### Running the tests
```bash
pip install -r requirements-dev.txt
pytest
```
### Local Kodi repository
```bash
bash scripts/build_local_kodi_repo.sh
bash scripts/serve_local_kodi_repo.sh
# Reachable at http://127.0.0.1:8080/repo/addons.xml
```
</details>


@@ -1,10 +1,11 @@
 <?xml version='1.0' encoding='utf-8'?>
-<addon id="plugin.video.viewit" name="ViewIt" version="0.1.58" provider-name="ViewIt">
+<addon id="plugin.video.viewit" name="ViewIt" version="0.1.77.5-dev" provider-name="ViewIt">
 <requires>
 <import addon="xbmc.python" version="3.0.0" />
 <import addon="script.module.requests" />
 <import addon="script.module.beautifulsoup4" />
-<import addon="script.module.resolveurl" />
+<import addon="script.module.resolveurl" version="5.1.0" />
+<import addon="script.trakt" optional="true" />
 </requires>
 <extension point="xbmc.python.pluginsource" library="default.py">
 <provides>video</provides>

2
addon/core/__init__.py Normal file

@@ -0,0 +1,2 @@
from __future__ import annotations
# ViewIT core package

341
addon/core/gui.py Normal file

@@ -0,0 +1,341 @@
from __future__ import annotations

import sys
import re
import contextlib
from urllib.parse import urlencode
from typing import Any, Generator, Optional, Callable
from contextlib import contextmanager

try:
    import xbmc
    import xbmcaddon
    import xbmcgui
    import xbmcplugin
except ImportError:
    xbmc = None
    xbmcaddon = None
    xbmcgui = None
    xbmcplugin = None

_ADDON_INSTANCE = None


def get_addon():
    global _ADDON_INSTANCE
    if xbmcaddon is None:
        return None
    if _ADDON_INSTANCE is None:
        _ADDON_INSTANCE = xbmcaddon.Addon()
    return _ADDON_INSTANCE


def get_handle() -> int:
    return int(sys.argv[1]) if len(sys.argv) > 1 else -1


def get_setting_string(setting_id: str) -> str:
    addon = get_addon()
    if addon is None:
        return ""
    getter = getattr(addon, "getSettingString", None)
    if callable(getter):
        try:
            return str(getter(setting_id) or "")
        except Exception:
            pass
    getter = getattr(addon, "getSetting", None)
    if callable(getter):
        try:
            return str(getter(setting_id) or "")
        except Exception:
            pass
    return ""


def get_setting_bool(setting_id: str, *, default: bool = False) -> bool:
    addon = get_addon()
    if addon is None:
        return default
    # Step 1: check whether the setting is set at all (empty raw value = default)
    raw_getter = getattr(addon, "getSetting", None)
    if callable(raw_getter):
        try:
            raw = str(raw_getter(setting_id) or "").strip()
            if not raw:
                return default
        except Exception:
            return default
    # Step 2: prefer getSettingBool for correct type conversion
    getter = getattr(addon, "getSettingBool", None)
    if callable(getter):
        try:
            return bool(getter(setting_id))
        except Exception:
            pass
    # Step 3: fallback, parse the raw value manually
    if callable(raw_getter):
        try:
            raw = str(raw_getter(setting_id) or "").strip().lower()
            return raw == "true"
        except Exception:
            pass
    return default
def get_setting_int(setting_id: str, *, default: int = 0) -> int:
    addon = get_addon()
    if addon is None:
        return default
    getter = getattr(addon, "getSettingInt", None)
    if callable(getter):
        try:
            raw_getter = getattr(addon, "getSetting", None)
            if callable(raw_getter):
                raw = str(raw_getter(setting_id) or "").strip()
                if not raw:
                    return default
            return int(getter(setting_id))
        except Exception:
            pass
    getter = getattr(addon, "getSetting", None)
    if callable(getter):
        try:
            raw = str(getter(setting_id) or "").strip()
            return int(raw) if raw else default
        except Exception:
            pass
    return default


def set_setting_string(setting_id: str, value: str) -> None:
    addon = get_addon()
    if addon is None:
        return
    setter = getattr(addon, "setSettingString", None)
    if callable(setter):
        try:
            setter(setting_id, str(value))
            return
        except Exception:
            pass
    setter = getattr(addon, "setSetting", None)
    if callable(setter):
        try:
            setter(setting_id, str(value))
        except Exception:
            pass
@contextmanager
def progress_dialog(heading: str, message: str = ""):
    """Shows a progress dialog in Kodi and yields an update function."""
    dialog = None
    try:
        if xbmcgui is not None and hasattr(xbmcgui, "DialogProgress"):
            dialog = xbmcgui.DialogProgress()
            dialog.create(heading, message)
    except Exception:
        dialog = None

    def _update_fn(percent: int, msg: str = "") -> bool:
        if dialog:
            try:
                dialog.update(percent, msg or message)
                return dialog.iscanceled()
            except Exception:
                pass
        return False

    try:
        yield _update_fn
    finally:
        if dialog:
            try:
                dialog.close()
            except Exception:
                pass


@contextmanager
def busy_dialog(message: str = "Bitte warten...", *, heading: str = "Bitte warten"):
    """Progress dialog instead of a spinner, with a short status text."""
    with progress_dialog(heading, message) as progress:
        progress(10, message)

        def _update(step_message: str, percent: int | None = None) -> bool:
            pct = 50 if percent is None else max(5, min(95, int(percent)))
            return progress(pct, step_message or message)

        try:
            yield _update
        finally:
            progress(100, "Fertig")


def run_with_progress(heading: str, message: str, loader: Callable[[], Any]) -> Any:
    """Runs a loader function with a visible progress dialog."""
    with progress_dialog(heading, message) as progress:
        progress(10, message)
        result = loader()
        progress(100, "Fertig")
        return result
def set_content(handle: int, content: str) -> None:
    """Hint Kodi about the content type so skins can show watched/resume overlays."""
    content = (content or "").strip()
    if not content:
        return
    try:
        setter = getattr(xbmcplugin, "setContent", None)
        if callable(setter):
            setter(handle, content)
    except Exception:
        pass


def add_directory_item(
    handle: int,
    label: str,
    action: str,
    params: dict[str, str] | None = None,
    *,
    is_folder: bool = True,
    info_labels: dict[str, Any] | None = None,
    art: dict[str, str] | None = None,
    cast: Any = None,
    base_url: str = "",
) -> None:
    """Adds an entry to the Kodi directory list."""
    query: dict[str, str] = {"action": action}
    if params:
        query.update(params)
    url = f"{base_url}?{urlencode(query)}"
    item = xbmcgui.ListItem(label=label)
    if not is_folder:
        try:
            item.setProperty("IsPlayable", "true")
        except Exception:
            pass
    apply_video_info(item, info_labels, cast)
    if art:
        setter = getattr(item, "setArt", None)
        if callable(setter):
            try:
                setter(art)
            except Exception:
                pass
    xbmcplugin.addDirectoryItem(handle=handle, url=url, listitem=item, isFolder=is_folder)
def apply_video_info(item, info_labels: dict[str, Any] | None, cast: Any = None) -> None:
    """Sets metadata via InfoTagVideo (Kodi v20+), with a fallback."""
    if not info_labels and not cast:
        return
    info_labels = dict(info_labels or {})
    get_tag = getattr(item, "getVideoInfoTag", None)
    tag = None
    if callable(get_tag):
        try:
            tag = get_tag()
        except Exception:
            tag = None
    if tag is not None:
        try:
            _apply_tag_info(tag, info_labels)
            if cast:
                _apply_tag_cast(tag, cast)
        except Exception:
            pass
    else:
        # Fallback for older Kodi versions
        setter = getattr(item, "setInfo", None)
        if callable(setter):
            try:
                setter("video", info_labels)
            except Exception:
                pass
        if cast:
            setter = getattr(item, "setCast", None)
            if callable(setter):
                try:
                    setter(cast)
                except Exception:
                    pass


def _apply_tag_info(tag, info: dict[str, Any]) -> None:
    for key, method in [
        ("title", "setTitle"),
        ("plot", "setPlot"),
        ("mediatype", "setMediaType"),
        ("tvshowtitle", "setTvShowTitle"),
    ]:
        val = info.get(key)
        if val:
            setter = getattr(tag, method, None)
            if callable(setter):
                setter(str(val))
    for key, method in [("season", "setSeason"), ("episode", "setEpisode")]:
        val = info.get(key)
        if val not in (None, "", 0, "0"):
            setter = getattr(tag, method, None)
            if callable(setter):
                setter(int(val))
    rating = info.get("rating")
    if rating not in (None, "", 0, "0"):
        set_rating = getattr(tag, "setRating", None)
        if callable(set_rating):
            try:
                set_rating(float(rating))
            except Exception:
                pass
def _apply_tag_cast(tag, cast) -> None:
    setter = getattr(tag, "setCast", None)
    if not callable(setter):
        return
    try:
        formatted_cast = []
        for c in cast:
            # Expects a TmdbCastMember-like object or a dict. Note the
            # parentheses: without them, `a or b if cond else ""` parses as
            # `(a or b) if cond else ""`, which would discard attribute
            # values for plain objects that have no .get() method.
            name = getattr(c, "name", "") or (c.get("name", "") if hasattr(c, "get") else "")
            role = getattr(c, "role", "") or (c.get("role", "") if hasattr(c, "get") else "")
            thumb = getattr(c, "thumbnail", "") or (c.get("thumbnail", "") if hasattr(c, "get") else "")
            if name:
                formatted_cast.append(xbmcgui.Actor(name=name, role=role, thumbnail=thumb))
        if formatted_cast:
            setter(formatted_cast)
    except Exception:
        pass
def label_with_duration(label: str, info_labels: dict[str, Any]) -> str:
    duration = info_labels.get("duration")
    if not duration:
        return label
    try:
        minutes = int(duration) // 60
        if minutes > 0:
            return f"{label} ({minutes} Min.)"
    except Exception:
        pass
    return label


def extract_first_int(value: str | int | None) -> Optional[int]:
    if value is None:
        return None
    if isinstance(value, int):
        return value
    match = re.search(r"\d+", str(value))
    return int(match.group()) if match else None


def looks_like_unresolved_hoster_link(url: str) -> bool:
    url = (url or "").strip()
    return any(p in url.casefold() for p in ["hoster", "link", "resolve"])


def is_resolveurl_missing_error(err: str | None) -> bool:
    err = str(err or "").strip().lower()
    return "resolveurl" in err and ("missing" in err or "not found" in err)


def is_cloudflare_challenge_error(err: str | None) -> bool:
    err = str(err or "").strip().lower()
    return "cloudflare" in err or "challenge" in err


def resolveurl_last_error() -> str:
    try:
        from resolveurl_backend import get_last_error  # type: ignore
    except Exception:
        return ""
    try:
        return str(get_last_error() or "")
    except Exception:
        return ""

addon/core/metadata.py Normal file

@@ -0,0 +1,448 @@
from __future__ import annotations
import asyncio
import os
import re
import threading
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
from tmdb import (
TmdbCastMember,
fetch_tv_episode_credits,
lookup_movie,
lookup_tv_season,
lookup_tv_season_summary,
lookup_tv_show,
)
try:
import xbmc
import xbmcaddon
import xbmcvfs
except ImportError:
xbmc = None
xbmcaddon = None
xbmcvfs = None
# Caches
_TMDB_CACHE: dict[str, tuple[dict[str, str], dict[str, str]]] = {}
_TMDB_CAST_CACHE: dict[str, list[TmdbCastMember]] = {}
_TMDB_SEASON_CACHE: dict[tuple[int, int, str, str], dict[int, tuple[dict[str, str], dict[str, str]]]] = {}
_TMDB_SEASON_SUMMARY_CACHE: dict[tuple[int, int, str, str], tuple[dict[str, str], dict[str, str]]] = {}
_TMDB_EPISODE_CAST_CACHE: dict[tuple[int, int, int, str], list[TmdbCastMember]] = {}
_TMDB_ID_CACHE: dict[str, int] = {}
_TMDB_LOG_PATH: str | None = None
_TMDB_LOCK = threading.RLock()
# Dependency Injection variables
_initialized: bool = False
_get_setting_string: Callable[[str], str] = lambda k: ""
_get_setting_bool: Callable[[str, bool], bool] = lambda k, default=False: default
_get_setting_int: Callable[[str, int], int] = lambda k, default=0: default
_log: Callable[[str, int], None] = lambda msg, level=0: None
_run_async: Callable[[Any], Any] = lambda coro: None
_extract_first_int: Callable[[str], Optional[int]] = lambda val: None
def _require_init() -> None:
"""Warn if metadata.init() has not been called yet."""
if not _initialized:
import sys
print("[ViewIT/metadata] WARNUNG: metadata.init() wurde nicht aufgerufen; Metadaten-Funktionen arbeiten mit Standardwerten!", file=sys.stderr)
def init(
*,
get_setting_string: Callable[[str], str],
get_setting_bool: Callable[..., bool],
get_setting_int: Callable[..., int],
log_fn: Callable[[str, int], None],
run_async_fn: Callable[[Any], Any],
extract_first_int_fn: Callable[[str], Optional[int]],
) -> None:
global _initialized, _get_setting_string, _get_setting_bool, _get_setting_int, _log, _run_async, _extract_first_int
_get_setting_string = get_setting_string
_get_setting_bool = get_setting_bool
_get_setting_int = get_setting_int
_log = log_fn
_run_async = run_async_fn
_extract_first_int = extract_first_int_fn
_initialized = True
def _get_log_path(filename: str) -> str:
if xbmcaddon and xbmcvfs:
addon = xbmcaddon.Addon()
profile = xbmcvfs.translatePath(addon.getAddonInfo("profile"))
log_dir = os.path.join(profile, "logs")
if not xbmcvfs.exists(log_dir):
xbmcvfs.mkdirs(log_dir)
return os.path.join(log_dir, filename)
return os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), filename)
def tmdb_file_log(message: str) -> None:
global _TMDB_LOG_PATH
if _TMDB_LOG_PATH is None:
_TMDB_LOG_PATH = _get_log_path("tmdb.log")
timestamp = datetime.utcnow().isoformat(timespec="seconds") + "Z"
line = f"{timestamp}\t{message}\n"
try:
with open(_TMDB_LOG_PATH, "a", encoding="utf-8") as handle:
handle.write(line)
except Exception:
if xbmcvfs is None:
return
try:
handle = xbmcvfs.File(_TMDB_LOG_PATH, "a") # type: ignore
handle.write(line) # type: ignore
handle.close() # type: ignore
except Exception:
return
def tmdb_cache_get(cache: dict, key, default=None):
with _TMDB_LOCK:
return cache.get(key, default)
def tmdb_cache_set(cache: dict, key, value) -> None:
with _TMDB_LOCK:
cache[key] = value
def tmdb_prefetch_concurrency() -> int:
try:
raw = _get_setting_string("tmdb_prefetch_concurrency").strip()
value = int(raw) if raw else 6
except Exception:
value = 6
return max(1, min(20, value))
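`tmdb_prefetch_concurrency` parses a free-text setting, falls back to a default, and clamps the result into a safe range. The same pattern as a standalone function (names and bounds are illustrative):

```python
def prefetch_concurrency(raw: str, *, default: int = 6, lo: int = 1, hi: int = 20) -> int:
    # Parse the setting, fall back to the default on bad input, clamp to [lo, hi].
    try:
        value = int(raw.strip()) if raw.strip() else default
    except Exception:
        value = default
    return max(lo, min(hi, value))

print(prefetch_concurrency(""))     # 6
print(prefetch_concurrency("99"))   # 20
print(prefetch_concurrency("abc"))  # 6
```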
def tmdb_enabled() -> bool:
_require_init()
return _get_setting_bool("tmdb_enabled", default=True)
def tmdb_list_enabled() -> bool:
return tmdb_enabled() and _get_setting_bool("tmdb_genre_metadata", default=False)
def tmdb_labels_and_art(title: str) -> tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]:
if not tmdb_enabled():
return {}, {}, []
title_key = (title or "").strip().casefold()
language = _get_setting_string("tmdb_language").strip() or "de-DE"
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
show_fanart = _get_setting_bool("tmdb_show_fanart", default=True)
show_rating = _get_setting_bool("tmdb_show_rating", default=True)
show_votes = _get_setting_bool("tmdb_show_votes", default=False)
show_cast = _get_setting_bool("tmdb_show_cast", default=False)
flags = f"p{int(show_plot)}a{int(show_art)}f{int(show_fanart)}r{int(show_rating)}v{int(show_votes)}c{int(show_cast)}"
cache_key = f"{language}|{flags}|{title_key}"
cached = tmdb_cache_get(_TMDB_CACHE, cache_key)
if cached is not None:
info, art = cached
cast_cached = tmdb_cache_get(_TMDB_CAST_CACHE, cache_key, [])
return info, art, list(cast_cached)
info_labels: dict[str, str] = {"title": title}
art: dict[str, str] = {}
cast: list[TmdbCastMember] = []
query = (title or "").strip()
api_key = _get_setting_string("tmdb_api_key").strip()
log_requests = _get_setting_bool("tmdb_log_requests", default=False)
log_responses = _get_setting_bool("tmdb_log_responses", default=False)
if api_key:
try:
log_fn = tmdb_file_log if (log_requests or log_responses) else None
candidates: list[str] = []
if query:
candidates.append(query)
simplified = re.sub(r"\s*[-]\s*der\s+film\s*$", "", query, flags=re.IGNORECASE).strip()
if simplified and simplified not in candidates:
candidates.append(simplified)
meta = None
is_tv = False
for candidate in candidates:
meta = lookup_tv_show(
title=candidate,
api_key=api_key,
language=language,
log=log_fn,
log_responses=log_responses,
include_cast=show_cast,
)
if meta:
is_tv = True
break
if not meta:
for candidate in candidates:
movie = lookup_movie(
title=candidate,
api_key=api_key,
language=language,
log=log_fn,
log_responses=log_responses,
include_cast=show_cast,
)
if movie:
meta = movie
break
except Exception as exc:
try:
tmdb_file_log(f"TMDB ERROR lookup_failed title={title!r} error={exc!r}")
except Exception:
pass
_log(f"TMDB Meta fehlgeschlagen: {exc}", 1) # LOGWARNING/LOGDEBUG fallback
meta = None
if meta:
if is_tv:
tmdb_cache_set(_TMDB_ID_CACHE, title_key, int(getattr(meta, "tmdb_id", 0) or 0))
info_labels.setdefault("mediatype", "tvshow")
else:
info_labels.setdefault("mediatype", "movie")
if show_plot and getattr(meta, "plot", ""):
info_labels["plot"] = getattr(meta, "plot", "")
runtime_minutes = int(getattr(meta, "runtime_minutes", 0) or 0)
if runtime_minutes > 0 and not is_tv:
info_labels["duration"] = str(runtime_minutes * 60)
rating = getattr(meta, "rating", 0.0) or 0.0
votes = getattr(meta, "votes", 0) or 0
if show_rating and rating:
info_labels["rating"] = str(rating)
if show_votes and votes:
info_labels["votes"] = str(votes)
if show_art and getattr(meta, "poster", ""):
poster = getattr(meta, "poster", "")
art.update({"thumb": poster, "poster": poster, "icon": poster})
if show_fanart and getattr(meta, "fanart", ""):
fanart = getattr(meta, "fanart", "")
if fanart:
art.update({"fanart": fanart, "landscape": fanart})
if show_cast:
cast = list(getattr(meta, "cast", []) or [])
elif log_requests or log_responses:
tmdb_file_log(f"TMDB MISS title={title!r}")
tmdb_cache_set(_TMDB_CACHE, cache_key, (info_labels, art))
tmdb_cache_set(_TMDB_CAST_CACHE, cache_key, list(cast))
return info_labels, art, list(cast)
async def _tmdb_labels_and_art_bulk_async(
titles: list[str],
) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]:
titles = [str(t).strip() for t in (titles or []) if t and str(t).strip()]
if not titles:
return {}
unique_titles: list[str] = list(dict.fromkeys(titles))
limit = tmdb_prefetch_concurrency()
semaphore = asyncio.Semaphore(limit)
async def fetch_one(title: str):
async with semaphore:
return title, await asyncio.to_thread(tmdb_labels_and_art, title)
tasks = [fetch_one(title) for title in unique_titles]
results = await asyncio.gather(*tasks, return_exceptions=True)
mapped: dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]] = {}
for entry in results:
if isinstance(entry, Exception):
continue
try:
title, payload = entry
except Exception:
continue
if isinstance(title, str) and isinstance(payload, tuple) and len(payload) == 3:
mapped[title] = payload # type: ignore[assignment]
return mapped
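The bulk lookup above bounds concurrency with a semaphore while `asyncio.to_thread` keeps the blocking per-title lookup off the event loop. A self-contained sketch of that pattern, with a hypothetical `lookup` standing in for `tmdb_labels_and_art`:

```python
import asyncio

def lookup(title: str) -> dict:
    # Hypothetical stand-in for the blocking tmdb_labels_and_art() call.
    return {"title": title}

async def bulk(titles: list[str], limit: int = 6) -> dict[str, dict]:
    # Drop empties and duplicates while preserving order, like the original.
    unique = list(dict.fromkeys(t.strip() for t in titles if t and t.strip()))
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(title: str):
        async with semaphore:
            # to_thread keeps the event loop responsive during the blocking call.
            return title, await asyncio.to_thread(lookup, title)

    results = await asyncio.gather(*(fetch_one(t) for t in unique), return_exceptions=True)
    mapped: dict[str, dict] = {}
    for entry in results:
        if isinstance(entry, Exception):
            continue  # one failed lookup must not sink the whole batch
        title, payload = entry
        mapped[title] = payload
    return mapped

print(asyncio.run(bulk(["A", "B", "A"])))  # {'A': {'title': 'A'}, 'B': {'title': 'B'}}
```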
def tmdb_labels_and_art_bulk(
titles: list[str],
) -> dict[str, tuple[dict[str, str], dict[str, str], list[TmdbCastMember]]]:
if not tmdb_enabled():
return {}
return _run_async(_tmdb_labels_and_art_bulk_async(titles))
def tmdb_episode_labels_and_art(*, title: str, season_label: str, episode_label: str) -> tuple[dict[str, str], dict[str, str]]:
if not tmdb_enabled():
return {"title": episode_label}, {}
title_key = (title or "").strip().casefold()
tmdb_id = tmdb_cache_get(_TMDB_ID_CACHE, title_key)
if not tmdb_id:
tmdb_labels_and_art(title)
tmdb_id = tmdb_cache_get(_TMDB_ID_CACHE, title_key)
if not tmdb_id:
return {"title": episode_label}, {}
season_number = _extract_first_int(season_label)
episode_number = _extract_first_int(episode_label)
if season_number is None or episode_number is None:
return {"title": episode_label}, {}
language = _get_setting_string("tmdb_language").strip() or "de-DE"
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
flags = f"p{int(show_plot)}a{int(show_art)}"
season_key = (tmdb_id, season_number, language, flags)
cached_season = tmdb_cache_get(_TMDB_SEASON_CACHE, season_key)
if cached_season is None:
api_key = _get_setting_string("tmdb_api_key").strip()
if not api_key:
return {"title": episode_label}, {}
log_requests = _get_setting_bool("tmdb_log_requests", default=False)
log_responses = _get_setting_bool("tmdb_log_responses", default=False)
log_fn = tmdb_file_log if (log_requests or log_responses) else None
try:
season_meta = lookup_tv_season(
tmdb_id=tmdb_id,
season_number=season_number,
api_key=api_key,
language=language,
log=log_fn,
log_responses=log_responses,
)
except Exception as exc:
if log_fn:
log_fn(f"TMDB ERROR season_lookup_failed tmdb_id={tmdb_id} season={season_number} error={exc!r}")
season_meta = None
mapped: dict[int, tuple[dict[str, str], dict[str, str]]] = {}
if season_meta:
for ep_no, ep in season_meta.items():
info: dict[str, str] = {"title": f"Episode {ep_no}"}
if show_plot and ep.plot:
info["plot"] = ep.plot
if getattr(ep, "runtime_minutes", 0):
info["duration"] = str(int(getattr(ep, "runtime_minutes", 0)) * 60)
art: dict[str, str] = {}
if show_art and ep.thumb:
art = {"thumb": ep.thumb}
mapped[ep_no] = (info, art)
tmdb_cache_set(_TMDB_SEASON_CACHE, season_key, mapped)
cached_season = mapped
return cached_season.get(episode_number, ({"title": episode_label}, {}))
def tmdb_episode_cast(*, title: str, season_label: str, episode_label: str) -> list[TmdbCastMember]:
if not tmdb_enabled():
return []
show_episode_cast = _get_setting_bool("tmdb_show_episode_cast", default=False)
if not show_episode_cast:
return []
title_key = (title or "").strip().casefold()
tmdb_id = tmdb_cache_get(_TMDB_ID_CACHE, title_key)
if not tmdb_id:
tmdb_labels_and_art(title)
tmdb_id = tmdb_cache_get(_TMDB_ID_CACHE, title_key)
if not tmdb_id:
return []
season_number = _extract_first_int(season_label)
episode_number = _extract_first_int(episode_label)
if season_number is None or episode_number is None:
return []
language = _get_setting_string("tmdb_language").strip() or "de-DE"
cache_key = (tmdb_id, season_number, episode_number, language)
cached = tmdb_cache_get(_TMDB_EPISODE_CAST_CACHE, cache_key)
if cached is not None:
return list(cached)
api_key = _get_setting_string("tmdb_api_key").strip()
if not api_key:
tmdb_cache_set(_TMDB_EPISODE_CAST_CACHE, cache_key, [])
return []
log_requests = _get_setting_bool("tmdb_log_requests", default=False)
log_responses = _get_setting_bool("tmdb_log_responses", default=False)
log_fn = tmdb_file_log if (log_requests or log_responses) else None
try:
cast = fetch_tv_episode_credits(
tmdb_id=tmdb_id,
season_number=season_number,
episode_number=episode_number,
api_key=api_key,
language=language,
log=log_fn,
log_responses=log_responses,
)
except Exception as exc:
if log_fn:
log_fn(
f"TMDB ERROR episode_credits_failed tmdb_id={tmdb_id} season={season_number} episode={episode_number} error={exc!r}"
)
cast = []
tmdb_cache_set(_TMDB_EPISODE_CAST_CACHE, cache_key, list(cast))
return list(cast)
def tmdb_season_labels_and_art(
*,
title: str,
season: str,
title_info_labels: dict[str, str] | None = None,
) -> tuple[dict[str, str], dict[str, str]]:
if not tmdb_enabled():
return {"title": season}, {}
language = _get_setting_string("tmdb_language").strip() or "de-DE"
show_plot = _get_setting_bool("tmdb_show_plot", default=True)
show_art = _get_setting_bool("tmdb_show_art", default=True)
flags = f"p{int(show_plot)}a{int(show_art)}"
api_key = _get_setting_string("tmdb_api_key").strip()
log_requests = _get_setting_bool("tmdb_log_requests", default=False)
log_responses = _get_setting_bool("tmdb_log_responses", default=False)
log_fn = tmdb_file_log if (log_requests or log_responses) else None
info_labels: dict[str, str] | None = None
art: dict[str, str] | None = None
season_number = _extract_first_int(season)
if api_key and season_number is not None:
title_key = (title or "").strip().casefold()
tmdb_id = tmdb_cache_get(_TMDB_ID_CACHE, title_key) or 0
cache_key = (tmdb_id, season_number, language, flags)
cached = tmdb_cache_get(_TMDB_SEASON_SUMMARY_CACHE, cache_key)
if cached is None and tmdb_id:
try:
meta = lookup_tv_season_summary(
tmdb_id=tmdb_id,
season_number=season_number,
api_key=api_key,
language=language,
log=log_fn,
log_responses=log_responses,
)
except Exception as exc:
if log_fn:
log_fn(f"TMDB ERROR season_summary_failed tmdb_id={tmdb_id} season={season_number} error={exc!r}")
meta = None
labels = {"title": season}
art_map: dict[str, str] = {}
if meta:
if show_plot and meta.plot:
labels["plot"] = meta.plot
if show_art and meta.poster:
art_map = {"thumb": meta.poster, "poster": meta.poster}
cached = (labels, art_map)
tmdb_cache_set(_TMDB_SEASON_SUMMARY_CACHE, cache_key, cached)
if cached is not None:
info_labels, art = cached
merged_labels = dict(info_labels or {})
if title_info_labels:
merged_labels = dict(title_info_labels)
merged_labels.update(dict(info_labels or {}))
return merged_labels, art or {}

addon/core/playstate.py Normal file

@@ -0,0 +1,54 @@
from __future__ import annotations
import threading
from typing import Any
# Playstate management for the ViewIT Kodi addon.
# Most functions are currently stubs, since Kodi tracks playback positions itself.
_PLAYSTATE_CACHE: dict[str, dict[str, Any]] | None = None
_PLAYSTATE_LOCK = threading.RLock()
def playstate_key(*, plugin_name: str, title: str, season: str, episode: str) -> str:
plugin_name = (plugin_name or "").strip()
title = (title or "").strip()
season = (season or "").strip()
episode = (episode or "").strip()
return f"{plugin_name}\t{title}\t{season}\t{episode}"
def load_playstate() -> dict[str, dict[str, Any]]:
return {}
def save_playstate(state: dict[str, dict[str, Any]]) -> None:
return
def get_playstate(key: str) -> dict[str, Any]:
return {}
def set_playstate(key: str, value: dict[str, Any]) -> None:
return
def apply_playstate_to_info(info_labels: dict[str, Any], playstate: dict[str, Any]) -> dict[str, Any]:
return dict(info_labels or {})
def label_with_playstate(label: str, playstate: dict[str, Any]) -> str:
return label
def title_playstate(plugin_name: str, title: str) -> dict[str, Any]:
return get_playstate(playstate_key(plugin_name=plugin_name, title=title, season="", episode=""))
def season_playstate(plugin_name: str, title: str, season: str) -> dict[str, Any]:
return get_playstate(playstate_key(plugin_name=plugin_name, title=title, season=season, episode=""))
def track_playback_and_update_state_async(key: str) -> None:
# Custom resume/watched tracking is disabled; Kodi manages this itself.
return


@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""Plugin-Erkennung und -Verwaltung fuer ViewIT.
Dieses Modul laedt dynamisch alle Plugins aus dem `plugins/` Verzeichnis,
instanziiert sie und cached die Instanzen im RAM.
"""
from __future__ import annotations
import importlib.util
import inspect
import sys
from pathlib import Path
from types import ModuleType
try: # pragma: no cover - Kodi runtime
import xbmc # type: ignore[import-not-found]
except ImportError: # pragma: no cover
xbmc = None
from plugin_interface import BasisPlugin
PLUGIN_DIR = Path(__file__).resolve().parent.parent / "plugins"
_PLUGIN_CACHE: dict[str, BasisPlugin] | None = None
def _log(message: str, level: int = 1) -> None:
if xbmc is not None:
xbmc.log(f"[ViewIt] {message}", level)
def import_plugin_module(path: Path) -> ModuleType:
"""Importiert eine einzelne Plugin-Datei als Python-Modul."""
spec = importlib.util.spec_from_file_location(path.stem, path)
if spec is None or spec.loader is None:
raise ImportError(f"Modul-Spezifikation fuer {path.name} fehlt.")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
try:
spec.loader.exec_module(module)
except Exception:
sys.modules.pop(spec.name, None)
raise
return module
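The spec-based import above is standard `importlib` usage: build a spec from the file path, materialize the module, register it in `sys.modules`, then execute it, rolling back the registration on failure. Demonstrated here against a temporary file (the plugin filename is made up for the demo):

```python
import importlib.util
import sys
import tempfile
from pathlib import Path

def import_plugin_module(path: Path):
    # Build a module spec from the file, execute it, register it in sys.modules.
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Missing module spec for {path.name}.")
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except Exception:
        # Roll the sys.modules entry back if execution fails.
        sys.modules.pop(spec.name, None)
        raise
    return module

with tempfile.TemporaryDirectory() as tmp:
    plugin_file = Path(tmp) / "demo_plugin.py"  # hypothetical plugin file
    plugin_file.write_text("NAME = 'demo'\n", encoding="utf-8")
    print(import_plugin_module(plugin_file).NAME)  # demo
```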
def discover_plugins() -> dict[str, BasisPlugin]:
"""Laedt alle Plugins aus `plugins/*.py` und cached Instanzen im RAM."""
global _PLUGIN_CACHE
if _PLUGIN_CACHE is not None:
return _PLUGIN_CACHE
plugins: dict[str, BasisPlugin] = {}
if not PLUGIN_DIR.exists():
_PLUGIN_CACHE = plugins
return plugins
for file_path in sorted(PLUGIN_DIR.glob("*.py")):
if file_path.name.startswith("_"):
continue
try:
module = import_plugin_module(file_path)
except Exception as exc:
_log(f"Plugin-Datei {file_path.name} konnte nicht geladen werden: {exc}", 2)
continue
preferred = getattr(module, "Plugin", None)
if inspect.isclass(preferred) and issubclass(preferred, BasisPlugin) and preferred is not BasisPlugin:
plugin_classes = [preferred]
else:
plugin_classes = [
obj
for obj in module.__dict__.values()
if inspect.isclass(obj) and issubclass(obj, BasisPlugin) and obj is not BasisPlugin
]
plugin_classes.sort(key=lambda cls: cls.__name__.casefold())
for cls in plugin_classes:
try:
instance = cls()
except Exception as exc:
_log(f"Plugin {cls.__name__} konnte nicht geladen werden: {exc}", 2)
continue
if getattr(instance, "is_available", True) is False:
reason = getattr(instance, "unavailable_reason", "Nicht verfuegbar.")
_log(f"Plugin {cls.__name__} deaktiviert: {reason}", 2)
continue
plugin_name = str(getattr(instance, "name", "") or "").strip()
if not plugin_name:
_log(
f"Plugin {cls.__name__} wurde ohne Name registriert und wird uebersprungen.",
2,
)
continue
if plugin_name in plugins:
_log(
f"Plugin-Name doppelt ({plugin_name}), {cls.__name__} wird uebersprungen.",
2,
)
continue
plugins[plugin_name] = instance
plugins = dict(sorted(plugins.items(), key=lambda item: item[0].casefold()))
_PLUGIN_CACHE = plugins
return plugins
def plugin_has_capability(plugin: BasisPlugin, capability: str) -> bool:
"""Prueft ob ein Plugin eine bestimmte Faehigkeit hat."""
getter = getattr(plugin, "capabilities", None)
if callable(getter):
try:
capabilities = getter()
except Exception:
capabilities = set()
try:
return capability in set(capabilities or [])
except Exception:
return False
# Backwards compatibility: popular_series is detected via the POPULAR_GENRE_LABEL constant.
if capability == "popular_series":
return _popular_genre_label(plugin) is not None
return False
def _popular_genre_label(plugin: BasisPlugin) -> str | None:
label = getattr(plugin, "POPULAR_GENRE_LABEL", None)
if isinstance(label, str) and label.strip():
return label.strip()
return None
def popular_genre_label(plugin: BasisPlugin) -> str | None:
"""Gibt das POPULAR_GENRE_LABEL des Plugins zurueck, falls vorhanden."""
return _popular_genre_label(plugin)
def plugins_with_popular() -> list[tuple[str, BasisPlugin, str]]:
"""Liefert alle Plugins die 'popular_series' unterstuetzen."""
results: list[tuple[str, BasisPlugin, str]] = []
for plugin_name, plugin in discover_plugins().items():
if not plugin_has_capability(plugin, "popular_series"):
continue
label = _popular_genre_label(plugin) or ""
results.append((plugin_name, plugin, label))
return results
def series_url_params(plugin: BasisPlugin, title: str) -> dict[str, str]:
"""Liefert series_url Parameter fuer Kodi-Navigation, falls vom Plugin bereitgestellt."""
getter = getattr(plugin, "series_url_for_title", None)
if not callable(getter):
return {}
try:
series_url = str(getter(title) or "").strip()
except Exception:
return {}
return {"series_url": series_url} if series_url else {}

addon/core/router.py Normal file

@@ -0,0 +1,58 @@
from __future__ import annotations
import sys
from typing import Any, Callable, Dict, Optional
from urllib.parse import parse_qs
class Router:
"""A simple router for Kodi add-ons."""
def __init__(self) -> None:
self._routes: Dict[str, Callable[[Dict[str, str]], Any]] = {}
self._fallback: Optional[Callable[[Dict[str, str]], Any]] = None
def route(self, action: str) -> Callable[[Callable[[Dict[str, str]], Any]], Callable[[Dict[str, str]], Any]]:
"""Decorator to register a function for a specific action."""
def decorator(handler: Callable[[Dict[str, str]], Any]) -> Callable[[Dict[str, str]], Any]:
self._routes[action] = handler
return handler
return decorator
def fallback(self) -> Callable[[Callable[[Dict[str, str]], Any]], Callable[[Dict[str, str]], Any]]:
"""Decorator to register the fallback (default) handler."""
def decorator(handler: Callable[[Dict[str, str]], Any]) -> Callable[[Dict[str, str]], Any]:
self._fallback = handler
return handler
return decorator
def dispatch(self, action: Optional[str] = None, params: Optional[Dict[str, str]] = None) -> Any:
"""Dispatch the request to the registered handler."""
if params is None:
params = {}
handler = self._routes.get(action) if action else self._fallback
if not handler:
handler = self._fallback
if handler:
return handler(params)
raise KeyError(f"No route or fallback defined for action: {action}")
def parse_params(argv: Optional[list[str]] = None) -> dict[str, str]:
"""Parst Kodi-Plugin-Parameter aus `sys.argv[2]` oder der übergebenen Liste."""
if argv is None:
argv = sys.argv
if len(argv) <= 2 or not argv[2]:
return {}
raw_params = parse_qs(argv[2].lstrip("?"), keep_blank_values=True)
return {key: values[0] for key, values in raw_params.items()}
def parse_positive_int(value: str, *, default: int = 1) -> int:
try:
parsed = int(value)
return parsed if parsed > 0 else default
except (ValueError, TypeError):
return default
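Kodi invokes a plugin with `sys.argv = [base_url, handle, "?query"]`, so the two helpers above can be exercised with a hand-built argv (re-stated here so the snippet runs standalone):

```python
from urllib.parse import parse_qs

def parse_params(argv):
    # argv mirrors Kodi's sys.argv: [base_url, handle, "?query"].
    if argv is None or len(argv) <= 2 or not argv[2]:
        return {}
    raw = parse_qs(argv[2].lstrip("?"), keep_blank_values=True)
    # parse_qs yields lists; keep only the first value per key.
    return {key: values[0] for key, values in raw.items()}

def parse_positive_int(value, *, default: int = 1) -> int:
    try:
        parsed = int(value)
        return parsed if parsed > 0 else default
    except (ValueError, TypeError):
        return default

params = parse_params(["plugin://x/", "1", "?action=list&page=2&q="])
print(params)                              # {'action': 'list', 'page': '2', 'q': ''}
print(parse_positive_int(params["page"]))  # 2
print(parse_positive_int("-3"))            # 1
```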

addon/core/trakt.py Normal file

@@ -0,0 +1,555 @@
"""Trakt.tv API-Integration fuer ViewIT.
Bietet OAuth-Device-Auth, Scrobbling, Watchlist, History und Calendar.
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlencode
try:
import requests
except ImportError:
requests = None
TRAKT_API_BASE = "https://api.trakt.tv"
TRAKT_API_VERSION = "2"
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class TraktToken:
access_token: str
refresh_token: str
expires_at: int # Unix-Timestamp
created_at: int
@dataclass(frozen=True)
class TraktDeviceCode:
device_code: str
user_code: str
verification_url: str
expires_in: int
interval: int
@dataclass(frozen=True)
class TraktMediaIds:
trakt: int = 0
tmdb: int = 0
imdb: str = ""
slug: str = ""
tvdb: int = 0
@dataclass(frozen=True)
class TraktItem:
title: str
year: int
media_type: str # "movie", "show", or "episode"
ids: TraktMediaIds = field(default_factory=TraktMediaIds)
season: int = 0
episode: int = 0
watched_at: str = ""
poster: str = ""
episode_title: str = "" # episode title (extended=full)
episode_overview: str = "" # episode synopsis (extended=full)
episode_thumb: str = "" # screenshot URL (extended=images)
show_poster: str = "" # show poster URL (extended=images)
show_fanart: str = "" # show fanart URL (extended=images)
@dataclass(frozen=True)
class TraktEpisodeMeta:
"""Metadaten einer einzelnen Episode (aus extended=full,images)."""
title: str
overview: str
runtime_minutes: int
thumb: str # screenshot URL (https://)
@dataclass(frozen=True)
class TraktCalendarItem:
"""Ein Eintrag aus dem Trakt-Kalender (anstehende Episode)."""
show_title: str
show_year: int
show_ids: TraktMediaIds
season: int
episode: int
episode_title: str
episode_overview: str # episode synopsis (extended=full)
episode_thumb: str # screenshot URL (https://)
show_poster: str # poster URL (https://)
show_fanart: str # fanart URL (https://)
first_aired: str # ISO-8601, e.g. "2026-03-02T02:00:00.000Z"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _trakt_image_url(raw: str) -> str:
"""Stellt https:// vor relative Trakt-Bild-URLs."""
if not raw:
return ""
raw = raw.strip()
if raw.startswith("http"):
return raw
return f"https://{raw}"
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class TraktClient:
"""Trakt API Client."""
def __init__(
self,
client_id: str,
client_secret: str,
*,
log: Callable[[str], None] | None = None,
) -> None:
self._client_id = client_id
self._client_secret = client_secret
self._log = log
def _headers(self, token: str = "") -> dict[str, str]:
h = {
"Content-Type": "application/json",
"trakt-api-version": TRAKT_API_VERSION,
"trakt-api-key": self._client_id,
}
if token:
h["Authorization"] = f"Bearer {token}"
return h
def _do_log(self, msg: str) -> None:
if callable(self._log):
self._log(f"[Trakt] {msg}")
def _post(self, path: str, body: dict, *, token: str = "", timeout: int = 15) -> tuple[int, dict | None]:
if requests is None:
return 0, None
url = f"{TRAKT_API_BASE}{path}"
self._do_log(f"POST {path}")
try:
resp = requests.post(url, json=body, headers=self._headers(token), timeout=timeout)
status = resp.status_code
try:
payload = resp.json()
except Exception:
payload = None
self._do_log(f"POST {path} -> {status}")
return status, payload
except Exception as exc:
self._do_log(f"POST {path} FEHLER: {exc}")
return 0, None
def _get(self, path: str, *, token: str = "", timeout: int = 15) -> tuple[int, Any]:
if requests is None:
return 0, None
url = f"{TRAKT_API_BASE}{path}"
self._do_log(f"GET {path}")
try:
resp = requests.get(url, headers=self._headers(token), timeout=timeout)
status = resp.status_code
try:
payload = resp.json()
except Exception:
payload = None
self._do_log(f"GET {path} -> {status}")
return status, payload
except Exception as exc:
self._do_log(f"GET {path} FEHLER: {exc}")
return 0, None
# -------------------------------------------------------------------
# OAuth Device Flow
# -------------------------------------------------------------------
def device_code_request(self) -> TraktDeviceCode | None:
"""POST /oauth/device/code generiert User-Code + Verification-URL."""
status, payload = self._post("/oauth/device/code", {"client_id": self._client_id})
if status != 200 or not isinstance(payload, dict):
return None
return TraktDeviceCode(
device_code=payload.get("device_code", ""),
user_code=payload.get("user_code", ""),
verification_url=payload.get("verification_url", "https://trakt.tv/activate"),
expires_in=int(payload.get("expires_in", 600)),
interval=int(payload.get("interval", 5)),
)
def poll_device_token(self, device_code: str, *, interval: int = 5, expires_in: int = 600) -> TraktToken | None:
"""Pollt POST /oauth/device/token bis autorisiert oder Timeout."""
body = {
"code": device_code,
"client_id": self._client_id,
"client_secret": self._client_secret,
}
start = time.time()
while time.time() - start < expires_in:
status, payload = self._post("/oauth/device/token", body)
if status == 200 and isinstance(payload, dict):
return TraktToken(
access_token=payload.get("access_token", ""),
refresh_token=payload.get("refresh_token", ""),
expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)),
created_at=int(payload.get("created_at", 0)),
)
if status == 400:
# Authorization still pending; keep waiting
time.sleep(interval)
continue
if status in (404, 410, 418):
# Invalid, expired, or denied
self._do_log(f"Device-Auth abgebrochen: status={status}")
return None
if status == 429:
time.sleep(interval + 1)
continue
time.sleep(interval)
return None
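The status handling in `poll_device_token` is a small state machine: 200 means success, 400 means the user has not approved yet, 404/410/418 abort, 429 asks to slow down. A simplified simulation with a scripted responder (no network, intervals collapsed to zero, and the 429 back-off reduced to a plain sleep):

```python
import time

def poll(post, *, interval: float = 0.0, expires_in: float = 5.0):
    # post() returns (status, payload); mirrors the Trakt device-token loop.
    start = time.time()
    while time.time() - start < expires_in:
        status, payload = post()
        if status == 200 and isinstance(payload, dict):
            return payload           # authorized
        if status in (404, 410, 418):
            return None              # invalid, expired, or denied
        if status == 429:
            time.sleep(interval)     # rate limited: slow down
            continue
        time.sleep(interval)         # 400 = authorization pending
    return None

responses = iter([(400, None), (429, None), (200, {"access_token": "tok"})])
print(poll(lambda: next(responses)))  # {'access_token': 'tok'}
```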
def refresh_token(self, refresh_tok: str) -> TraktToken | None:
"""POST /oauth/token Token erneuern."""
body = {
"refresh_token": refresh_tok,
"client_id": self._client_id,
"client_secret": self._client_secret,
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
"grant_type": "refresh_token",
}
status, payload = self._post("/oauth/token", body)
if status != 200 or not isinstance(payload, dict):
return None
return TraktToken(
access_token=payload.get("access_token", ""),
refresh_token=payload.get("refresh_token", ""),
expires_at=int(payload.get("created_at", 0)) + int(payload.get("expires_in", 0)),
created_at=int(payload.get("created_at", 0)),
)
# -------------------------------------------------------------------
# Scrobble
# -------------------------------------------------------------------
def _build_scrobble_body(
self,
*,
media_type: str,
title: str,
tmdb_id: int,
imdb_id: str = "",
season: int = 0,
episode: int = 0,
progress: float = 0.0,
) -> dict:
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
body: dict[str, object] = {"progress": round(progress, 1)}
if media_type == "tv" and season > 0 and episode > 0:
body["show"] = {"title": title, "ids": ids}
body["episode"] = {"season": season, "number": episode}
else:
body["movie"] = {"title": title, "ids": ids}
return body
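`_build_scrobble_body` is a pure function, so its output shape is easy to check outside the client. Re-stated here without the class:

```python
def build_scrobble_body(*, media_type: str, title: str, tmdb_id: int,
                        imdb_id: str = "", season: int = 0, episode: int = 0,
                        progress: float = 0.0) -> dict:
    # Collect whichever IDs are present.
    ids: dict[str, object] = {}
    if tmdb_id:
        ids["tmdb"] = tmdb_id
    if imdb_id:
        ids["imdb"] = imdb_id
    body: dict[str, object] = {"progress": round(progress, 1)}
    # Episodes need show + episode objects; everything else scrobbles as a movie.
    if media_type == "tv" and season > 0 and episode > 0:
        body["show"] = {"title": title, "ids": ids}
        body["episode"] = {"season": season, "number": episode}
    else:
        body["movie"] = {"title": title, "ids": ids}
    return body

body = build_scrobble_body(media_type="tv", title="Dark", tmdb_id=70523,
                           season=1, episode=3, progress=42.5)
print(body["episode"])  # {'season': 1, 'number': 3}
```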
def scrobble_start(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 0.0,
) -> bool:
"""POST /scrobble/start"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/start", body, token=token)
return status in (200, 201)
def scrobble_pause(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 50.0,
) -> bool:
"""POST /scrobble/pause"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/pause", body, token=token)
return status in (200, 201)
def scrobble_stop(
self, token: str, *, media_type: str, title: str,
tmdb_id: int, imdb_id: str = "",
season: int = 0, episode: int = 0, progress: float = 100.0,
) -> bool:
"""POST /scrobble/stop"""
body = self._build_scrobble_body(
media_type=media_type, title=title, tmdb_id=tmdb_id, imdb_id=imdb_id,
season=season, episode=episode, progress=progress,
)
status, _ = self._post("/scrobble/stop", body, token=token)
return status in (200, 201)
# -------------------------------------------------------------------
# Watchlist
# -------------------------------------------------------------------
def get_watchlist(self, token: str, *, media_type: str = "") -> list[TraktItem]:
"""GET /users/me/watchlist[/movies|/shows]"""
path = "/users/me/watchlist"
if media_type in ("movies", "shows"):
path = f"{path}/{media_type}"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
return self._parse_list_items(payload)
def add_to_watchlist(
self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "",
) -> bool:
"""POST /sync/watchlist"""
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
key = "movies" if media_type == "movie" else "shows"
body = {key: [{"ids": ids}]}
status, _ = self._post("/sync/watchlist", body, token=token)
return status in (200, 201)
def remove_from_watchlist(
self, token: str, *, media_type: str, tmdb_id: int, imdb_id: str = "",
) -> bool:
"""POST /sync/watchlist/remove"""
ids: dict[str, object] = {}
if tmdb_id:
ids["tmdb"] = tmdb_id
if imdb_id:
ids["imdb"] = imdb_id
key = "movies" if media_type == "movie" else "shows"
body = {key: [{"ids": ids}]}
status, _ = self._post("/sync/watchlist/remove", body, token=token)
return status == 200
# -------------------------------------------------------------------
# History
# -------------------------------------------------------------------
def get_history(
self, token: str, *, media_type: str = "", page: int = 1, limit: int = 20,
) -> list[TraktItem]:
"""GET /users/me/history[/movies|/shows|/episodes]"""
path = "/users/me/history"
if media_type in ("movies", "shows", "episodes"):
path = f"{path}/{media_type}"
path = f"{path}?page={page}&limit={limit}&extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
return self._parse_history_items(payload)
# -------------------------------------------------------------------
# Calendar
# -------------------------------------------------------------------
def get_calendar(self, token: str, start_date: str = "", days: int = 7) -> list[TraktCalendarItem]:
"""GET /calendars/my/shows/{start_date}/{days}
start_date: YYYY-MM-DD (empty = today).
Returns upcoming episodes of the user's watchlist shows.
"""
if not start_date:
from datetime import date
start_date = date.today().strftime("%Y-%m-%d")
path = f"/calendars/my/shows/{start_date}/{days}?extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return []
items: list[TraktCalendarItem] = []
for entry in payload:
if not isinstance(entry, dict):
continue
show = entry.get("show") or {}
ep = entry.get("episode") or {}
show_ids = self._parse_ids(show.get("ids") or {})
ep_images = ep.get("images") or {}
show_images = show.get("images") or {}
def _first(img_dict: dict, key: str) -> str:
imgs = img_dict.get(key) or []
return _trakt_image_url(imgs[0]) if imgs else ""
items.append(TraktCalendarItem(
show_title=str(show.get("title", "") or ""),
show_year=int(show.get("year", 0) or 0),
show_ids=show_ids,
season=int(ep.get("season", 0) or 0),
episode=int(ep.get("number", 0) or 0),
episode_title=str(ep.get("title", "") or ""),
episode_overview=str(ep.get("overview", "") or ""),
episode_thumb=_first(ep_images, "screenshot"),
show_poster=_first(show_images, "poster"),
show_fanart=_first(show_images, "fanart"),
first_aired=str(entry.get("first_aired", "") or ""),
))
return items
def search_show(self, query: str) -> str:
"""GET /search/show?query=... gibt slug des ersten Treffers zurück, sonst ''."""
from urllib.parse import urlencode
path = f"/search/show?{urlencode({'query': query, 'limit': 1})}"
status, payload = self._get(path)
if status != 200 or not isinstance(payload, list) or not payload:
return ""
show = (payload[0] or {}).get("show") or {}
ids = show.get("ids") or {}
return str(ids.get("slug") or ids.get("trakt") or "")
def lookup_tv_season(
self,
show_id_or_slug: "str | int",
season_number: int,
*,
token: str = "",
) -> "dict[int, TraktEpisodeMeta] | None":
"""GET /shows/{id}/seasons/{n}/episodes?extended=full,images
Returns a mapping episode_number -> TraktEpisodeMeta, or None on error.
"""
path = f"/shows/{show_id_or_slug}/seasons/{season_number}/episodes?extended=full,images"
status, payload = self._get(path, token=token)
if status != 200 or not isinstance(payload, list):
return None
result: "dict[int, TraktEpisodeMeta]" = {}
for entry in payload:
try:
ep_no = int(entry.get("number") or 0)
except Exception:
continue
if not ep_no:
continue
images = entry.get("images") or {}
screenshots = images.get("screenshot") or []
thumb = _trakt_image_url(screenshots[0]) if screenshots else ""
result[ep_no] = TraktEpisodeMeta(
title=str(entry.get("title") or "").strip(),
overview=str(entry.get("overview") or "").strip(),
runtime_minutes=int(entry.get("runtime") or 0),
thumb=thumb,
)
return result or None
def get_episode_translation(
self,
show_id_or_slug: "str | int",
season: int,
episode: int,
language: str = "de",
) -> "tuple[str, str]":
"""GET /shows/{id}/seasons/{s}/episodes/{e}/translations/{lang}
Returns (title, overview) in the target language, or ('', '') on error.
"""
path = f"/shows/{show_id_or_slug}/seasons/{season}/episodes/{episode}/translations/{language}"
status, payload = self._get(path)
if status != 200 or not isinstance(payload, list) or not payload:
return "", ""
first = payload[0] if payload else {}
return str(first.get("title") or ""), str(first.get("overview") or "")
# -------------------------------------------------------------------
# Parser
# -------------------------------------------------------------------
@staticmethod
def _parse_ids(ids_dict: dict) -> TraktMediaIds:
return TraktMediaIds(
trakt=int(ids_dict.get("trakt", 0) or 0),
tmdb=int(ids_dict.get("tmdb", 0) or 0),
imdb=str(ids_dict.get("imdb", "") or ""),
slug=str(ids_dict.get("slug", "") or ""),
tvdb=int(ids_dict.get("tvdb", 0) or 0),
)
def _parse_list_items(self, items: list) -> list[TraktItem]:
result: list[TraktItem] = []
for entry in items:
if not isinstance(entry, dict):
continue
item_type = entry.get("type", "")
media = entry.get(item_type) or entry.get("movie") or entry.get("show") or {}
if not isinstance(media, dict):
continue
ids = self._parse_ids(media.get("ids") or {})
result.append(TraktItem(
title=str(media.get("title", "") or ""),
year=int(media.get("year", 0) or 0),
media_type=item_type,
ids=ids,
))
return result
def _parse_history_items(self, items: list) -> list[TraktItem]:
result: list[TraktItem] = []
for entry in items:
if not isinstance(entry, dict):
continue
item_type = entry.get("type", "")
watched_at = str(entry.get("watched_at", "") or "")
if item_type == "episode":
show = entry.get("show") or {}
ep = entry.get("episode") or {}
ids = self._parse_ids((show.get("ids") or {}))
ep_images = ep.get("images") or {}
show_images = show.get("images") or {}
def _first_img(img_dict: dict, key: str) -> str:
imgs = img_dict.get(key) or []
return _trakt_image_url(imgs[0]) if imgs else ""
result.append(TraktItem(
title=str(show.get("title", "") or ""),
year=int(show.get("year", 0) or 0),
media_type="episode",
ids=ids,
season=int(ep.get("season", 0) or 0),
episode=int(ep.get("number", 0) or 0),
watched_at=watched_at,
episode_title=str(ep.get("title", "") or ""),
episode_overview=str(ep.get("overview", "") or ""),
episode_thumb=_first_img(ep_images, "screenshot"),
show_poster=_first_img(show_images, "poster"),
show_fanart=_first_img(show_images, "fanart"),
))
else:
media = entry.get("movie") or entry.get("show") or {}
ids = self._parse_ids(media.get("ids") or {})
result.append(TraktItem(
title=str(media.get("title", "") or ""),
year=int(media.get("year", 0) or 0),
media_type=item_type,
ids=ids,
watched_at=watched_at,
))
return result
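The episode branch of `_parse_history_items` can be illustrated with a standalone sketch. The payload shape and the host-less image paths below are assumptions based on the Trakt `?extended=full,images` response; `TraktItem` is replaced by a plain dict so the sketch runs without the addon's dataclasses.

```python
# Minimal sketch of flattening one /users/me/history episode entry.
entry = {
    "type": "episode",
    "watched_at": "2026-03-01T21:45:00.000Z",
    "show": {
        "title": "Example Show", "year": 2024,
        "ids": {"trakt": 1, "slug": "example-show", "tmdb": 42, "imdb": "tt0000001", "tvdb": 7},
        "images": {"poster": ["walter.trakt.tv/images/poster.jpg"]},
    },
    "episode": {
        "season": 1, "number": 3, "title": "Part Three",
        "overview": "An example overview.",
        "images": {"screenshot": ["walter.trakt.tv/images/shot.jpg"]},
    },
}

def first_image(images: dict, key: str) -> str:
    # Trakt delivers scheme-less image paths; prepend https:// like
    # _trakt_image_url presumably does (assumption).
    imgs = images.get(key) or []
    return "https://" + imgs[0] if imgs else ""

show, ep = entry["show"], entry["episode"]
flat = {
    "title": show.get("title", ""),
    "season": int(ep.get("season", 0) or 0),
    "episode": int(ep.get("number", 0) or 0),
    "episode_title": ep.get("title", ""),
    "episode_thumb": first_image(ep.get("images") or {}, "screenshot"),
    "show_poster": first_image(show.get("images") or {}, "poster"),
    "watched_at": entry.get("watched_at", ""),
}
```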

addon/core/updater.py Normal file

@@ -0,0 +1,731 @@
#!/usr/bin/env python3
"""Update- und Versionsverwaltung fuer ViewIT.
Dieses Modul kuemmert sich um:
- Update-Kanaele (Main, Nightly, Dev, Custom)
- Versions-Abfrage und -Installation aus Repositories
- Changelog-Abruf
- Repository-Quellen-Verwaltung
- ResolveURL Auto-Installation
"""
from __future__ import annotations
import io
import json
import os
import re
import time
import xml.etree.ElementTree as ET
import zipfile
from urllib.error import URLError
from urllib.request import Request, urlopen
try: # pragma: no cover - Kodi runtime
import xbmc # type: ignore[import-not-found]
import xbmcaddon # type: ignore[import-not-found]
import xbmcgui # type: ignore[import-not-found]
import xbmcvfs # type: ignore[import-not-found]
except ImportError: # pragma: no cover - allow importing outside Kodi
xbmc = None
xbmcaddon = None
xbmcgui = None
xbmcvfs = None
from plugin_helpers import show_error, show_notification
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
UPDATE_CHANNEL_MAIN = 0
UPDATE_CHANNEL_NIGHTLY = 1
UPDATE_CHANNEL_CUSTOM = 2
UPDATE_CHANNEL_DEV = 3
AUTO_UPDATE_INTERVAL_SEC = 6 * 60 * 60
UPDATE_HTTP_TIMEOUT_SEC = 8
UPDATE_ADDON_ID = "plugin.video.viewit"
RESOLVEURL_ADDON_ID = "script.module.resolveurl"
RESOLVEURL_AUTO_INSTALL_INTERVAL_SEC = 6 * 60 * 60
# ---------------------------------------------------------------------------
# Helper functions (settings access)
# ---------------------------------------------------------------------------
# These callbacks are set once by default.py so that updater.py
# has no circular dependencies.
_get_setting_string = None
_get_setting_bool = None
_get_setting_int = None
_set_setting_string = None
_get_addon = None
_log_fn = None
def init(
*,
get_setting_string,
get_setting_bool,
get_setting_int,
set_setting_string,
get_addon,
log_fn,
) -> None:
"""Initialisiert Callbacks fuer Settings-Zugriff."""
global _get_setting_string, _get_setting_bool, _get_setting_int
global _set_setting_string, _get_addon, _log_fn
_get_setting_string = get_setting_string
_get_setting_bool = get_setting_bool
_get_setting_int = get_setting_int
_set_setting_string = set_setting_string
_get_addon = get_addon
_log_fn = log_fn
def _log(message: str, level: int = 1) -> None:
if _log_fn is not None:
_log_fn(message, level)
# ---------------------------------------------------------------------------
# URL normalisation
# ---------------------------------------------------------------------------
def normalize_update_info_url(raw: str) -> str:
value = str(raw or "").strip()
default = "http://127.0.0.1:8080/repo/addons.xml"
if not value:
return default
if value.endswith("/addons.xml"):
return value
return value.rstrip("/") + "/addons.xml"
# ---------------------------------------------------------------------------
# Update channels
# ---------------------------------------------------------------------------
def selected_update_channel() -> int:
channel = _get_setting_int("update_channel", default=UPDATE_CHANNEL_MAIN)
if channel not in {UPDATE_CHANNEL_MAIN, UPDATE_CHANNEL_NIGHTLY, UPDATE_CHANNEL_CUSTOM, UPDATE_CHANNEL_DEV}:
return UPDATE_CHANNEL_MAIN
return channel
def channel_label(channel: int) -> str:
if channel == UPDATE_CHANNEL_NIGHTLY:
return "Nightly"
if channel == UPDATE_CHANNEL_DEV:
return "Dev"
if channel == UPDATE_CHANNEL_CUSTOM:
return "Custom"
return "Main"
# ---------------------------------------------------------------------------
# Versioning
# ---------------------------------------------------------------------------
def version_sort_key(version: str) -> tuple[int, ...]:
base = str(version or "").split("-", 1)[0]
parts = []
for chunk in base.split("."):
try:
parts.append(int(chunk))
except Exception:
parts.append(0)
while len(parts) < 4:
parts.append(0)
return tuple(parts[:4])
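Because the key strips the `-dev`/`-nightly` suffix and zero-pads to four numeric components, three- and four-part versions sort together correctly. A self-contained demo (the function body is copied from above for illustration):

```python
# Copy of version_sort_key, demonstrating mixed 3-/4-part ordering.
def version_sort_key(version: str) -> tuple:
    base = str(version or "").split("-", 1)[0]
    parts = []
    for chunk in base.split("."):
        try:
            parts.append(int(chunk))
        except Exception:
            parts.append(0)
    while len(parts) < 4:
        parts.append(0)
    return tuple(parts[:4])

versions = ["0.1.74-dev", "0.1.76.0-dev", "0.1.71-dev"]
ordered = sorted(versions, key=version_sort_key, reverse=True)
# "0.1.76.0-dev" (key (0,1,76,0)) sorts above "0.1.74-dev" (key (0,1,74,0))
```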
def is_stable_version(version: str) -> bool:
# allow an optional fourth component (e.g. "0.1.76.0")
return bool(re.match(r"^\d+\.\d+\.\d+(?:\.\d+)?$", str(version or "").strip()))
def is_nightly_version(version: str) -> bool:
return bool(re.match(r"^\d+\.\d+\.\d+(?:\.\d+)?-nightly$", str(version or "").strip()))
def is_dev_version(version: str) -> bool:
return bool(re.match(r"^\d+\.\d+\.\d+(?:\.\d+)?-dev$", str(version or "").strip()))
def filter_versions_for_channel(channel: int, versions: list[str]) -> list[str]:
if channel == UPDATE_CHANNEL_MAIN:
return [v for v in versions if is_stable_version(v)]
if channel == UPDATE_CHANNEL_NIGHTLY:
return [v for v in versions if is_nightly_version(v)]
if channel == UPDATE_CHANNEL_DEV:
return [v for v in versions if is_dev_version(v)]
return list(versions)
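The channel filter can be sketched with one parameterised matcher. This is a variant written for illustration, not the module's exact functions; it accepts both three- and four-part version numbers such as `0.1.76.0-dev`:

```python
import re

# Sketch: match a version string against an update channel.
def matches_channel(version: str, channel: str) -> bool:
    v = version.strip()
    if channel == "main":
        return bool(re.fullmatch(r"\d+(?:\.\d+){2,3}", v))
    if channel in ("nightly", "dev"):
        return bool(re.fullmatch(rf"\d+(?:\.\d+){{2,3}}-{channel}", v))
    return True  # custom channel: no filtering

versions = ["0.1.70", "0.1.71-dev", "0.1.76.0-dev", "0.1.72-nightly"]
dev_versions = [v for v in versions if matches_channel(v, "dev")]
```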
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def read_text_url(url: str, *, timeout: int = UPDATE_HTTP_TIMEOUT_SEC) -> str:
request = Request(url, headers={"User-Agent": "ViewIT/1.0"})
response = None
try:
response = urlopen(request, timeout=timeout)
data = response.read()
finally:
if response is not None:
try:
response.close()
except Exception:
pass
return data.decode("utf-8", errors="replace")
def read_binary_url(url: str, *, timeout: int = UPDATE_HTTP_TIMEOUT_SEC) -> bytes:
request = Request(url, headers={"User-Agent": "ViewIT/1.0"})
response = None
try:
response = urlopen(request, timeout=timeout)
return response.read()
finally:
if response is not None:
try:
response.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Repository queries
# ---------------------------------------------------------------------------
def extract_repo_addon_version(xml_text: str, addon_id: str = UPDATE_ADDON_ID) -> str:
try:
root = ET.fromstring(xml_text)
except Exception:
return "-"
if root.tag == "addon":
return str(root.attrib.get("version") or "-")
for node in root.findall("addon"):
if str(node.attrib.get("id") or "").strip() == addon_id:
version = str(node.attrib.get("version") or "").strip()
return version or "-"
return "-"
def fetch_repo_addon_version(info_url: str) -> str:
url = normalize_update_info_url(info_url)
try:
xml_text = read_text_url(url)
except URLError:
return "-"
except Exception:
return "-"
return extract_repo_addon_version(xml_text)
def _extract_repo_identity(info_url: str) -> tuple[str, str, str, str] | None:
from urllib.parse import urlparse
parsed = urlparse(str(info_url or "").strip())
parts = [part for part in parsed.path.split("/") if part]
try:
raw_idx = parts.index("raw")
except ValueError:
return None
if raw_idx < 2 or (raw_idx + 2) >= len(parts):
return None
if parts[raw_idx + 1] != "branch":
return None
owner = parts[raw_idx - 2]
repo = parts[raw_idx - 1]
branch = parts[raw_idx + 2]
scheme = parsed.scheme or "https"
host = parsed.netloc
if not owner or not repo or not branch or not host:
return None
return scheme, host, owner, repo + "|" + branch
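The identity extraction relies on Gitea's `/{owner}/{repo}/raw/branch/{branch}/...` path layout. A sketch of the same decomposition (`gitea.example.org` is a placeholder host):

```python
from urllib.parse import urlparse

# Decompose a Gitea "raw" URL into owner/repo/branch, as
# _extract_repo_identity does.
url = "https://gitea.example.org/viewit/ViewIT/raw/branch/dev/repo/addons.xml"
parts = [p for p in urlparse(url).path.split("/") if p]
raw_idx = parts.index("raw")          # parts[raw_idx + 1] must be "branch"
owner = parts[raw_idx - 2]            # "viewit"
repo = parts[raw_idx - 1]             # "ViewIT"
branch = parts[raw_idx + 2]           # "dev"
api_url = (f"https://gitea.example.org/api/v1/repos/{owner}/{repo}"
           f"/contents/plugin.video.viewit?ref={branch}")
```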
def fetch_repo_versions(info_url: str) -> list[str]:
identity = _extract_repo_identity(info_url)
if identity is None:
one = fetch_repo_addon_version(info_url)
return [one] if one != "-" else []
scheme, host, owner, repo_branch = identity
repo, branch = repo_branch.split("|", 1)
api_url = f"{scheme}://{host}/api/v1/repos/{owner}/{repo}/contents/{UPDATE_ADDON_ID}?ref={branch}"
try:
payload = read_text_url(api_url)
data = json.loads(payload)
except Exception:
one = fetch_repo_addon_version(info_url)
return [one] if one != "-" else []
versions: list[str] = []
if isinstance(data, list):
for entry in data:
if not isinstance(entry, dict):
continue
name = str(entry.get("name") or "")
match = re.match(rf"^{re.escape(UPDATE_ADDON_ID)}-(.+)\.zip$", name)
if not match:
continue
version = match.group(1).strip()
if version:
versions.append(version)
unique = sorted(set(versions), key=version_sort_key, reverse=True)
return unique
# ---------------------------------------------------------------------------
# Changelog
# ---------------------------------------------------------------------------
def extract_changelog_section(changelog_text: str, version: str) -> str:
lines = changelog_text.splitlines()
wanted = (version or "").strip()
if not wanted:
return "\n".join(lines[:120]).strip()
start = -1
for idx, line in enumerate(lines):
if line.startswith("## ") and wanted in line:
start = idx
break
if start < 0:
return f"Kein Changelog-Abschnitt fuer Version {wanted} gefunden."
end = len(lines)
for idx in range(start + 1, len(lines)):
if lines[idx].startswith("## "):
end = idx
break
return "\n".join(lines[start:end]).strip()
def fetch_changelog_for_channel(channel: int, version: str) -> str:
version_text = str(version or "").strip().casefold()
if version_text.endswith("-dev"):
url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/dev/CHANGELOG-DEV.md"
elif version_text.endswith("-nightly"):
url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/nightly/CHANGELOG-NIGHTLY.md"
elif channel == UPDATE_CHANNEL_DEV:
url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/dev/CHANGELOG-DEV.md"
elif channel == UPDATE_CHANNEL_MAIN:
url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/main/CHANGELOG.md"
else:
url = "https://gitea.it-drui.de/viewit/ViewIT/raw/branch/nightly/CHANGELOG-NIGHTLY.md"
try:
text = read_text_url(url)
except Exception:
return "Changelog konnte nicht geladen werden."
return extract_changelog_section(text, version)
# ---------------------------------------------------------------------------
# Installation
# ---------------------------------------------------------------------------
def install_addon_version_manual(info_url: str, version: str) -> bool:
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
zip_url = f"{base}/{UPDATE_ADDON_ID}/{UPDATE_ADDON_ID}-{version}.zip"
try:
zip_bytes = read_binary_url(zip_url)
except Exception as exc:
_log(f"Download fehlgeschlagen ({zip_url}): {exc}", 2)
return False
if xbmcvfs is None:
return False
addons_root = xbmcvfs.translatePath("special://home/addons")
addons_root_real = os.path.realpath(addons_root)
try:
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
for member in archive.infolist():
name = str(member.filename or "")
if not name or name.endswith("/"):
continue
target = os.path.realpath(os.path.join(addons_root, name))
if not target.startswith(addons_root_real + os.sep):
_log(f"Sicherheitswarnung: Verdaechtiger ZIP-Eintrag abgelehnt: {name!r}", 2)
return False
os.makedirs(os.path.dirname(target), exist_ok=True)
with archive.open(member, "r") as src, open(target, "wb") as dst:
dst.write(src.read())
except Exception as exc:
_log(f"Entpacken fehlgeschlagen: {exc}", 2)
return False
builtin = getattr(xbmc, "executebuiltin", None) if xbmc else None
if callable(builtin):
builtin("UpdateLocalAddons")
return True
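The `realpath` comparison above is a zip-slip guard: every archive member is resolved against the addons root and rejected if it escapes it (e.g. via `../` components). The check in isolation, with placeholder paths:

```python
import os

# Sketch of the zip-slip guard from install_addon_version_manual.
def is_safe_member(addons_root: str, name: str) -> bool:
    root = os.path.realpath(addons_root)
    target = os.path.realpath(os.path.join(addons_root, name))
    return target.startswith(root + os.sep)

ok = is_safe_member("/home/user/.kodi/addons", "plugin.video.viewit/addon.xml")
bad = is_safe_member("/home/user/.kodi/addons", "../../../etc/passwd")
```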
def install_addon_version(info_url: str, version: str) -> bool:
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
zip_url = f"{base}/{UPDATE_ADDON_ID}/{UPDATE_ADDON_ID}-{version}.zip"
builtin = getattr(xbmc, "executebuiltin", None) if xbmc else None
if callable(builtin):
try:
before = installed_addon_version_from_disk()
builtin(f"InstallAddon({zip_url})")
for _ in range(20):
time.sleep(1)
current = installed_addon_version_from_disk()
if current == version:
return True
if before == version:
return True
except Exception as exc:
_log(f"InstallAddon fehlgeschlagen, fallback aktiv: {exc}", 2)
return install_addon_version_manual(info_url, version)
# ---------------------------------------------------------------------------
# Installed version / addon checks
# ---------------------------------------------------------------------------
def installed_addon_version_from_disk() -> str:
if xbmcvfs is None:
return "0.0.0"
try:
addon_xml = xbmcvfs.translatePath(f"special://home/addons/{UPDATE_ADDON_ID}/addon.xml")
except Exception:
return "0.0.0"
if not addon_xml or not os.path.exists(addon_xml):
return "0.0.0"
try:
root = ET.parse(addon_xml).getroot()
version = str(root.attrib.get("version") or "").strip()
return version or "0.0.0"
except Exception:
return "0.0.0"
def is_addon_installed(addon_id: str) -> bool:
addon_id = str(addon_id or "").strip()
if not addon_id:
return False
has_addon = getattr(xbmc, "getCondVisibility", None) if xbmc else None
if callable(has_addon):
try:
return bool(has_addon(f"System.HasAddon({addon_id})"))
except Exception:
pass
if xbmcvfs is None:
return False
try:
addon_xml = xbmcvfs.translatePath(f"special://home/addons/{addon_id}/addon.xml")
except Exception:
return False
return bool(addon_xml and os.path.exists(addon_xml))
# ---------------------------------------------------------------------------
# Repository source management
# ---------------------------------------------------------------------------
def repo_addon_xml_path() -> str:
if xbmcvfs is None:
return ""
try:
return xbmcvfs.translatePath("special://home/addons/repository.viewit/addon.xml")
except Exception:
return ""
def update_repository_source(info_url: str) -> bool:
path = repo_addon_xml_path()
if not path:
return False
if not os.path.exists(path):
return False
try:
tree = ET.parse(path)
root = tree.getroot()
dir_node = root.find(".//dir")
if dir_node is None:
return False
info = dir_node.find("info")
checksum = dir_node.find("checksum")
datadir = dir_node.find("datadir")
if info is None or checksum is None or datadir is None:
return False
base = info_url[: -len("/addons.xml")] if info_url.endswith("/addons.xml") else info_url.rstrip("/")
info.text = info_url
checksum.text = f"{base}/addons.xml.md5"
datadir.text = f"{base}/"
tree.write(path, encoding="utf-8", xml_declaration=True)
return True
except Exception as exc:
_log(f"Repository-URL konnte nicht gesetzt werden: {exc}", 2)
return False
# ---------------------------------------------------------------------------
# ResolveURL
# ---------------------------------------------------------------------------
def sync_resolveurl_status_setting() -> None:
status = "Installiert" if is_addon_installed(RESOLVEURL_ADDON_ID) else "Fehlt"
_set_setting_string("resolveurl_status", status)
def install_kodi_addon(addon_id: str, *, wait_seconds: int) -> bool:
if is_addon_installed(addon_id):
return True
builtin = getattr(xbmc, "executebuiltin", None) if xbmc else None
if not callable(builtin):
return False
try:
builtin(f"InstallAddon({addon_id})")
builtin("UpdateLocalAddons")
except Exception as exc:
_log(f"InstallAddon fehlgeschlagen ({addon_id}): {exc}", 2)
return False
if wait_seconds <= 0:
return is_addon_installed(addon_id)
deadline = time.time() + max(1, int(wait_seconds))
while time.time() < deadline:
if is_addon_installed(addon_id):
return True
time.sleep(1)
return is_addon_installed(addon_id)
def ensure_resolveurl_installed(*, force: bool, silent: bool) -> bool:
if is_addon_installed(RESOLVEURL_ADDON_ID):
sync_resolveurl_status_setting()
return True
if not force and not _get_setting_bool("resolveurl_auto_install", default=True):
sync_resolveurl_status_setting()
return False
now = int(time.time())
if not force:
last_try = _get_setting_int("resolveurl_last_ts", default=0)
if last_try > 0 and (now - last_try) < RESOLVEURL_AUTO_INSTALL_INTERVAL_SEC:
return False
_set_setting_string("resolveurl_last_ts", str(now))
wait_seconds = 20 if force else 0
ok = install_kodi_addon(RESOLVEURL_ADDON_ID, wait_seconds=wait_seconds)
sync_resolveurl_status_setting()
if not silent and xbmcgui is not None:
if ok:
xbmcgui.Dialog().notification(
"ResolveURL",
"script.module.resolveurl ist installiert.",
xbmcgui.NOTIFICATION_INFO,
4000,
)
else:
xbmcgui.Dialog().notification(
"ResolveURL",
"Installation fehlgeschlagen. Bitte Repository/Netzwerk pruefen.",
xbmcgui.NOTIFICATION_ERROR,
5000,
)
return ok
def maybe_auto_install_resolveurl(action: str | None) -> None:
if (action or "").strip():
return
ensure_resolveurl_installed(force=False, silent=True)
# ---------------------------------------------------------------------------
# Applying the update channel / sync
# ---------------------------------------------------------------------------
def resolve_update_info_url() -> str:
channel = selected_update_channel()
if channel == UPDATE_CHANNEL_NIGHTLY:
raw = _get_setting_string("update_repo_url_nightly")
elif channel == UPDATE_CHANNEL_DEV:
raw = _get_setting_string("update_repo_url_dev")
elif channel == UPDATE_CHANNEL_CUSTOM:
raw = _get_setting_string("update_repo_url")
else:
raw = _get_setting_string("update_repo_url_main")
return normalize_update_info_url(raw)
def sync_update_channel_status_settings() -> None:
channel = selected_update_channel()
selected_info_url = resolve_update_info_url()
available_selected = fetch_repo_addon_version(selected_info_url)
_set_setting_string("update_active_channel", channel_label(channel))
_set_setting_string("update_active_repo_url", selected_info_url)
_set_setting_string("update_available_selected", available_selected)
def sync_update_version_settings() -> None:
addon_version = installed_addon_version_from_disk()
if addon_version == "0.0.0":
addon = _get_addon()
if addon is not None:
try:
addon_version = str(addon.getAddonInfo("version") or "0.0.0")
except Exception:
addon_version = "0.0.0"
_set_setting_string("update_installed_version", addon_version)
sync_resolveurl_status_setting()
sync_update_channel_status_settings()
def apply_update_channel(*, silent: bool = False) -> bool:
if xbmc is None: # pragma: no cover - outside Kodi
return False
info_url = resolve_update_info_url()
channel = selected_update_channel()
sync_update_version_settings()
applied = update_repository_source(info_url)
installed_version = _get_setting_string("update_installed_version").strip() or "0.0.0"
versions = filter_versions_for_channel(channel, fetch_repo_versions(info_url))
target_version = versions[0] if versions else "-"
install_result = False
if target_version != "-" and target_version != installed_version:
install_result = install_addon_version(info_url, target_version)
elif target_version == installed_version:
install_result = True
builtin = getattr(xbmc, "executebuiltin", None)
if callable(builtin):
builtin("UpdateAddonRepos")
builtin("UpdateLocalAddons")
if not silent:
if not applied:
warning_icon = getattr(xbmcgui, "NOTIFICATION_WARNING", xbmcgui.NOTIFICATION_INFO)
show_notification(
"Updates",
"Kanal gespeichert, aber repository.viewit nicht gefunden.",
icon=warning_icon,
milliseconds=5000,
)
elif target_version == "-":
show_error("Updates", "Kanal angewendet, aber keine Version im Kanal gefunden.", milliseconds=5000)
elif not install_result:
show_error(
"Updates",
f"Kanal angewendet, Installation von {target_version} fehlgeschlagen.",
milliseconds=5000,
)
elif target_version == installed_version:
show_notification(
"Updates",
f"Kanal angewendet: {channel_label(selected_update_channel())} ({target_version} bereits installiert)",
milliseconds=4500,
)
else:
show_notification(
"Updates",
f"Kanal angewendet: {channel_label(selected_update_channel())} -> {target_version} installiert",
milliseconds=5000,
)
sync_update_version_settings()
return applied and install_result
def run_update_check(*, silent: bool = False) -> None:
"""Stoesst Kodi-Repo- und Addon-Updates an."""
if xbmc is None: # pragma: no cover - outside Kodi
return
try:
apply_update_channel(silent=True)
if not silent:
builtin = getattr(xbmc, "executebuiltin", None)
if callable(builtin):
builtin("ActivateWindow(addonbrowser,addons://updates/)")
if not silent:
show_notification("Updates", "Update-Check gestartet.", milliseconds=4000)
except Exception as exc:
_log(f"Update-Pruefung fehlgeschlagen: {exc}", 2)
if not silent:
show_error("Updates", "Update-Check fehlgeschlagen.", milliseconds=4000)
def show_version_selector() -> None:
if xbmc is None: # pragma: no cover - outside Kodi
return
info_url = resolve_update_info_url()
channel = selected_update_channel()
sync_update_version_settings()
versions = filter_versions_for_channel(channel, fetch_repo_versions(info_url))
if not versions:
show_error("Updates", "Keine Versionen im Repo gefunden.", milliseconds=4000)
return
installed = _get_setting_string("update_installed_version").strip() or "-"
options = []
for version in versions:
label = version
if version == installed:
label = f"{version} (installiert)"
options.append(label)
selected = xbmcgui.Dialog().select("Version waehlen", options)
if selected < 0 or selected >= len(versions):
return
version = versions[selected]
changelog = fetch_changelog_for_channel(channel, version)
viewer = getattr(xbmcgui.Dialog(), "textviewer", None)
if callable(viewer):
try:
viewer(f"Changelog {version}", changelog)
except Exception:
pass
action = xbmcgui.Dialog().select(
f"Version {version} installieren?",
["Update installieren", "Abbrechen"],
)
if action != 0:
return
show_notification("Updates", f"Installation gestartet: {version}", milliseconds=2500)
ok = install_addon_version(info_url, version)
if ok:
sync_update_version_settings()
show_notification("Updates", f"Version {version} installiert.", milliseconds=4000)
else:
show_error("Updates", f"Installation von {version} fehlgeschlagen.", milliseconds=4500)
def maybe_run_auto_update_check(action: str | None) -> None:
action = (action or "").strip()
if action:
return
if not _get_setting_bool("auto_update_enabled", default=False):
return
now = int(time.time())
last = _get_setting_int("auto_update_last_ts", default=0)
if last > 0 and (now - last) < AUTO_UPDATE_INTERVAL_SEC:
return
_set_setting_string("auto_update_last_ts", str(now))
run_update_check(silent=True)

File diff suppressed because it is too large

addon/genre_utils.py Normal file

@@ -0,0 +1,29 @@
from __future__ import annotations
import re
from html import unescape
def normalize_genre_label(raw: str) -> str:
"""Normalisiert Genre-Bezeichner aus HTML-Labels oder Datenattributen."""
text = unescape(re.sub(r"\s+", " ", str(raw or ""))).strip()
if not text:
return ""
key_prefix = "filter.genre_"
if text.casefold().startswith(key_prefix):
slug = text[len(key_prefix) :].strip().casefold()
slug = slug.replace("_", "-")
slug = re.sub(r"[^a-z0-9-]+", "-", slug).strip("-")
if not slug:
return ""
special = {
"doku-soap": "Doku-Soap",
"scifi": "SciFi",
"fighting-shounen": "Fighting-Shounen",
}
if slug in special:
return special[slug]
return " ".join(chunk.capitalize() for chunk in slug.split("-") if chunk)
return text
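The behaviour of the normaliser is easiest to see from a few inputs. The function body is copied from above so the demo is self-contained:

```python
import re
from html import unescape

# Copy of normalize_genre_label (addon/genre_utils.py) for illustration.
def normalize_genre_label(raw: str) -> str:
    text = unescape(re.sub(r"\s+", " ", str(raw or ""))).strip()
    if not text:
        return ""
    key_prefix = "filter.genre_"
    if text.casefold().startswith(key_prefix):
        slug = text[len(key_prefix):].strip().casefold()
        slug = slug.replace("_", "-")
        slug = re.sub(r"[^a-z0-9-]+", "-", slug).strip("-")
        if not slug:
            return ""
        special = {"doku-soap": "Doku-Soap", "scifi": "SciFi",
                   "fighting-shounen": "Fighting-Shounen"}
        if slug in special:
            return special[slug]
        return " ".join(chunk.capitalize() for chunk in slug.split("-") if chunk)
    return text

print(normalize_genre_label("filter.genre_scifi"))         # special-cased casing
print(normalize_genre_label("filter.genre_true_crime"))    # slug -> title case
print(normalize_genre_label("  Action &amp; Abenteuer "))  # entity + whitespace cleanup
```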


@@ -96,6 +96,35 @@ def notify_url(
return
def show_notification(
heading: str,
message: str,
*,
icon: int | None = None,
milliseconds: int = 3000,
) -> None:
"""Zeigt eine kurze Kodi-Notification an (falls `xbmcgui` verfuegbar ist)."""
if xbmcgui is None:
return
try:
icon_value = icon if icon is not None else xbmcgui.NOTIFICATION_INFO
xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), icon_value, int(milliseconds))
except Exception:
return
def show_error(heading: str, message: str, *, milliseconds: int = 4000) -> None:
"""Zeigt eine einheitliche Fehlermeldung im Kodi-UI."""
if xbmcgui is None:
return
try:
xbmcgui.Dialog().notification(str(heading or ""), str(message or ""), xbmcgui.NOTIFICATION_ERROR, int(milliseconds))
except Exception:
return
def _profile_logs_dir(addon_id: str) -> Optional[str]:
if xbmcaddon is None or xbmcvfs is None:
return None
@@ -241,6 +270,27 @@ def dump_response_html(
_append_text_file(path, content)
def resolve_via_resolveurl(link: str, *, fallback_to_link: bool = True) -> Optional[str]:
"""Versucht einen Hoster-Link mit resolveurl_backend aufzuloesen.
Gibt den aufgeloesten Link zurueck, oder wenn resolveurl nicht verfuegbar
ist oder nichts liefert den Original-Link (wenn fallback_to_link=True)
bzw. None (wenn fallback_to_link=False).
"""
link = (link or "").strip()
if not link:
return None
try:
from resolveurl_backend import resolve as _resolve_fn # type: ignore[import-not-found]
except Exception:
_resolve_fn = None
if callable(_resolve_fn):
resolved = _resolve_fn(link)
if resolved:
return resolved
return link if fallback_to_link else None
def normalize_resolved_stream_url(final_url: str, *, source_url: str = "") -> str:
"""Normalisiert hoster-spezifische Header im finalen Stream-Link.


@@ -53,8 +53,14 @@ class BasisPlugin(ABC):
def capabilities(self) -> Set[str]:
"""Optional: Liefert eine Menge an Features/Capabilities dieses Plugins.
Beispiele:
- `popular_series`: Plugin kann eine Liste beliebter Serien liefern.
Bekannte Werte:
- 'popular_series' Plugin hat beliebte Serien/Filme
- 'latest_titles' Plugin hat neu hinzugefuegte Titel
- 'year_filter' Plugin unterstuetzt Jahr-Filter
- 'country_filter' Plugin unterstuetzt Land-Filter
- 'collections' Plugin hat Sammlungen/Filmreihen
- 'tags' Plugin hat Tag/Schlagwort-Suche
- 'random' Plugin kann einen zufaelligen Titel liefern
"""
return set()
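A hypothetical plugin sketch shows how a caller would use `capabilities()` to discover and invoke the optional hooks. `ExamplePlugin` and its titles are made up for illustration and do not subclass the real `BasisPlugin`:

```python
from typing import List, Set

# Hypothetical plugin advertising two optional capabilities.
class ExamplePlugin:
    def capabilities(self) -> Set[str]:
        return {"latest_titles", "year_filter"}

    def latest_titles(self, page: int = 1) -> List[str]:
        return ["Example Movie (2026)"] if page == 1 else []

    def titles_for_year(self, year: str, page: int = 1) -> List[str]:
        return [f"Example Movie ({year})"] if page == 1 else []

plugin = ExamplePlugin()
titles: List[str] = []
# A menu builder would only show entries the plugin advertises.
if "latest_titles" in plugin.capabilities():
    titles = plugin.latest_titles(page=1)
```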
@@ -63,3 +69,85 @@ class BasisPlugin(ABC):
"""Optional: Liefert eine Liste beliebter Serien (als Titel-Strings)."""
return []
# ------------------------------------------------------------------
# Neue Methoden fuer das "Neue Titel"-Menue
# ------------------------------------------------------------------
def latest_titles(self, page: int = 1) -> List[str]:
"""Optional: Liefert neu hinzugefuegte Titel (Filme oder Serien).
Capability: 'latest_titles'
"""
return []
# ------------------------------------------------------------------
# Jahr-Filter
# ------------------------------------------------------------------
def years_available(self) -> List[str]:
"""Optional: Liefert verfuegbare Erscheinungsjahre (z.B. ['2026', '2025', ...]).
Capability: 'year_filter'
"""
return []
def titles_for_year(self, year: str, page: int = 1) -> List[str]:
"""Optional: Liefert Titel fuer ein bestimmtes Erscheinungsjahr."""
return []
# ------------------------------------------------------------------
# Land-Filter
# ------------------------------------------------------------------
def countries_available(self) -> List[str]:
"""Optional: Liefert verfuegbare Produktionslaender.
Capability: 'country_filter'
"""
return []
def titles_for_country(self, country: str, page: int = 1) -> List[str]:
"""Optional: Liefert Titel fuer ein bestimmtes Produktionsland."""
return []
# ------------------------------------------------------------------
# Sammlungen / Collections
# ------------------------------------------------------------------
def collections(self) -> List[str]:
"""Optional: Liefert verfuegbare Sammlungen/Filmreihen.
Capability: 'collections'
"""
return []
def titles_for_collection(self, collection: str, page: int = 1) -> List[str]:
"""Optional: Liefert Titel einer Sammlung/Filmreihe."""
return []
# ------------------------------------------------------------------
# Tags / Schlagworte
# ------------------------------------------------------------------
def tags(self) -> List[str]:
"""Optional: Liefert verfuegbare Schlagworte/Tags.
Capability: 'tags'
"""
return []
def titles_for_tag(self, tag: str, page: int = 1) -> List[str]:
"""Optional: Liefert Titel zu einem Schlagwort/Tag."""
return []
# ------------------------------------------------------------------
# Zufaelliger Titel
# ------------------------------------------------------------------
def random_title(self) -> Optional[str]:
"""Optional: Liefert einen zufaelligen Titel.
Capability: 'random'
"""
return None
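Der Router kann diese optionalen Methoden ueber `capabilities()` entdecken, bevor er sie aufruft. Eine minimale Skizze des angenommenen Dispatch-Musters (Klassenname und Beispieldaten hypothetisch):

```python
# Skizze: Capability-basierter Dispatch, wie ihn ein Router nutzen koennte.
class DemoPlugin:
    def capabilities(self):
        return {"year_filter", "random"}

    def years_available(self):
        return ["2026", "2025"]

    def titles_for_year(self, year, page=1):
        return [f"Titel A ({year})"] if year == "2026" else []

plugin = DemoPlugin()
titles = []
if "year_filter" in plugin.capabilities():
    # Nur aufrufen, wenn das Plugin die Faehigkeit deklariert hat
    titles = plugin.titles_for_year(plugin.years_available()[0])
print(titles)  # ['Titel A (2026)']
```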

View File

@@ -1 +1,2 @@
"""Kodi addon plugins."""
from __future__ import annotations

View File

@@ -1,9 +1,25 @@
"""Template fuer ein neues ViewIt-Plugin.
Diese Datei wird NICHT automatisch geladen (Dateiname beginnt mit `_`).
Vorgehen fuer ein neues Plugin:
1. Datei kopieren/umbenennen (ohne fuehrenden Unterstrich), z.B. `my_site_plugin.py`
2. `name`, `ADDON_ID`, `BASE_URL` und Header anpassen
3. `search_titles`, `seasons_for`, `episodes_for` gemaess Zielseite implementieren
4. Optional: Capabilities deklarieren und weitere Methoden ueberschreiben:
- `popular_series()` + capability 'popular_series'
- `new_titles()` + `new_titles_page(page)` + capability 'new_titles'
- `genres()` + `titles_for_genre(genre)` + `titles_for_genre_page(genre, page)`
- `alpha_index()` + `titles_for_alpha_page(letter, page)`
- `years_available()` + `titles_for_year(year, page)` + capability 'year_filter'
- `countries_available()` + `titles_for_country(country, page)` + capability 'country_filter'
- `collections()` + `titles_for_collection(collection, page)` + capability 'collections'
- `tags()` + `titles_for_tag(tag, page)` + capability 'tags'
- `random_title()` + capability 'random'
- `stream_link_for(...)`, `resolve_stream_link(link)`, `available_hosters_for(...)`
- `metadata_for(title)` fuer eigene Metadaten
Siehe `docs/PLUGIN_DEVELOPMENT.md` und bestehende Plugins.
"""
from __future__ import annotations
@@ -48,20 +64,33 @@ HEADERS = {
"Connection": "keep-alive",
}
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
@dataclass(frozen=True)
class TitleHit:
"""Ein einfacher Suchtreffer mit Titel und Detail-URL."""
title: str
url: str
class TemplatePlugin(BasisPlugin):
"""Vorlage fuer eine HTML-basierte Streamingseiten-Integration.
Dieses Template zeigt nur die MINIMALE, aber reale Schnittstelle:
Pflicht:
- `async search_titles(query, progress_callback=None) -> list[str]`
- `seasons_for(title) -> list[str]`
- `episodes_for(title, season) -> list[str]`
Empfohlen (optional, je nach Use-Case):
- `capabilities()` mit z.B. `popular_series`, `genres`, `latest_episodes`
- `popular_series()`, `titles_for_genre()`, `titles_for_genre_page()`
- `stream_link_for(...)` und/oder `stream_link_for_url(...)`
- `resolve_stream_link(link)` fuer Hosters/Redirects
- `metadata_for(title)` fuer eigene Metadaten (siehe bestehende Plugins)
"""
name = "Template"
@@ -71,15 +100,25 @@ class TemplatePlugin(BasisPlugin):
@property
def is_available(self) -> bool:
"""Signalisiert dem Router, ob das Plugin nutzbar ist (z.B. Abhaengigkeiten vorhanden)."""
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
"""Optionaler Grund, warum `is_available` false ist (z.B. fehlende Pakete)."""
if REQUESTS_AVAILABLE:
return ""
return f"requests/bs4 nicht verfuegbar: {REQUESTS_IMPORT_ERROR}"
def _get_session(self) -> RequestsSession:
"""Gibt eine vorkonfigurierte `requests.Session` zurueck.
In echten Plugins kann hier auch `http_session_pool.get_requests_session(...)`
genutzt werden, wenn mehrere Module sich Sessions teilen sollen.
"""
if requests is None:
raise RuntimeError(self.unavailable_reason)
if self._session is None:
@@ -91,41 +130,79 @@ class TemplatePlugin(BasisPlugin):
async def search_titles(
self,
query: str,
progress_callback: ProgressCallback = None,
) -> List[str]:
"""Sucht Titel auf der Zielseite und liefert eine Liste an Titel-Strings.
Best Practices:
- Nur passende Titel liefern (wortbasiert, keine Zufallstreffer).
- `progress_callback(message, percent)` sparsam nutzen, um lange Suchen anzuzeigen.
- HTTP-Requests robust kapseln (Timeouts, Fehlerbehandlung, optionales Logging).
"""
_ = (query, progress_callback)
return []
def seasons_for(self, title: str) -> List[str]:
"""Liefert alle Staffeln fuer einen Titel, z.B. `['Staffel 1', 'Staffel 2']`.
Fuer reine Film-Provider kann stattdessen z.B. `['Film']` zurueckgegeben werden
(siehe \"Film Provider Standard\" in `docs/PLUGIN_DEVELOPMENT.md`).
"""
_ = title
return []
def episodes_for(self, title: str, season: str) -> List[str]:
"""Liefert Episoden-Labels fuer einen Titel und eine Staffel.
Beispiele:
- `['Episode 1', 'Episode 2']`
- `['Episode 1: Pilot', 'Episode 2: Finale']`
"""
_ = (title, season)
return []
def capabilities(self) -> set[str]:
"""Optional: Deklariert die Faehigkeiten dieses Plugins.
Bekannte Werte (aus plugin_interface.py):
- 'popular_series' Plugin hat beliebte Serien/Filme
- 'new_titles' Plugin hat neu hinzugefuegte Titel
- 'year_filter' Plugin unterstuetzt Jahr-Filter
- 'country_filter' Plugin unterstuetzt Land-Filter
- 'collections' Plugin hat Sammlungen/Filmreihen
- 'tags' Plugin hat Tag/Schlagwort-Suche
- 'random' Plugin kann einen zufaelligen Titel liefern
- 'genres' Plugin hat Genre-Browser
- 'alpha' Plugin hat A-Z-Index
- 'latest_episodes' Plugin liefert neue Episoden
"""
return set()
def popular_series(self) -> List[str]:
"""Optional: Liste beliebter Titel (wenn `popular_series` in `capabilities()` gesetzt ist)."""
return []
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
"""Optional: Embed-/Hoster-Link fuer eine Episode.
Der Router ruft diese Methode nur auf, wenn sie existiert. Der Rueckgabewert
ist entweder ein finaler Stream-Link oder ein Hoster-/Embed-Link, der spaeter
ueber `resolve_stream_link` oder ResolveURL weiter aufgeloest werden kann.
"""
_ = (title, season, episode)
return None
def resolve_stream_link(self, link: str) -> Optional[str]:
"""Optional: Redirect-/Mirror-Aufloesung fuer Hoster-Links.
Falls nicht ueberschrieben, kann der Router (oder ResolveURL) den Link
direkt verwenden. Plugins koennen hier z.B. HTTP-Redirects verfolgen.
"""
return link

View File

@@ -39,6 +39,8 @@ from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from http_session_pool import get_requests_session
from regex_patterns import DIGITS, SEASON_EPISODE_TAG, SEASON_EPISODE_URL, STAFFEL_NUM_IN_URL
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
from genre_utils import normalize_genre_label as _normalize_genre_label
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
@@ -257,10 +259,7 @@ def _log_error(message: str) -> None:
def _normalize_search_text(value: str) -> str:
return _shared_normalize_search_text(value)
def _strip_html(text: str) -> str:
@@ -270,11 +269,7 @@ def _strip_html(text: str) -> str:
def _matches_query(query: str, *, title: str) -> bool:
return _shared_matches_query(query, title=title)
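Die gemeinsamen Helfer aus `search_utils` ersetzen den bisherigen Inline-Code funktionsgleich. Das wortbasierte Matching laesst sich so skizzieren (Nachbildung des gezeigten alten Codes, nicht die tatsaechliche `search_utils`-Implementierung):

```python
import re

# Nachbildung des wortbasierten Matchings (Skizze).
def normalize_search_text(value):
    value = (value or "").casefold()
    value = re.sub(r"[^a-z0-9]+", " ", value)
    return re.sub(r"\s+", " ", value).strip()

def matches_query(query, *, title):
    normalized_query = normalize_search_text(query)
    if not normalized_query:
        return False
    # Wortgrenzen via umschliessende Leerzeichen, keine Teilwort-Treffer
    haystack = f" {normalize_search_text(title)} "
    return f" {normalized_query} " in haystack

print(matches_query("dark", title="Dark (2017)"))   # True
print(matches_query("dark", title="Darkest Hour"))  # False
```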
def _ensure_requests() -> None:
@@ -366,7 +361,7 @@ def _extract_genre_names_from_html(body: str) -> List[str]:
)
for match in pattern.finditer(body or ""):
text = re.sub(r"<[^>]+>", " ", match.group(1) or "")
text = _normalize_genre_label(text)
if not text:
continue
key = text.casefold()
@@ -598,6 +593,7 @@ def resolve_redirect(target_url: str) -> Optional[str]:
response = None
try:
response = session.get(normalized_url, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
response.raise_for_status()
if response.url:
_log_url(response.url, kind="RESOLVED")
return response.url if response.url else None
@@ -1029,7 +1025,7 @@ class AniworldPlugin(BasisPlugin):
_session_cache_set(self._season_episodes_cache_name(season_url), payload)
def capabilities(self) -> set[str]:
return {"popular_series", "genres", "latest_episodes", "new_titles"}
def _find_series_by_title(self, title: str) -> Optional[SeriesResult]:
title = (title or "").strip()
@@ -1193,7 +1189,7 @@ class AniworldPlugin(BasisPlugin):
genre_blocks = soup.select("div.genre")
for genre_block in genre_blocks:
name_node = genre_block.select_one(".seriesGenreList h3")
genre_name = _normalize_genre_label(name_node.get_text(" ", strip=True) if name_node else "")
if not genre_name:
continue
entries: List[SeriesResult] = []
@@ -1235,7 +1231,11 @@ class AniworldPlugin(BasisPlugin):
return list(self._genre_names_cache)
cached = _session_cache_get("genres")
if isinstance(cached, list):
names: List[str] = []
for value in cached:
normalized = _normalize_genre_label(value)
if normalized:
names.append(normalized)
if names:
self._genre_names_cache = sorted(set(names), key=str.casefold)
return list(self._genre_names_cache)
@@ -1278,6 +1278,103 @@ class AniworldPlugin(BasisPlugin):
self._save_title_url_cache()
return [entry.title for entry in entries if entry.title]
def _genre_slug(self, genre: str) -> str:
"""Wandelt einen Genre-Namen in einen URL-Slug um."""
slug = (genre or "").strip().lower()
slug = re.sub(r"[^a-z0-9]+", "-", slug).strip("-")
return slug
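Die Slug-Erzeugung aus `_genre_slug` als eigenstaendige Skizze. Achtung: Umlaute fallen aus dem Muster `[^a-z0-9]+` heraus und werden durch `-` ersetzt; fuer Genres mit Umlauten muesste ggf. vorher transliteriert werden:

```python
import re

# Skizze der Slug-Erzeugung aus _genre_slug.
def genre_slug(genre):
    slug = (genre or "").strip().lower()
    # Alles ausser a-z/0-9 wird zu "-" zusammengefasst
    return re.sub(r"[^a-z0-9]+", "-", slug).strip("-")

print(genre_slug("Sci-Fi & Fantasy"))  # 'sci-fi-fantasy'
print(genre_slug("Komödie"))           # 'kom-die' (Umlaut wird verworfen!)
```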
def _genre_page_url(self, genre: str, page: int) -> str:
slug = self._genre_slug(genre)
base = f"{_get_base_url()}/genre/{slug}"
return base if page <= 1 else f"{base}?page={page}"
def _parse_genre_page_titles(self, soup: BeautifulSoupT) -> List[str]:
"""Extrahiert Titel von einer paginierten Genre-Seite."""
titles: List[str] = []
seen: set[str] = set()
for anchor in soup.select("div.seriesListContainer a[href], ul.seriesList li a[href], a[href*='/anime/stream/']"):
href = (anchor.get("href") or "").strip()
if not href or "/staffel-" in href or "/episode-" in href:
continue
title = (anchor.get_text(" ", strip=True) or "").strip()
if not title:
continue
key = title.casefold()
if key in seen:
continue
seen.add(key)
url = _absolute_url(href)
self._remember_anime_result(title, url, persist=False)
titles.append(title)
return titles
def _extract_genre_last_page(self, soup: BeautifulSoupT) -> int:
max_page = 1
for anchor in soup.select("a.page-link[href], nav a[href]"):
href = (anchor.get("href") or "").strip()
for match in re.findall(r"[?&]page=(\d+)", href):
try:
max_page = max(max_page, int(match))
except Exception:
continue
return max_page
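Die Paginierungs-Extraktion aus `_extract_genre_last_page` laesst sich ohne BeautifulSoup ueber die href-Werte nachvollziehen (Skizze ueber eine einfache Liste von Links):

```python
import re

# Skizze: hoechste Seitenzahl aus ?page=N-Parametern ermitteln.
def extract_last_page(hrefs):
    max_page = 1
    for href in hrefs:
        for match in re.findall(r"[?&]page=(\d+)", href or ""):
            max_page = max(max_page, int(match))
    return max_page

print(extract_last_page(["/genre/action", "/genre/action?page=2", "/genre/action?page=7"]))  # 7
print(extract_last_page([]))  # 1
```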
def titles_for_genre_page(self, genre: str, page: int = 1) -> List[str]:
"""Liefert Titel einer Genre-Seite (paginiert)."""
genre = (genre or "").strip()
if not genre or not self._requests_available:
return []
page = max(1, int(page or 1))
try:
url = self._genre_page_url(genre, page)
soup = _get_soup_simple(url)
return self._parse_genre_page_titles(soup)
except Exception:
return []
def genre_page_count(self, genre: str) -> int:
"""Liefert die Seitenanzahl fuer eine Genre-Seite."""
genre = (genre or "").strip()
if not genre or not self._requests_available:
return 1
try:
url = self._genre_page_url(genre, 1)
soup = _get_soup_simple(url)
return max(1, self._extract_genre_last_page(soup))
except Exception:
return 1
def new_titles_page(self, page: int = 1) -> List[str]:
"""Liefert neu hinzugefuegte Anime vom Animekalender."""
if not self._requests_available:
return []
page = max(1, int(page or 1))
try:
url = f"{_get_base_url()}/animekalender"
if page > 1:
url = f"{url}?page={page}"
soup = _get_soup_simple(url)
titles: List[str] = []
seen: set[str] = set()
for anchor in soup.select("a[href*='/anime/stream/']"):
title = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if not title or "/staffel-" in href or "/episode-" in href:
continue
key = title.casefold()
if key in seen:
continue
seen.add(key)
self._remember_anime_result(title, _absolute_url(href), persist=False)
titles.append(title)
return titles
except Exception:
return []
def new_titles(self) -> List[str]:
return self.new_titles_page(1)
def _season_label(self, number: int) -> str:
return f"Staffel {number}"

View File

@@ -21,6 +21,7 @@ else:
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
@@ -35,6 +36,8 @@ ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "doku_streams_base_url"
DEFAULT_BASE_URL = "https://doku-streams.com"
MOST_VIEWED_PATH = "/meistgesehene/"
RANDOM_PATH = "/zufaellige-doku/"
TAGS_BASE_PATH = "/tag/"
DEFAULT_TIMEOUT = 20
GLOBAL_SETTING_LOG_URLS = "debug_log_urls"
GLOBAL_SETTING_DUMP_HTML = "debug_dump_html"
@@ -77,12 +80,12 @@ def _extract_last_page(soup: BeautifulSoupT) -> int:
for anchor in soup.select("nav.navigation a[href], nav.pagination a[href], a.page-numbers[href]"):
text = (anchor.get_text(" ", strip=True) or "").strip()
for candidate in (text, (anchor.get("href") or "").strip()):
for value in re.findall(r"/page/(\d+)/", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
continue
for value in re.findall(r"(\d+)", candidate):
try:
max_page = max(max_page, int(value))
except Exception:
@@ -161,18 +164,11 @@ def _absolute_url(url: str) -> str:
def _normalize_search_text(value: str) -> str:
return _shared_normalize_search_text(value)
def _matches_query(query: str, *, title: str) -> bool:
return _shared_matches_query(query, title=title)
def _log_url_event(url: str, *, kind: str = "VISIT") -> None:
@@ -293,7 +289,7 @@ class DokuStreamsPlugin(BasisPlugin):
return _parse_listing_hits(soup, query=query)
def capabilities(self) -> set[str]:
return {"genres", "popular_series", "tags", "random"}
def _categories_url(self) -> str:
return _absolute_url("/kategorien/")
@@ -308,7 +304,7 @@ class DokuStreamsPlugin(BasisPlugin):
def clean_name(value: str) -> str:
value = (value or "").strip()
return re.sub(r"\s*\(\d+\)\s*$", "", value).strip()
def walk(ul, parents: List[str]) -> None:
for li in ul.find_all("li", recursive=False):
@@ -471,6 +467,90 @@ class DokuStreamsPlugin(BasisPlugin):
return []
return [title]
def tags(self) -> List[str]:
"""Liefert Schlagworte/Tags von der Startseite."""
if not self._requests_available:
return []
try:
soup = _get_soup(_absolute_url("/"), session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
tag_list: list[str] = []
for anchor in soup.select("a[href*='/tag/']"):
name = (anchor.get_text(" ", strip=True) or "").strip()
href = (anchor.get("href") or "").strip()
if name and TAGS_BASE_PATH in href and name not in tag_list:
tag_list.append(name)
return sorted(tag_list, key=lambda t: t.casefold())
def titles_for_tag(self, tag: str, page: int = 1) -> List[str]:
"""Liefert Titel zu einem Schlagwort."""
tag = (tag or "").strip()
if not tag or not self._requests_available:
return []
page = max(1, int(page or 1))
slug = tag.lower().replace(" ", "-")
base = _absolute_url(f"{TAGS_BASE_PATH}{slug}/")
url = base if page == 1 else f"{base}page/{page}/"
try:
soup = _get_soup(url, session=get_requests_session("dokustreams", headers=HEADERS))
except Exception:
return []
hits = _parse_listing_hits(soup)
self._title_to_url.update({hit.title: hit.url for hit in hits if hit.title and hit.url})
for hit in hits:
if hit.title:
self._title_meta[hit.title] = (hit.plot, hit.poster)
return [hit.title for hit in hits if hit.title]
def random_title(self) -> Optional[str]:
"""Liefert einen zufaelligen Doku-Titel via Redirect."""
if not self._requests_available:
return None
try:
session = get_requests_session("dokustreams", headers=HEADERS)
resp = session.get(_absolute_url(RANDOM_PATH), headers=HEADERS,
timeout=DEFAULT_TIMEOUT, allow_redirects=True)
resp.raise_for_status()
final_url = (resp.url or "").strip()
if not final_url or final_url.rstrip("/").endswith(RANDOM_PATH.rstrip("/")):
return None
soup = _get_soup(final_url, session=session)
hits = _parse_listing_hits(soup)
if not hits:
# Einzelseite: Titel aus H1 oder og:title lesen
h1 = soup.select_one("h1.entry-title, h1")
title = (h1.get_text(" ", strip=True) if h1 else "").strip()
if title:
self._title_to_url[title] = final_url
return title
return None
hit = hits[0]
if hit.title:
self._title_to_url[hit.title] = hit.url
return hit.title
except Exception:
return None
return None
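Die Redirect-Erkennung des Zufalls-Endpunkts laesst sich als reine URL-Pruefung skizzieren (angenommen: `requests` liefert bei `allow_redirects=True` in `resp.url` die finale URL):

```python
# Skizze: Wurde /zufaellige-doku/ tatsaechlich weitergeleitet?
RANDOM_PATH = "/zufaellige-doku/"

def is_redirected(final_url):
    final_url = (final_url or "").strip()
    if not final_url:
        return False
    # Endet die finale URL noch auf dem Zufalls-Pfad, gab es keinen Redirect
    return not final_url.rstrip("/").endswith(RANDOM_PATH.rstrip("/"))

print(is_redirected("https://doku-streams.com/eine-doku/"))        # True
print(is_redirected("https://doku-streams.com/zufaellige-doku/"))  # False
```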
def resolve_stream_link(self, link: str) -> Optional[str]:
"""Folgt Redirects und versucht ResolveURL fuer Hoster-Links."""
if not link:
return None
from plugin_helpers import resolve_via_resolveurl
resolved = resolve_via_resolveurl(link, fallback_to_link=False)
if resolved:
return resolved
if self._requests_available:
try:
session = get_requests_session("dokustreams", headers=HEADERS)
resp = session.get(link, headers=HEADERS, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
resp.raise_for_status()
return (resp.url or link).strip() or link
except Exception:
pass
return link
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
if not title:

View File

@@ -31,6 +31,7 @@ except ImportError: # pragma: no cover - allow running outside Kodi
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
ADDON_ID = "plugin.video.viewit"
SETTING_BASE_URL = "einschalten_base_url"
@@ -97,18 +98,11 @@ class MovieDetail:
def _normalize_search_text(value: str) -> str:
return _shared_normalize_search_text(value)
def _matches_query(query: str, *, title: str) -> bool:
return _shared_matches_query(query, title=title)
def _filter_movies_by_title(query: str, movies: List[MovieItem]) -> List[MovieItem]:
@@ -603,15 +597,6 @@ class EinschaltenPlugin(BasisPlugin):
url = urljoin(base + "/", path.lstrip("/"))
return f"{url}?{urlencode({'query': query})}"
def _api_movies_url(self, *, with_genres: int, page: int = 1) -> str:
base = self._get_base_url()
if not base:
return ""
params: Dict[str, str] = {"withGenres": str(int(with_genres))}
if page and int(page) > 1:
params["page"] = str(int(page))
return urljoin(base + "/", "api/movies") + f"?{urlencode(params)}"
def _genre_page_url(self, *, genre_id: int, page: int = 1) -> str:
"""Genre title pages are rendered server-side and embed the movie list in ng-state.
@@ -771,23 +756,6 @@ class EinschaltenPlugin(BasisPlugin):
except Exception:
return []
def _fetch_new_titles_movies(self) -> List[MovieItem]:
# "Neue Filme" lives at `/movies/new` and embeds the list in ng-state (`u: "/api/movies"`).
url = self._new_titles_url()
if not url:
return []
try:
_, body = self._http_get_text(url, timeout=20)
payload = _extract_ng_state_payload(body)
movies = _parse_ng_state_movies(payload)
_log_debug_line(f"parse_ng_state_movies:count={len(movies)}")
if movies:
_log_titles(movies, context="new_titles")
return movies
return []
except Exception:
return []
def _fetch_new_titles_movies_page(self, page: int) -> List[MovieItem]:
page = max(1, int(page or 1))
url = self._new_titles_url()
@@ -1047,16 +1015,32 @@ class EinschaltenPlugin(BasisPlugin):
return stream_url or None
def resolve_stream_link(self, link: str) -> Optional[str]:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=True)
def capabilities(self) -> Set[str]:
return {"new_titles", "genres", "popular_series"}
def popular_series(self) -> List[str]:
"""Liefert die am besten bewerteten Filme (nach voteAverage sortiert)."""
if not REQUESTS_AVAILABLE:
return []
if not self._get_base_url():
return []
movies = self._load_movies()
with_rating = [m for m in movies if m.vote_average is not None]
without_rating = [m for m in movies if m.vote_average is None]
ranked = sorted(with_rating, key=lambda m: (m.vote_average or 0.0), reverse=True)
ordered = ranked + without_rating
titles: List[str] = []
seen: set[str] = set()
for movie in ordered[:50]:
if movie.title in seen:
continue
seen.add(movie.title)
self._id_by_title[movie.title] = movie.id
titles.append(movie.title)
return titles
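Das Ranking aus `popular_series()` (bewertete Filme nach voteAverage absteigend, unbewertete am Ende) als eigenstaendige Skizze mit Beispieldaten:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Movie:
    title: str
    vote_average: Optional[float]

movies = [Movie("A", 6.1), Movie("B", None), Movie("C", 8.3)]
# Bewertete und unbewertete Filme trennen
with_rating = [m for m in movies if m.vote_average is not None]
without_rating = [m for m in movies if m.vote_average is None]
# Bewertete absteigend sortieren, unbewertete hinten anhaengen
ranked = sorted(with_rating, key=lambda m: (m.vote_average or 0.0), reverse=True)
ordered = ranked + without_rating
print([m.title for m in ordered])  # ['C', 'A', 'B']
```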
def new_titles(self) -> List[str]:
if not REQUESTS_AVAILABLE:

View File

@@ -27,6 +27,7 @@ else:
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, get_setting_string, log_error, log_url, notify_url
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
from http_session_pool import get_requests_session
if TYPE_CHECKING: # pragma: no cover
@@ -106,18 +107,11 @@ def _absolute_url(url: str) -> str:
def _normalize_search_text(value: str) -> str:
return _shared_normalize_search_text(value)
def _matches_query(query: str, *, title: str) -> bool:
return _shared_matches_query(query, title=title)
def _is_probably_content_url(url: str) -> bool:
@@ -531,7 +525,7 @@ class FilmpalastPlugin(BasisPlugin):
return max_page
def capabilities(self) -> set[str]:
return {"genres", "alpha", "series_catalog", "popular_series", "new_titles"}
def _parse_alpha_links(self, soup: BeautifulSoupT) -> Dict[str, str]:
alpha: Dict[str, str] = {}
@@ -732,9 +726,9 @@ class FilmpalastPlugin(BasisPlugin):
merged_poster = (poster or old_poster or "").strip()
self._title_meta[title] = (merged_plot, merged_poster)
def _extract_detail_metadata(self, soup: BeautifulSoupT) -> tuple[str, str, str]:
if not soup:
return "", "", ""
root = soup.select_one("div#content[role='main']") or soup
detail = root.select_one("article.detail") or root
plot = ""
@@ -779,7 +773,22 @@ class FilmpalastPlugin(BasisPlugin):
if "/themes/" not in lower and "spacer.gif" not in lower and "/files/movies/" in lower:
poster = candidate
# IMDb-Rating: Schema.org aggregateRating
rating = ""
rating_node = detail.select_one("[itemprop='ratingValue']")
if rating_node is not None:
rating = (rating_node.get_text(" ", strip=True) or "").strip()
if not rating:
# Fallback: data-attribute oder Klassen-basierte Anzeige
for sel in ("span.imdb", "span.rating", "[class*='imdb']"):
node = detail.select_one(sel)
if node is not None:
candidate = (node.get_text(" ", strip=True) or "").strip()
if candidate:
rating = candidate
break
return plot, poster, rating
def remember_series_url(self, title: str, series_url: str) -> None:
title = (title or "").strip()
@@ -836,12 +845,17 @@ class FilmpalastPlugin(BasisPlugin):
try:
soup = _get_soup(detail_url, session=get_requests_session("filmpalast", headers=HEADERS))
plot, poster, rating = self._extract_detail_metadata(soup)
except Exception:
plot, poster, rating = "", "", ""
if plot:
info["plot"] = plot
if rating:
try:
info["rating"] = str(float(rating.replace(",", ".")))
except (ValueError, TypeError):
pass
if poster:
art = {"thumb": poster, "poster": poster}
self._store_title_meta(title, plot=info.get("plot", ""), poster=poster)
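Die Rating-Konvertierung (deutsches Komma-Dezimal in einen float-String fuer Kodis `info['rating']`) als Skizze:

```python
# Skizze: Komma-Dezimal (z.B. IMDb-Anzeige "7,4") robust in einen
# float-String wandeln; bei unparsebaren Werten leeren String liefern.
def parse_rating(raw):
    try:
        return str(float((raw or "").replace(",", ".")))
    except (ValueError, TypeError):
        return ""

print(parse_rating("7,4"))  # '7.4'
print(parse_rating("8.1"))  # '8.1'
print(parse_rating("n/a"))  # ''
```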
@@ -1031,6 +1045,35 @@ class FilmpalastPlugin(BasisPlugin):
def reset_preferred_hosters(self) -> None:
self._preferred_hosters = list(self._default_preferred_hosters)
def popular_series(self) -> List[str]:
"""Liefert beliebte Titel von /movies/top."""
if not self._requests_available:
return []
try:
url = _absolute_url("/movies/top")
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
except Exception:
return []
def new_titles_page(self, page: int = 1) -> List[str]:
"""Liefert neu hinzugefuegte Titel von /movies/new."""
if not self._requests_available:
return []
page = max(1, int(page or 1))
try:
base = _absolute_url("/movies/new")
url = base if page == 1 else urljoin(base.rstrip("/") + "/", f"page/{page}")
soup = _get_soup(url, session=get_requests_session("filmpalast", headers=HEADERS))
hits = self._parse_listing_hits(soup)
return self._apply_hits_to_title_index(hits)
except Exception:
return []
def new_titles(self) -> List[str]:
return self.new_titles_page(1)
def resolve_stream_link(self, link: str) -> Optional[str]:
if not link:
return None

View File

@@ -0,0 +1,463 @@
"""HDFilme Plugin für ViewIT.
HTML-Scraping von hdfilme-tv.cc (ehemals hdfilme.garden).
Filme und Serien, Hoster-Auflösung via ResolveURL.
"""
from __future__ import annotations
import re
from typing import Any, Callable, List, Optional
from urllib.parse import quote_plus
try: # pragma: no cover
import requests
from bs4 import BeautifulSoup
except ImportError as exc: # pragma: no cover
requests = None
BeautifulSoup = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
BASE_URL = "https://hdfilme-tv.cc"
DEFAULT_TIMEOUT = 20
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
}
_URL_SEARCH = BASE_URL + "/?do=search&subaction=search&story={query}"
_URL_NEW = BASE_URL + "/kinofilme-online/"
_URL_SERIES = BASE_URL + "/serienstream-deutsch/"
# Genre-Name → URL-Slug
GENRE_SLUGS: dict[str, str] = {
"Abenteuer": "abenteuer",
"Action": "action",
"Animation": "animation",
"Biographie": "biographie",
"Dokumentation": "dokumentation",
"Drama": "drama",
"Erotik": "erotikfilme",
"Familie": "familie",
"Fantasy": "fantasy",
"Historienfilm": "historien",
"Horror": "horror",
"Komödie": "komodie",
"Krieg": "krieg",
"Krimi": "krimi",
"Musikfilm": "musikfilme",
"Mystery": "mystery",
"Romantik": "romantik",
"Sci-Fi": "sci-fi",
"Sport": "sport",
"Thriller": "thriller",
"Western": "western",
}
# Hoster, die übersprungen werden (kein Stream / nur Trailer)
_SKIP_LINK_KEYWORDS = ("youtube.com", "youtu.be", "hdfilme-tv.cc")
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _absolute_url(url: str) -> str:
"""Macht eine relative oder protokoll-relative URL absolut."""
url = (url or "").strip()
if url.startswith("//"):
return "https:" + url
if url.startswith("/"):
return BASE_URL + url
return url
def _clean_title(raw: str) -> str:
"""Bereinigt einen Rohtitel von Seiten-Suffixen."""
title = (raw or "").strip()
for suffix in (" stream", " Stream", " kostenlos", " Deutsch", " German", " online"):
if title.endswith(suffix):
title = title[: -len(suffix)].strip()
return title
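`_clean_title` macht genau einen Durchlauf ueber die Suffix-Liste; ein frueher in der Liste stehendes Suffix wird nach dem Entfernen eines spaeteren nicht erneut geprueft. Als Skizze:

```python
# Skizze der Suffix-Bereinigung aus _clean_title (ein Durchlauf).
def clean_title(raw):
    title = (raw or "").strip()
    for suffix in (" stream", " Stream", " kostenlos", " Deutsch", " German", " online"):
        if title.endswith(suffix):
            title = title[: -len(suffix)].strip()
    return title

print(clean_title("Inception Stream"))         # 'Inception'
# " Deutsch" wird nicht mehr geprueft, da sein Check vor " online" lag:
print(clean_title("Der Film Deutsch online"))  # 'Der Film Deutsch'
```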
def _get_soup(url: str) -> Any:
"""HTTP-GET und BeautifulSoup-Parsing. Gibt None bei Fehler."""
if requests is None or BeautifulSoup is None:
return None
try:
response = requests.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return BeautifulSoup(response.text, "html.parser")
except Exception:
return None
# ---------------------------------------------------------------------------
# Plugin-Klasse
# ---------------------------------------------------------------------------
class HdfilmePlugin(BasisPlugin):
"""HDFilme Integration für ViewIT. HTML-Scraping via BeautifulSoup."""
name = "HDFilme"
def __init__(self) -> None:
self._title_to_url: dict[str, str] = {}
self._is_series: dict[str, bool] = {}
self._title_meta: dict[str, tuple[str, str]] = {} # title → (plot, poster)
self._episode_cache: dict[str, list[str]] = {} # detail_url → episode labels
self._preferred_hosters: list[str] = []
# ------------------------------------------------------------------
# Verfügbarkeit
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
if REQUESTS_AVAILABLE:
return ""
return f"requests/bs4 nicht verfügbar: {REQUESTS_IMPORT_ERROR}"
# ------------------------------------------------------------------
# Internes Parsing
# ------------------------------------------------------------------
def _parse_entries(self, soup: Any) -> list[str]:
"""Parst eine Listing-Seite und gibt Titel zurück (cached)."""
if soup is None:
return []
titles: list[str] = []
seen: set[str] = set()
for box in soup.select("div.box-product"):
# URL aus erstem Link
link = box.find("a", href=True)
if not link:
continue
url = _absolute_url(link["href"])
if not url.endswith(".html"):
continue
# Titel aus h3
h3_a = box.select_one("h3 a")
if not h3_a:
continue
raw_title = h3_a.get_text(strip=True)
title = _clean_title(raw_title)
if not title or title in seen:
continue
seen.add(title)
# Thumbnail
img = box.select_one("img.lazyload")
poster = ""
if img and img.get("data-src"):
poster = _absolute_url(img["data-src"])
# Serien-Erkennung via Titel
is_series = bool(re.search(r"\bStaffel\b|\bSeason\b", raw_title, re.I))
self._title_to_url[title] = url
self._is_series[title] = is_series
if poster:
self._title_meta[title] = ("", poster)
titles.append(title)
return titles
def _ensure_detail_url(self, title: str) -> str:
"""Gibt die Detail-URL für einen Titel zurück.
Sucht zuerst im Cache, dann live über die Suchfunktion.
"""
url = self._title_to_url.get(title, "")
if url:
return url
# Fallback: Live-Suche (nötig wenn Plugin-Instanz neu, Cache leer)
search_url = _URL_SEARCH.format(query=quote_plus(title.strip()))
soup = _get_soup(search_url)
if soup:
self._parse_entries(soup)
url = self._title_to_url.get(title, "")
return url
def _get_detail_soup(self, title: str) -> Any:
"""Lädt die Detailseite eines Titels."""
url = self._ensure_detail_url(title)
if not url:
return None
return _get_soup(url)
def _extract_hoster_links(self, soup: Any, episode_id: str = "") -> dict[str, str]:
"""Extrahiert Hoster-Links aus einer Detailseite.
Gibt dict {Hoster-Name → URL} zurück.
episode_id: wenn gesetzt, nur Links aus dem `<li id="{episode_id}">` Block.
"""
if soup is None:
return {}
hosters: dict[str, str] = {}
if episode_id:
container = soup.select_one(f"li#{episode_id}")
if container is None:
return {}
candidates = container.select("a[data-link]")
else:
candidates = soup.select(".mirrors [data-link]")
seen_names: set[str] = set()
for el in candidates:
href = _absolute_url((el.get("data-link") or "").strip())
if not href:
continue
if any(kw in href for kw in _SKIP_LINK_KEYWORDS):
continue
name = el.get_text(strip=True) or "Hoster"
# Eindeutiger Name bei Duplikaten
base_name = name
i = 2
while name in seen_names:
name = f"{base_name} {i}"
i += 1
seen_names.add(name)
hosters[name] = href
return hosters
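The duplicate-name handling above (appending " 2", " 3", … until the name is unique) can be sketched in isolation; the hoster names below are hypothetical sample data:

```python
def dedupe_names(names):
    """Make hoster names unique by appending a counter, as _extract_hoster_links does."""
    seen = set()
    out = []
    for name in names:
        base, i = name, 2
        while name in seen:
            name = f"{base} {i}"
            i += 1
        seen.add(name)
        out.append(name)
    return out

print(dedupe_names(["VOE", "VOE", "Streamtape", "VOE"]))
# → ['VOE', 'VOE 2', 'Streamtape', 'VOE 3']
```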
def _staffel_nr(self, season: str) -> int:
"""Extrahiert die Staffelnummer aus einem Label wie 'Staffel 2'."""
m = re.search(r"\d+", season or "")
return int(m.group()) if m else 1
def _ep_index(self, episode: str) -> int:
"""Extrahiert den Episode-Index aus einem Label wie 'Episode 3'."""
m = re.search(r"\d+", episode or "")
return int(m.group()) if m else 1
# ------------------------------------------------------------------
# Pflicht-Methoden
# ------------------------------------------------------------------
async def search_titles(
self,
query: str,
progress_callback: ProgressCallback = None,
) -> List[str]:
if not query or not REQUESTS_AVAILABLE:
return []
url = _URL_SEARCH.format(query=quote_plus(query.strip()))
soup = _get_soup(url)
return self._parse_entries(soup)
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
if self._is_series.get(title) is False:
return ["Film"]
if self._is_series.get(title) is True:
m = re.search(r"Staffel\s*(\d+)|Season\s*(\d+)", title, re.I)
nr = int(m.group(1) or m.group(2)) if m else 1
return [f"Staffel {nr}"]
# Unbekannt: Detailseite laden und prüfen
soup = self._get_detail_soup(title)
if soup and soup.select_one("div.series"):
self._is_series[title] = True
m = re.search(r"Staffel\s*(\d+)|Season\s*(\d+)", title, re.I)
nr = int(m.group(1) or m.group(2)) if m else 1
return [f"Staffel {nr}"]
self._is_series[title] = False
return ["Film"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
season = (season or "").strip()
if not title:
return []
if season == "Film":
return [title]
detail_url = self._ensure_detail_url(title)
cached = self._episode_cache.get(detail_url)
if cached is not None:
return cached
staffel_nr = self._staffel_nr(season)
soup = self._get_detail_soup(title)
if soup is None:
return [title]
# li IDs: "serie-{staffel}_{episode}"
pattern = f"serie-{staffel_nr}_"
episode_items = [li for li in soup.select("li[id]") if li.get("id", "").startswith(pattern)]
labels: list[str] = []
for li in episode_items:
ep_id = li.get("id", "") # z.B. "serie-1_3"
ep_num_str = ep_id.split("_")[-1]
# Episodentitel aus erstem <a href="#">
a = li.find("a", href="#")
if a:
raw = a.get_text(strip=True)
# "Episoden 3" → "Episode 3"
ep_label = re.sub(r"^Episoden?\s*", "", raw, flags=re.I).strip()
label = f"Episode {ep_label}" if ep_label else f"Episode {ep_num_str}"
else:
label = f"Episode {ep_num_str}"
labels.append(label)
result = labels if labels else [title]
if detail_url:
self._episode_cache[detail_url] = result
return result
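The season filter above keys on `li` ids of the form `serie-{staffel}_{episode}`. A minimal sketch of that id parsing, without BeautifulSoup and with hypothetical ids:

```python
def episode_labels(li_ids, staffel_nr):
    """Build 'Episode N' fallback labels from ids like 'serie-1_3' (pattern used in episodes_for)."""
    prefix = f"serie-{staffel_nr}_"
    labels = []
    for ep_id in li_ids:
        if not ep_id.startswith(prefix):
            continue  # other season, skip
        labels.append(f"Episode {ep_id.split('_')[-1]}")
    return labels

print(episode_labels(["serie-1_1", "serie-1_2", "serie-2_1"], 1))
# → ['Episode 1', 'Episode 2']
```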
def _hosters_for(self, title: str, season: str, episode: str) -> dict[str, str]:
"""Gibt alle verfügbaren Hoster {Name → URL} für Titel/Staffel/Episode zurück."""
soup = self._get_detail_soup(title)
if soup is None:
return {}
if season == "Film" or not self._is_series.get(title, False):
return self._extract_hoster_links(soup)
staffel_nr = self._staffel_nr(season)
ep_idx = self._ep_index(episode)
episode_id = f"serie-{staffel_nr}_{ep_idx}"
return self._extract_hoster_links(soup, episode_id)
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
return list(self._hosters_for(title, season, episode).keys())
def set_preferred_hosters(self, hosters: List[str]) -> None:
self._preferred_hosters = [h for h in hosters if h]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
season = (season or "").strip()
if not title:
return None
hosters = self._hosters_for(title, season, episode)
if not hosters:
return None
# Bevorzugten Hoster nutzen falls gesetzt
for preferred in self._preferred_hosters:
key = preferred.casefold()
for name, url in hosters.items():
if key in name.casefold() or key in url.casefold():
return url
# Fallback: erster Hoster
return next(iter(hosters.values()))
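The preferred-hoster selection (case-insensitive substring match against name and URL, then first entry as fallback) can be sketched standalone; the URLs are made-up examples:

```python
def pick_stream(hosters, preferred):
    """Case-insensitive preferred-hoster match, falling back to the first entry."""
    for pref in preferred:
        key = pref.casefold()
        for name, url in hosters.items():
            if key in name.casefold() or key in url.casefold():
                return url
    # dicts preserve insertion order, so this is the first hoster found on the page
    return next(iter(hosters.values()), None)

links = {"VOE": "https://voe.example/e/abc", "Streamtape": "https://streamtape.example/v/xyz"}
print(pick_stream(links, ["streamtape"]))  # → https://streamtape.example/v/xyz
print(pick_stream(links, []))              # → https://voe.example/e/abc
```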
def resolve_stream_link(self, link: str) -> Optional[str]:
link = (link or "").strip()
if not link:
return None
try:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=False)
except Exception:
return None
# ------------------------------------------------------------------
# Metadaten
# ------------------------------------------------------------------
def metadata_for(
self, title: str
) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
info: dict[str, str] = {"title": title}
art: dict[str, str] = {}
# Cache-Hit
cached = self._title_meta.get(title)
if cached:
plot, poster = cached
if plot:
info["plot"] = plot
if poster:
art["thumb"] = art["poster"] = poster
if info or art:
return info, art, None
# Detailseite laden
soup = self._get_detail_soup(title)
if soup is None:
return info, art, None
og_desc = soup.find("meta", attrs={"property": "og:description"})
if og_desc and og_desc.get("content"):
info["plot"] = og_desc["content"].strip()
og_img = soup.find("meta", attrs={"property": "og:image"})
poster = ""
if og_img and og_img.get("content"):
poster = _absolute_url(og_img["content"].strip())
art["thumb"] = art["poster"] = poster
# Jahr aus Textabschnitt "Titel YYYY"
year_el = soup.select_one("p.text-capitalize")
if year_el:
m = re.search(r"\b(19|20)\d{2}\b", year_el.get_text())
if m:
info["year"] = m.group()
self._title_meta[title] = (info.get("plot", ""), poster)
return info, art, None
# ------------------------------------------------------------------
# Browsing
# ------------------------------------------------------------------
def new_titles(self) -> List[str]:
if not REQUESTS_AVAILABLE:
return []
return self._parse_entries(_get_soup(_URL_NEW))
def new_titles_page(self, page: int = 1) -> List[str]:
if not REQUESTS_AVAILABLE:
return []
page = max(1, int(page or 1))
url = _URL_NEW if page == 1 else f"{_URL_NEW}page/{page}/"
return self._parse_entries(_get_soup(url))
def popular_series(self) -> List[str]:
if not REQUESTS_AVAILABLE:
return []
return self._parse_entries(_get_soup(_URL_SERIES))
def genres(self) -> List[str]:
return sorted(GENRE_SLUGS.keys())
def titles_for_genre(self, genre: str) -> List[str]:
return self.titles_for_genre_page(genre, 1)
def titles_for_genre_page(self, genre: str, page: int = 1) -> List[str]:
slug = GENRE_SLUGS.get(genre, "")
if not slug or not REQUESTS_AVAILABLE:
return []
page = max(1, int(page or 1))
url = f"{BASE_URL}/{slug}/" if page == 1 else f"{BASE_URL}/{slug}/page/{page}/"
return self._parse_entries(_get_soup(url))
def capabilities(self) -> set[str]:
return {"new_titles", "popular_series", "genres"}


@@ -0,0 +1,426 @@
"""KKiste Plugin für ViewIT.
Nutzt die JSON-REST-API von kkiste.eu.
Filme und Serien mit TMDB-Thumbnails, kein HTML-Scraping.
Serien-Besonderheit: Auf KKiste ist jede Staffel ein eigener Eintrag
(z.B. "Breaking Bad - Staffel 1"). Die Suche liefert alle passenden
Staffel-Einträge direkt.
"""
from __future__ import annotations
import re
from typing import Any, Callable, List, Optional
from urllib.parse import quote_plus
try: # pragma: no cover
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
DOMAIN = "kkiste.eu"
BASE_URL = "https://" + DOMAIN
DEFAULT_TIMEOUT = 20
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Referer": BASE_URL + "/",
"Origin": BASE_URL,
}
# Sprache: 2=Deutsch, 3=Englisch, all=alle
_LANG = "2"
_THUMB_BASE = "https://image.tmdb.org/t/p/w300"
_URL_BROWSE = BASE_URL + "/data/browse/?lang={lang}&type={type}&order_by={order}&page={page}"
_URL_SEARCH = BASE_URL + "/data/browse/?lang={lang}&order_by=new&page=1&limit=0"
_URL_GENRE = BASE_URL + "/data/browse/?lang={lang}&type=movies&order_by=Trending&genre={genre}&page=1"
_URL_WATCH = BASE_URL + "/data/watch/?_id={id}"
GENRE_SLUGS: dict[str, str] = {
"Action": "Action",
"Animation": "Animation",
"Biographie": "Biographie",
"Dokumentation": "Dokumentation",
"Drama": "Drama",
"Familie": "Familie",
"Fantasy": "Fantasy",
"Horror": "Horror",
"Komödie": "Komödie",
"Krimi": "Krimi",
"Mystery": "Mystery",
"Romantik": "Romantik",
"Science-Fiction": "Sci-Fi",
"Thriller": "Thriller",
"Western": "Western",
}
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Plugin-Klasse
# ---------------------------------------------------------------------------
class KKistePlugin(BasisPlugin):
"""KKiste Integration für ViewIT (kkiste.eu).
Jede Staffel einer Serie ist auf KKiste ein eigenständiger API-Eintrag.
"""
name = "KKiste"
def __init__(self) -> None:
# title → watch-URL (/data/watch/?_id=X)
self._title_to_watch_url: dict[str, str] = {}
# title → (plot, poster, fanart)
self._title_meta: dict[str, tuple[str, str, str]] = {}
# title → True wenn "Staffel"/"Season" im Titel
self._is_series: dict[str, bool] = {}
# title → Staffelnummer (aus "Staffel N" extrahiert)
self._season_nr: dict[str, int] = {}
# bevorzugte Hoster für Hoster-Dialog
self._preferred_hosters: list[str] = []
# ------------------------------------------------------------------
# Verfügbarkeit
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
if REQUESTS_AVAILABLE:
return ""
return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}"
# ------------------------------------------------------------------
# HTTP
# ------------------------------------------------------------------
def _get_session(self): # type: ignore[return]
from http_session_pool import get_requests_session
return get_requests_session("kkiste", headers=HEADERS)
def _get_json(self, url: str) -> dict | list | None:
session = self._get_session()
response = None
try:
response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return response.json()
except Exception:
return None
finally:
if response is not None:
try:
response.close()
except Exception:
pass
# ------------------------------------------------------------------
# Interne Hilfsmethoden
# ------------------------------------------------------------------
def _cache_entry(self, movie: dict) -> str:
"""Cached einen API-Eintrag und gibt den Titel zurück ('' = überspringen)."""
title = str(movie.get("title") or "").strip()
if not title or "_id" not in movie:
return ""
movie_id = str(movie["_id"])
self._title_to_watch_url[title] = _URL_WATCH.format(id=movie_id)
# Serie erkennen
is_series = "Staffel" in title or "Season" in title
self._is_series[title] = is_series
if is_series:
m = re.search(r"(?:Staffel|Season)\s*(\d+)", title, re.IGNORECASE)
if m:
self._season_nr[title] = int(m.group(1))
# Metadaten
poster = ""
for key in ("poster_path_season", "poster_path"):
if movie.get(key):
poster = _THUMB_BASE + str(movie[key])
break
fanart = _THUMB_BASE + str(movie["backdrop_path"]) if movie.get("backdrop_path") else ""
plot = str(movie.get("storyline") or movie.get("overview") or "")
self._title_meta[title] = (plot, poster, fanart)
return title
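The artwork logic in `_cache_entry` prefers `poster_path_season` over `poster_path` and prefixes the TMDB w300 base URL. A sketch with a hypothetical API record:

```python
THUMB_BASE = "https://image.tmdb.org/t/p/w300"

def artwork_urls(movie):
    """Pick the season poster before the series poster, as _cache_entry does."""
    poster = ""
    for key in ("poster_path_season", "poster_path"):
        if movie.get(key):
            poster = THUMB_BASE + str(movie[key])
            break
    fanart = THUMB_BASE + str(movie["backdrop_path"]) if movie.get("backdrop_path") else ""
    return poster, fanart

print(artwork_urls({"poster_path": "/p.jpg", "backdrop_path": "/b.jpg"}))
# → ('https://image.tmdb.org/t/p/w300/p.jpg', 'https://image.tmdb.org/t/p/w300/b.jpg')
```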
def _ensure_watch_url(self, title: str) -> str:
"""Gibt die Watch-URL zurück; lädt bei leerem Cache alle Titel nach."""
url = self._title_to_watch_url.get(title, "")
if url:
return url
# Fallback: alle Titel laden und exact-match suchen
search_url = _URL_SEARCH.format(lang=_LANG)
data = self._get_json(search_url)
if isinstance(data, dict):
q_lower = title.lower()
for movie in (data.get("movies") or []):
if isinstance(movie, dict):
raw = str(movie.get("title") or "").strip()
if raw.lower() == q_lower:
self._cache_entry(movie)
return self._title_to_watch_url.get(title, "")
return ""
def _browse(self, content_type: str, order: str = "Trending") -> List[str]:
url = _URL_BROWSE.format(lang=_LANG, type=content_type, order=order, page=1)
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def _hosters_for(self, title: str, season: str, episode: str) -> dict[str, str]:
"""Gibt {Hoster-Name → URL} für Titel/Staffel/Episode zurück."""
watch_url = self._ensure_watch_url(title)
if not watch_url:
return {}
data = self._get_json(watch_url)
if not isinstance(data, dict):
return {}
streams = data.get("streams") or []
hosters: dict[str, str] = {}
seen: set[str] = set()
# Film vs Serie: relevante Streams filtern
if season == "Film":
target_streams = [s for s in streams if isinstance(s, dict)]
else:
m = re.search(r"\d+", episode or "")
ep_nr = int(m.group()) if m else None
if ep_nr is None:
return {}
target_streams = [
s for s in streams
if isinstance(s, dict) and s.get("e") == ep_nr
]
for stream in target_streams:
src = str(stream.get("stream") or "").strip()
if not src:
continue
# Hoster-Name aus der Stream-URL extrahieren (nicht aus "source", das ist die Aggregator-Quelle)
try:
from urllib.parse import urlparse
host = urlparse(src).hostname or "Hoster"
# Domain-Prefix entfernen (www.)
if host.startswith("www."):
host = host[4:]
except Exception:
host = "Hoster"
name = host
base_name = name
i = 2
while name in seen:
name = f"{base_name} {i}"
i += 1
seen.add(name)
hosters[name] = src
return hosters
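Deriving the hoster name from the stream URL's hostname (with the `www.` prefix stripped and a generic fallback) can be shown in isolation; the domain below is a made-up example:

```python
from urllib.parse import urlparse

def hoster_name(src):
    """Derive a display name from a stream URL's hostname, stripping 'www.'."""
    host = urlparse(src).hostname or "Hoster"
    return host[4:] if host.startswith("www.") else host

print(hoster_name("https://www.supervideo.example/e/abc123"))  # → supervideo.example
print(hoster_name("not a url"))                                # → Hoster
```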
# ------------------------------------------------------------------
# Pflicht-Methoden
# ------------------------------------------------------------------
async def search_titles(
self, query: str, progress_callback: ProgressCallback = None
) -> List[str]:
query = (query or "").strip()
if not query or not REQUESTS_AVAILABLE:
return []
# KKiste: limit=0 lädt alle Titel, client-seitige Filterung
url = _URL_SEARCH.format(lang=_LANG)
data = self._get_json(url)
if not isinstance(data, dict):
return []
q_lower = query.lower()
titles: list[str] = []
for movie in (data.get("movies") or []):
if not isinstance(movie, dict) or "_id" not in movie:
continue
raw_title = str(movie.get("title") or "").strip()
if not raw_title or q_lower not in raw_title.lower():
continue
t = self._cache_entry(movie)
if t:
titles.append(t)
return titles
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
is_series = self._is_series.get(title)
if is_series is None:
# Cache leer (neue Instanz): nachfüllen
self._ensure_watch_url(title)
is_series = self._is_series.get(title)
if is_series:
season_nr = self._season_nr.get(title, 1)
return [f"Staffel {season_nr}"]
return ["Film"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
if season == "Film":
return [title]
# Serie: Episodenliste aus /data/watch/ laden
watch_url = self._ensure_watch_url(title)
if not watch_url:
return []
data = self._get_json(watch_url)
if not isinstance(data, dict):
return []
episode_nrs: set[int] = set()
for stream in (data.get("streams") or []):
if not isinstance(stream, dict):
continue
e = stream.get("e")
if e is not None:
try:
episode_nrs.add(int(e))
except (ValueError, TypeError):
pass
if not episode_nrs:
return [title]
return [f"Episode {nr}" for nr in sorted(episode_nrs)]
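The episode list above is derived from the distinct `"e"` values in the watch-API's stream records. A standalone sketch with hypothetical records:

```python
def episode_list(streams):
    """Collect distinct episode numbers from stream records carrying an 'e' key."""
    nrs = set()
    for s in streams:
        e = s.get("e")
        if e is not None:
            try:
                nrs.add(int(e))
            except (ValueError, TypeError):
                pass  # non-numeric episode marker, ignore
    return [f"Episode {n}" for n in sorted(nrs)]

print(episode_list([{"e": 2}, {"e": "1"}, {"e": 2}, {"s": 1}]))
# → ['Episode 1', 'Episode 2']
```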
# ------------------------------------------------------------------
# Stream
# ------------------------------------------------------------------
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
return list(self._hosters_for(title, season, episode).keys())
def set_preferred_hosters(self, hosters: List[str]) -> None:
self._preferred_hosters = [h for h in hosters if h]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
hosters = self._hosters_for(title, season, episode)
if not hosters:
return None
# Bevorzugten Hoster nutzen falls gesetzt
for preferred in self._preferred_hosters:
key = preferred.casefold()
for name, url in hosters.items():
if key in name.casefold() or key in url.casefold():
return url
# Fallback: erster Hoster
return next(iter(hosters.values()))
def resolve_stream_link(self, link: str) -> Optional[str]:
link = (link or "").strip()
if not link:
return None
try:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=False)
except Exception:
return None
# ------------------------------------------------------------------
# Metadaten
# ------------------------------------------------------------------
def metadata_for(
self, title: str
) -> tuple[dict[str, str], dict[str, str], list | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
info: dict[str, str] = {"title": title}
art: dict[str, str] = {}
cached = self._title_meta.get(title)
if cached:
plot, poster, fanart = cached
if plot:
info["plot"] = plot
if poster:
art["thumb"] = poster
art["poster"] = poster
if fanart:
art["fanart"] = fanart
art["landscape"] = fanart
return info, art, None
# ------------------------------------------------------------------
# Browsing
# ------------------------------------------------------------------
def new_titles(self) -> List[str]:
return self._browse("movies", "new")
def new_titles_page(self, page: int = 1) -> List[str]:
page = max(1, int(page or 1))
url = _URL_BROWSE.format(lang=_LANG, type="movies", order="new", page=page)
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def popular_series(self) -> List[str]:
return self._browse("tvseries", "views")
def genres(self) -> List[str]:
return sorted(GENRE_SLUGS.keys())
def titles_for_genre(self, genre: str) -> List[str]:
slug = GENRE_SLUGS.get(genre, "")
if not slug:
return []
url = _URL_GENRE.format(lang=_LANG, genre=quote_plus(slug))
data = self._get_json(url)
if not isinstance(data, dict):
return []
return [
t for movie in (data.get("movies") or [])
if isinstance(movie, dict) and (t := self._cache_entry(movie))
]
def capabilities(self) -> set[str]:
return {"popular_series", "new_titles", "genres"}


@@ -0,0 +1,781 @@
"""Moflix-Stream Plugin für ViewIT.
Nutzt die JSON-REST-API von moflix-stream.xyz.
Kein HTML-Parsing nötig, alle Daten kommen als JSON.
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Any, Callable, List, Optional
from urllib.parse import quote, quote_plus, urlparse
try: # pragma: no cover - optional dependency
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
else: # pragma: no cover
RequestsSession = Any
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
ADDON_ID = "plugin.video.viewit"
BASE_URL = "https://moflix-stream.xyz"
DEFAULT_TIMEOUT = 20
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Referer": BASE_URL + "/",
}
# Separate Header-Definition für VidHide-Requests (moflix-stream.click)
# Separater Browser-UA verhindert UA-basierte Blockierung durch VidHide
_VIDHIDE_HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Referer": BASE_URL + "/",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
}
# Hoster-Domains, die erfahrungsgemäß 403 liefern oder keinen ResolveURL-Support haben
_VIDEO_SKIP_DOMAINS: frozenset[str] = frozenset({
"gupload.xyz",
"veev.to",
})
# Hoster-Domains, die direkt über eine eigene API auflösbar sind (bevorzugen)
_VIDEO_PREFER_DOMAINS: frozenset[str] = frozenset({
"vidara.to",
})
_URL_SEARCH = BASE_URL + "/api/v1/search/{q1}?query={q2}&limit=8"
_URL_CHANNEL = BASE_URL + "/api/v1/channel/{slug}?channelType=channel&restriction=&paginate=simple"
_URL_TITLE = (
BASE_URL + "/api/v1/titles/{id}"
"?load=images,genres,productionCountries,keywords,videos,primaryVideo,seasons,compactCredits"
)
_URL_EPISODES = BASE_URL + "/api/v1/titles/{id}/seasons/{s}/episodes?perPage=100&query=&page=1"
_URL_EPISODE = (
BASE_URL + "/api/v1/titles/{id}/seasons/{s}/episodes/{e}"
"?load=videos,compactCredits,primaryVideo"
)
# Genre-Slugs (hardcodiert, da keine Genre-API vorhanden)
GENRE_SLUGS: dict[str, str] = {
"Action": "action",
"Animation": "animation",
"Dokumentation": "dokumentation",
"Drama": "drama",
"Familie": "top-kids-liste",
"Fantasy": "fantasy",
"Horror": "horror",
"Komödie": "comedy",
"Krimi": "crime",
"Liebesfilm": "romance",
"Science-Fiction": "science-fiction",
"Thriller": "thriller",
}
# Collections (Slugs aus dem offiziellen xStream-Plugin)
COLLECTION_SLUGS: dict[str, str] = {
"American Pie Complete Collection": "the-american-pie-collection",
"Bud Spencer & Terence Hill": "bud-spencer-terence-hill-collection",
"DC Superhelden Collection": "the-dc-universum-collection",
"Mission: Impossible Collection": "the-mission-impossible-collection",
"Fast & Furious Collection": "fast-furious-movie-collection",
"Halloween Collection": "halloween-movie-collection",
"Herr der Ringe Collection": "der-herr-der-ringe-collection",
"James Bond Collection": "the-james-bond-collection",
"Jason Bourne Collection": "the-jason-bourne-collection",
"Jurassic Park Collection": "the-jurassic-park-collection",
"Kinder & Familienfilme": "top-kids-liste",
"Marvel Cinematic Universe": "the-marvel-cinematic-universe-collection",
"Olsenbande Collection": "the-olsenbande-collection",
"Planet der Affen Collection": "the-planet-der-affen-collection",
"Rocky Collection": "rocky-the-knockout-collection",
"Star Trek Kinofilm Collection": "the-star-trek-movies-collection",
"Star Wars Collection": "the-star-wars-collection",
"Stirb Langsam Collection": "stirb-langsam-collection",
"X-Men Collection": "x-men-collection",
}
# ---------------------------------------------------------------------------
# Hilfsfunktionen (Modul-Ebene)
# ---------------------------------------------------------------------------
def _extract_first_number(label: str) -> int | None:
"""Extrahiert erste Ganzzahl aus einem Label. 'Staffel 2' → 2."""
m = re.search(r"\d+", label or "")
return int(m.group()) if m else None
def _normalize_video_name(name: str, src: str) -> str:
"""Normalisiert den Hoster-Namen eines Video-Objekts.
'Mirror-HDCloud' → Domain aus src; 'VidCloud-720' → 'VidCloud'
"""
name = (name or "").strip()
if name.lower().startswith("mirror"):
parsed = urlparse(src or "")
host = parsed.netloc or ""
return host.split(".")[0].capitalize() if host else name
return name.split("-")[0].strip() or name
def _safe_str(value: object) -> str:
"""Konvertiert einen Wert sicher zu String, None → ''."""
if value is None:
return ""
return str(value).strip()
def _unpack_packer(packed_js: str) -> str:
"""Entpackt Dean Edwards p.a.c.k.e.r. JavaScript.
Format:
eval(function(p,a,c,k,e,d){...}('code',base,count,'k1|k2|...'.split('|'),0,0))
Findet die gepackte Zeichenkette, die Basis und den Schlüssel-String,
konvertiert jeden Token (base-N → Index) und ersetzt ihn durch das
jeweilige Schlüsselwort.
"""
m = re.search(
r"'((?:[^'\\]|\\.){20,})'\s*,\s*(\d+)\s*,\s*\d+\s*,\s*"
r"'((?:[^'\\]|\\.)*)'\s*\.split\s*\(\s*'\|'\s*\)",
packed_js,
)
if not m:
return packed_js
packed = m.group(1).replace("\\'", "'").replace("\\\\", "\\")
base = int(m.group(2))
keys = m.group(3).split("|")
_digits = "0123456789abcdefghijklmnopqrstuvwxyz"
def _unbase(s: str) -> int:
result = 0
for ch in s:
if ch not in _digits:
raise ValueError(f"Not a base-{base} digit: {ch!r}")
result = result * base + _digits.index(ch)
return result
def _replace(m2: re.Match) -> str: # type: ignore[type-arg]
token = m2.group(0)
try:
idx = _unbase(token)
replacement = keys[idx] if idx < len(keys) else ""
return replacement if replacement else token
except (ValueError, IndexError):
return token
return re.sub(r"\b\w+\b", _replace, packed)
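The unpacking approach above can be exercised end to end. A compact re-implementation of the same algorithm, fed a small hypothetical packed sample (base 10, four keywords):

```python
import re

def unpack(packed_js):
    """Minimal Dean Edwards p.a.c.k.e.r. unpacker (same approach as _unpack_packer)."""
    m = re.search(
        r"'((?:[^'\\]|\\.){20,})'\s*,\s*(\d+)\s*,\s*\d+\s*,\s*"
        r"'((?:[^'\\]|\\.)*)'\s*\.split\s*\(\s*'\|'\s*\)",
        packed_js,
    )
    if not m:
        return packed_js
    packed, base, keys = m.group(1), int(m.group(2)), m.group(3).split("|")
    digits = "0123456789abcdefghijklmnopqrstuvwxyz"

    def repl(tok):
        t = tok.group(0)
        try:
            idx = 0
            for ch in t:
                idx = idx * base + digits.index(ch)  # base-N token → keyword index
        except ValueError:
            return t
        return keys[idx] if idx < len(keys) and keys[idx] else t

    return re.sub(r"\b\w+\b", repl, packed)

js = "eval(function(p,a,c,k,e,d){}('0 1 2 3 0 1 2 3 0 1 2',10,4,'the|quick|brown|fox'.split('|'),0,0))"
print(unpack(js))
# → the quick brown fox the quick brown fox the quick brown
```

Real packed payloads escape quotes inside the code string; the regex above tolerates that via the `\\.` alternatives, just like the plugin's version.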
# ---------------------------------------------------------------------------
# Plugin-Klasse
# ---------------------------------------------------------------------------
class MoflixPlugin(BasisPlugin):
"""Moflix-Stream Integration für ViewIT.
Verwendet die offizielle JSON-REST-API, kein HTML-Scraping.
"""
name = "Moflix"
def __init__(self) -> None:
# title (str) → vollständige API-URL /api/v1/titles/{id}
self._title_to_url: dict[str, str] = {}
# title → (plot, poster_url, fanart_url)
self._title_meta: dict[str, tuple[str, str, str]] = {}
# title → True wenn Serie, False wenn Film
self._is_series: dict[str, bool] = {}
# (title, season_nr) → Moflix-API-ID (ändert sich pro Staffel!)
self._season_api_ids: dict[tuple[str, int], str] = {}
# (title, season_nr) → Liste der Episode-Labels
self._episode_labels: dict[tuple[str, int], list[str]] = {}
# bevorzugte Hoster für Hoster-Dialog
self._preferred_hosters: list[str] = []
# ------------------------------------------------------------------
# Verfügbarkeit
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
if REQUESTS_AVAILABLE:
return ""
return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}"
# ------------------------------------------------------------------
# HTTP
# ------------------------------------------------------------------
def _get_session(self) -> RequestsSession:
from http_session_pool import get_requests_session
return get_requests_session("moflix", headers=HEADERS)
def _get_json(self, url: str, headers: dict | None = None) -> dict | list | None:
"""GET-Request, gibt geparste JSON-Antwort zurück oder None bei Fehler."""
session = self._get_session()
response = None
try:
response = session.get(url, headers=headers or HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return response.json()
except Exception:
return None
finally:
if response is not None:
try:
response.close()
except Exception:
pass
def _get_html(
self,
url: str,
headers: dict | None = None,
fresh_session: bool = False,
) -> str | None:
"""GET-Request, gibt den Response-Text (HTML) zurück oder None bei Fehler.
fresh_session=True: eigene requests.Session (keine gecachten Cookies/State).
"""
response = None
try:
if fresh_session:
import requests as _req
session = _req.Session()
else:
session = self._get_session()
req_headers = headers or {
**HEADERS,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
response = session.get(url, headers=req_headers, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return response.text
except Exception:
return None
finally:
if response is not None:
try:
response.close()
except Exception:
pass
# ------------------------------------------------------------------
# Interne Hilfsmethoden
# ------------------------------------------------------------------
def _cache_channel_entry(self, entry: dict) -> str:
"""Cached einen Kanal/Sucheintrag und gibt den Titel zurück (oder '' zum Überspringen)."""
title = _safe_str(entry.get("name"))
if not title:
return ""
api_id = _safe_str(entry.get("id"))
if not api_id:
return ""
self._title_to_url[title] = _URL_TITLE.format(id=api_id)
is_series = bool(entry.get("is_series", False))
self._is_series[title] = is_series
plot = _safe_str(entry.get("description"))
poster = _safe_str(entry.get("poster"))
fanart = _safe_str(entry.get("backdrop"))
self._title_meta[title] = (plot, poster, fanart)
return title
def _titles_from_channel(self, slug: str, page: int = 1) -> list[str]:
"""Lädt Titel eines Moflix-Channels (Kategorie/Genre/Collection)."""
url = _URL_CHANNEL.format(slug=slug)
if page > 1:
url = f"{url}&page={page}"
data = self._get_json(url)
if not isinstance(data, dict):
return []
entries = []
try:
entries = data["channel"]["content"]["data"]
except (KeyError, TypeError):
return []
titles: list[str] = []
for entry in (entries or []):
if not isinstance(entry, dict):
continue
t = self._cache_channel_entry(entry)
if t:
titles.append(t)
return titles
def _ensure_title_url(self, title: str) -> str:
"""Gibt die gecachte API-URL für einen Titel zurück, oder ''."""
return self._title_to_url.get(title, "")
def _resolve_title(self, title: str) -> None:
"""Cache-Miss-Fallback: Titel per Such-API nachschlagen und cachen.
Wird aufgerufen wenn der In-Memory-Cache leer ist (z.B. nach einem
neuen Kodi-Addon-Aufruf, der eine frische Plugin-Instanz erzeugt).
"""
q1 = quote(title)
q2 = quote_plus(title)
url = _URL_SEARCH.format(q1=q1, q2=q2)
data = self._get_json(url)
if not isinstance(data, dict):
return
for entry in (data.get("results") or []):
if not isinstance(entry, dict):
continue
if _safe_str(entry.get("name")) == title:
self._cache_channel_entry(entry)
return
# ------------------------------------------------------------------
# Pflicht-Methoden
# ------------------------------------------------------------------
async def search_titles(
self,
query: str,
progress_callback: ProgressCallback = None,
) -> List[str]:
query = (query or "").strip()
if not query or not REQUESTS_AVAILABLE:
return []
q1 = quote(query)
q2 = quote_plus(query)
url = _URL_SEARCH.format(q1=q1, q2=q2)
data = self._get_json(url)
if not isinstance(data, dict):
return []
results = data.get("results") or []
titles: list[str] = []
for entry in results:
if not isinstance(entry, dict):
continue
# Personen überspringen
if "person" in _safe_str(entry.get("model_type")):
continue
t = self._cache_channel_entry(entry)
if t:
titles.append(t)
return titles
def seasons_for(self, title: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
# Film: direkt zum Stream
if self._is_series.get(title) is False:
return ["Film"]
url = self._ensure_title_url(title)
if not url:
self._resolve_title(title)
url = self._ensure_title_url(title)
if not url:
return []
data = self._get_json(url)
if not isinstance(data, dict):
return []
seasons_raw = []
try:
seasons_raw = data["seasons"]["data"]
except (KeyError, TypeError):
pass
if not seasons_raw:
            # No season data → movie fallback
return ["Film"]
        # Sort by season number; filter out non-dict entries first, since they would crash the sort key
        def _season_sort_key(s: dict) -> int:
            try:
                return int(s.get("number", 0) or 0)
            except (ValueError, TypeError):
                return 0
        seasons_raw = sorted((s for s in seasons_raw if isinstance(s, dict)), key=_season_sort_key)
labels: list[str] = []
for season in seasons_raw:
if not isinstance(season, dict):
continue
nr = season.get("number")
api_id = _safe_str(season.get("title_id"))
if nr is None or not api_id:
continue
try:
season_nr = int(nr)
except (ValueError, TypeError):
continue
self._season_api_ids[(title, season_nr)] = api_id
labels.append(f"Staffel {season_nr}")
return labels
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
season = (season or "").strip()
if not title or not season:
return []
# Film: Episode = Titel selbst
if season == "Film":
return [title]
season_nr = _extract_first_number(season)
if season_nr is None:
return []
# Cache-Hit
cached = self._episode_labels.get((title, season_nr))
if cached is not None:
return cached
api_id = self._season_api_ids.get((title, season_nr), "")
if not api_id:
# Staffeln nachladen falls noch nicht gecacht
self.seasons_for(title)
api_id = self._season_api_ids.get((title, season_nr), "")
if not api_id:
return []
url = _URL_EPISODES.format(id=api_id, s=season_nr)
data = self._get_json(url)
if not isinstance(data, dict):
return []
episodes_raw = []
try:
episodes_raw = data["pagination"]["data"]
except (KeyError, TypeError):
pass
labels: list[str] = []
for ep in (episodes_raw or []):
if not isinstance(ep, dict):
continue
# Episoden ohne Video überspringen
if ep.get("primary_video") is None:
continue
ep_nr_raw = ep.get("episode_number")
ep_name = _safe_str(ep.get("name"))
try:
ep_nr = int(ep_nr_raw or 0)
except (ValueError, TypeError):
continue
if ep_nr <= 0:
continue
label = f"Episode {ep_nr}"
if ep_name:
label = f"{label} {ep_name}"
labels.append(label)
self._episode_labels[(title, season_nr)] = labels
return labels
# ------------------------------------------------------------------
# Stream
# ------------------------------------------------------------------
def _videos_for(self, title: str, season: str, episode: str) -> list[dict]:
"""Gibt die rohe videos[]-Liste für einen Titel/Staffel/Episode zurück."""
title = (title or "").strip()
season = (season or "").strip()
if season == "Film":
url = self._ensure_title_url(title)
if not url:
self._resolve_title(title)
url = self._ensure_title_url(title)
if not url:
return []
data = self._get_json(url)
if not isinstance(data, dict):
return []
return (data.get("title") or {}).get("videos") or []
season_nr = _extract_first_number(season)
episode_nr = _extract_first_number(episode)
if season_nr is None or episode_nr is None:
return []
api_id = self._season_api_ids.get((title, season_nr), "")
if not api_id:
self.seasons_for(title)
api_id = self._season_api_ids.get((title, season_nr), "")
if not api_id:
return []
url = _URL_EPISODE.format(id=api_id, s=season_nr, e=episode_nr)
data = self._get_json(url)
if not isinstance(data, dict):
return []
return (data.get("episode") or {}).get("videos") or []
def _hosters_from_videos(self, videos: list) -> dict[str, str]:
"""Konvertiert videos[] zu {Hoster-Name → src-URL}, mit Skip/Prefer-Logik."""
hosters: dict[str, str] = {}
seen: set[str] = set()
for v in videos:
if not isinstance(v, dict):
continue
src = _safe_str(v.get("src"))
if not src or "youtube" in src.lower():
continue
            # removeprefix, not lstrip: lstrip("www.") strips characters and would mangle e.g. "web.example.com"
            domain = urlparse(src).netloc.removeprefix("www.")
if domain in _VIDEO_SKIP_DOMAINS:
continue
name = _normalize_video_name(_safe_str(v.get("name")), src)
if not name:
name = domain
base_name = name
i = 2
while name in seen:
name = f"{base_name} {i}"
i += 1
seen.add(name)
hosters[name] = src
return hosters
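The collision handling in `_hosters_from_videos` (suffixing " 2", " 3", … until the hoster name is unique) can be read in isolation; a minimal sketch of the same logic (the helper name is hypothetical, not part of the plugin):

```python
def dedupe_hoster_name(name: str, seen: set) -> str:
    # Append " 2", " 3", ... until the candidate is unique, then record it in `seen`.
    candidate, i = name, 2
    while candidate in seen:
        candidate = f"{name} {i}"
        i += 1
    seen.add(candidate)
    return candidate
```

Each call mutates `seen`, so repeated hosters of the same name get distinct dictionary keys.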
def available_hosters_for(self, title: str, season: str, episode: str) -> List[str]:
videos = self._videos_for(title, season, episode)
return list(self._hosters_from_videos(videos).keys())
def set_preferred_hosters(self, hosters: List[str]) -> None:
self._preferred_hosters = [h for h in hosters if h]
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
videos = self._videos_for(title, season, episode)
if not videos:
return None
hosters = self._hosters_from_videos(videos)
if not hosters:
return None
# Bevorzugten Hoster nutzen falls gesetzt
for preferred in self._preferred_hosters:
key = preferred.casefold()
for name, url in hosters.items():
if key in name.casefold() or key in url.casefold():
return url
# Fallback: Prefer-Domains zuerst, dann Rest
for url in hosters.values():
            domain = urlparse(url).netloc.removeprefix("www.")
if domain in _VIDEO_PREFER_DOMAINS:
return url
return next(iter(hosters.values()))
def _resolve_vidara(self, filecode: str) -> Optional[str]:
"""Löst einen vidara.to-Filecode über die vidara-API auf → HLS-URL."""
api_url = f"https://vidara.to/api/stream?filecode={filecode}"
vidara_headers = {
**HEADERS,
"Referer": f"https://vidara.to/e/{filecode}",
"Origin": "https://vidara.to",
}
data = self._get_json(api_url, headers=vidara_headers)
if not isinstance(data, dict):
return None
return _safe_str(data.get("streaming_url")) or None
def _resolve_vidhide(self, embed_url: str) -> Optional[str]:
"""Löst einen VidHide-Embed-Link (moflix-stream.click) auf → HLS-URL.
Verwendet eine frische Session mit echtem Chrome-UA um UA-basierte
Blockierungen zu umgehen. Entpackt p.a.c.k.e.r.-JS und extrahiert
den HLS-Stream aus links.hls4/hls3/hls2.
"""
# Frische Session (NICHT die gecachte "moflix"-Session) mit VidHide-Headers
html = self._get_html(embed_url, headers=_VIDHIDE_HEADERS, fresh_session=True)
if not html or "eval(function(p,a,c,k,e" not in html:
return None
unpacked = _unpack_packer(html)
# Priorität: hls4 > hls3 > hls2
for hls_key in ("hls4", "hls3", "hls2"):
m = re.search(rf'"{hls_key}"\s*:\s*"(https://[^"]+)"', unpacked)
if m:
url = m.group(1)
if url:
# Kodi braucht Referer + UA als Header-Suffix damit der CDN die HLS-URL akzeptiert
from urllib.parse import urlencode
headers = urlencode({
"Referer": embed_url,
"User-Agent": _VIDHIDE_HEADERS["User-Agent"],
})
return f"{url}|{headers}"
return None
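The `_unpack_packer` helper itself is outside this diff. A minimal sketch of the underlying Dean Edwards p.a.c.k.e.r. decoding, assuming the `p`, `a`, `c`, `k` arguments have already been parsed out of the `eval(function(p,a,c,k,e,d){...})` wrapper (function name and signature here are hypothetical; real payloads can also use bases above 36 with a custom encoder, which this sketch does not cover):

```python
import re

def unpack_packer_payload(p: str, a: int, c: int, k: list) -> str:
    # Each base-`a` word token in `p` indexes into the keyword list `k`;
    # an empty keyword means the token stands for itself.
    digits = "0123456789abcdefghijklmnopqrstuvwxyz"

    def to_base(n: int) -> str:
        out = ""
        while True:
            out = digits[n % a] + out
            n //= a
            if n == 0:
                return out

    table = {to_base(i): (k[i] or to_base(i)) for i in range(c)}
    return re.sub(r"\b\w+\b", lambda m: table.get(m.group(0), m.group(0)), p)
```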
def resolve_stream_link(self, link: str) -> Optional[str]:
link = (link or "").strip()
if not link:
return None
# vidara.to: direkt über eigene API auflösen
vidara_m = re.search(r'vidara\.to/e/([A-Za-z0-9_-]+)', link)
if vidara_m:
resolved = self._resolve_vidara(vidara_m.group(1))
if resolved:
return resolved
# VidHide (moflix-stream.click): zuerst ResolveURL probieren (FileLions-Modul
# nutzt Kodis libcurl mit anderem TLS-Fingerprint), dann eigenen Resolver
if "moflix-stream.click" in link:
try:
from plugin_helpers import resolve_via_resolveurl
resolved = resolve_via_resolveurl(link, fallback_to_link=False)
if resolved:
return resolved
except Exception:
pass
# Fallback: eigener p.a.c.k.e.r. Resolver
resolved = self._resolve_vidhide(link)
if resolved:
return resolved
return None
# Fallback: ResolveURL (ohne Link-Fallback lieber None als unauflösbaren Link)
try:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=False)
except Exception:
return None
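The `|`-suffix built in `_resolve_vidhide` follows Kodi's convention of appending URL-encoded HTTP headers to a playable URL; a standalone sketch (all URLs are placeholders):

```python
from urllib.parse import urlencode

def with_kodi_headers(stream_url: str, referer: str, user_agent: str) -> str:
    # Kodi splits on the first "|" and sends the remainder as HTTP request headers.
    suffix = urlencode({"Referer": referer, "User-Agent": user_agent})
    return f"{stream_url}|{suffix}"
```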
# ------------------------------------------------------------------
# Metadaten
# ------------------------------------------------------------------
def metadata_for(
self, title: str
) -> tuple[dict[str, str], dict[str, str], list[object] | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
info: dict[str, str] = {"title": title}
art: dict[str, str] = {}
# Cache-Hit
cached = self._title_meta.get(title)
if cached:
plot, poster, fanart = cached
if plot:
info["plot"] = plot
if poster:
art["thumb"] = poster
art["poster"] = poster
if fanart:
art["fanart"] = fanart
art["landscape"] = fanart
if "plot" in info or art:
return info, art, None
# API-Abruf
url = self._ensure_title_url(title)
if not url:
return info, art, None
data = self._get_json(url)
if not isinstance(data, dict):
return info, art, None
title_obj = data.get("title") or {}
plot = _safe_str(title_obj.get("description"))
poster = _safe_str(title_obj.get("poster"))
fanart = _safe_str(title_obj.get("backdrop"))
rating_raw = title_obj.get("rating")
year_raw = _safe_str(title_obj.get("release_date"))
if plot:
info["plot"] = plot
if rating_raw is not None:
try:
info["rating"] = str(float(rating_raw))
except (ValueError, TypeError):
pass
if year_raw and len(year_raw) >= 4:
info["year"] = year_raw[:4]
if poster:
art["thumb"] = poster
art["poster"] = poster
if fanart:
art["fanart"] = fanart
art["landscape"] = fanart
# Cachen
self._title_meta[title] = (plot, poster, fanart)
return info, art, None
# ------------------------------------------------------------------
# Browsing-Features
# ------------------------------------------------------------------
def popular_series(self) -> List[str]:
return self._titles_from_channel("series")
def new_titles(self) -> List[str]:
return self._titles_from_channel("now-playing")
def new_titles_page(self, page: int = 1) -> List[str]:
return self._titles_from_channel("now-playing", page=page)
def genres(self) -> List[str]:
return sorted(GENRE_SLUGS.keys())
def titles_for_genre(self, genre: str) -> List[str]:
return self.titles_for_genre_page(genre, 1)
def titles_for_genre_page(self, genre: str, page: int = 1) -> List[str]:
slug = GENRE_SLUGS.get(genre, "")
if not slug:
return []
return self._titles_from_channel(slug, page=page)
def collections(self) -> List[str]:
return sorted(COLLECTION_SLUGS.keys())
def titles_for_collection(self, collection: str, page: int = 1) -> List[str]:
slug = COLLECTION_SLUGS.get(collection, "")
if not slug:
return []
return self._titles_from_channel(slug, page=page)
def capabilities(self) -> set[str]:
return {"popular_series", "new_titles", "collections", "genres"}


@@ -0,0 +1,254 @@
"""Netzkino plugin for ViewIT.
Uses Netzkino's public JSON API.
Movies only, no series. Direct MP4 streams, so no ResolveURL needed.
Legal and free.
"""
from __future__ import annotations
from typing import Any, Callable, List, Optional
try: # pragma: no cover
import requests
except ImportError as exc: # pragma: no cover
requests = None
REQUESTS_AVAILABLE = False
REQUESTS_IMPORT_ERROR = exc
else:
REQUESTS_AVAILABLE = True
REQUESTS_IMPORT_ERROR = None
from plugin_interface import BasisPlugin
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
DEFAULT_TIMEOUT = 20
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
}
_API_BASE = "https://api.netzkino.de.simplecache.net/capi-2.0a"
_STREAM_BASE = "https://pmd.netzkino-seite.netzkino.de"
_URL_SEARCH = _API_BASE + "/search?q={query}&d=www&l=de-DE"
_URL_CATEGORY = _API_BASE + "/categories/{slug}.json?d=www&l=de-DE"
# Slug → Anzeigename
CATEGORIES: dict[str, str] = {
"highlights": "Highlights",
"neue-filme": "Neue Filme",
"alle-filme": "Alle Filme",
"action": "Action",
"animation": "Animation",
"dokumentarfilm": "Dokumentation",
"drama": "Drama",
"fantasy": "Fantasy",
"horror": "Horror",
"komodie": "Komödie",
"krimi-thriller": "Krimi & Thriller",
"romantik": "Romantik",
"sci-fi": "Science-Fiction",
}
ProgressCallback = Optional[Callable[[str, Optional[int]], Any]]
# ---------------------------------------------------------------------------
# Plugin-Klasse
# ---------------------------------------------------------------------------
class NetzkinoPlugin(BasisPlugin):
    """Netzkino integration for ViewIT.
    All titles are movies (no series). Streams are direct MP4 URLs.
    """
    name = "Netzkino"
def __init__(self) -> None:
# title → direkte MP4-URL
self._title_to_stream: dict[str, str] = {}
# title → (plot, poster, fanart)
self._title_meta: dict[str, tuple[str, str, str]] = {}
# ------------------------------------------------------------------
# Verfügbarkeit
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
return REQUESTS_AVAILABLE
@property
def unavailable_reason(self) -> str:
if REQUESTS_AVAILABLE:
return ""
return f"requests nicht verfügbar: {REQUESTS_IMPORT_ERROR}"
# ------------------------------------------------------------------
# HTTP
# ------------------------------------------------------------------
def _get_session(self): # type: ignore[return]
from http_session_pool import get_requests_session
return get_requests_session("netzkino", headers=HEADERS)
def _get_json(self, url: str) -> dict | list | None:
session = self._get_session()
response = None
try:
response = session.get(url, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
response.raise_for_status()
return response.json()
except Exception:
return None
finally:
if response is not None:
try:
response.close()
except Exception:
pass
# ------------------------------------------------------------------
# Interne Hilfsmethoden
# ------------------------------------------------------------------
def _build_stream_url(self, streaming_id: str) -> str:
return f"{_STREAM_BASE}/{streaming_id}.mp4"
def _cache_post(self, post: dict) -> str:
"""Cached einen API-Post und gibt den Titel zurück ('' = überspringen)."""
title = str(post.get("title") or "").strip()
if not title:
return ""
# Stream-URL aus custom_fields.Streaming[0]
custom = post.get("custom_fields") or {}
streaming_ids = custom.get("Streaming") or []
if not streaming_ids or not streaming_ids[0]:
return ""
stream_url = self._build_stream_url(str(streaming_ids[0]))
self._title_to_stream[title] = stream_url
# Metadaten
plot = str(post.get("content") or "").strip()
# Poster: thumbnail
poster = str(post.get("thumbnail") or "").strip()
# Fanart: featured_img_all[0]
fanart_list = custom.get("featured_img_all") or []
fanart = str(fanart_list[0]).strip() if fanart_list and fanart_list[0] else ""
self._title_meta[title] = (plot, poster, fanart)
return title
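Based on the fields `_cache_post` reads, a Netzkino API post presumably maps to a stream URL like this (the sample post values are invented; the base URL and `.mp4` suffix come from the constants above):

```python
_STREAM_BASE = "https://pmd.netzkino-seite.netzkino.de"

# Hypothetical post as it would appear in data["posts"]
post = {
    "title": "Beispielfilm",
    "content": "Ein Plot.",
    "thumbnail": "https://img.example/poster.jpg",
    "custom_fields": {
        "Streaming": ["beispielfilm_123"],
        "featured_img_all": ["https://img.example/fanart.jpg"],
    },
}

# Stream URL: first entry of custom_fields.Streaming, appended to the PMD base
streaming_id = post["custom_fields"]["Streaming"][0]
stream_url = f"{_STREAM_BASE}/{streaming_id}.mp4"
```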
def _load_posts(self, url: str) -> List[str]:
data = self._get_json(url)
if not isinstance(data, dict):
return []
titles: list[str] = []
for post in (data.get("posts") or []):
if not isinstance(post, dict):
continue
t = self._cache_post(post)
if t:
titles.append(t)
return titles
# ------------------------------------------------------------------
# Pflicht-Methoden
# ------------------------------------------------------------------
async def search_titles(
self, query: str, progress_callback: ProgressCallback = None
) -> List[str]:
query = (query or "").strip()
if not query or not REQUESTS_AVAILABLE:
return []
from urllib.parse import quote_plus
url = _URL_SEARCH.format(query=quote_plus(query))
return self._load_posts(url)
def seasons_for(self, title: str) -> List[str]:
        # Netzkino only has movies
return ["Film"]
def episodes_for(self, title: str, season: str) -> List[str]:
title = (title or "").strip()
if not title:
return []
# Nur eine Episode: der Film selbst
return [title]
# ------------------------------------------------------------------
# Stream
# ------------------------------------------------------------------
def stream_link_for(self, title: str, season: str, episode: str) -> Optional[str]:
title = (title or "").strip()
return self._title_to_stream.get(title)
def resolve_stream_link(self, link: str) -> Optional[str]:
        # Direct MP4 URL, no resolving needed
link = (link or "").strip()
return link if link else None
# ------------------------------------------------------------------
# Metadaten
# ------------------------------------------------------------------
def metadata_for(
self, title: str
) -> tuple[dict[str, str], dict[str, str], list | None]:
title = (title or "").strip()
if not title:
return {}, {}, None
info: dict[str, str] = {"title": title}
art: dict[str, str] = {}
cached = self._title_meta.get(title)
if cached:
plot, poster, fanart = cached
if plot:
info["plot"] = plot
if poster:
art["thumb"] = poster
art["poster"] = poster
if fanart:
art["fanart"] = fanart
art["landscape"] = fanart
return info, art, None
# ------------------------------------------------------------------
# Browsing
# ------------------------------------------------------------------
    def new_titles_page(self, page: int = 1) -> List[str]:
        # `page` is accepted for interface compatibility but not used here
        url = _URL_CATEGORY.format(slug="neue-filme")
        return self._load_posts(url)
def new_titles(self) -> List[str]:
return self.new_titles_page(1)
def genres(self) -> List[str]:
# Gibt die Anzeigenamen zurück (sortiert, Browsing-Kategorien)
return sorted(CATEGORIES.values())
def titles_for_genre(self, genre: str) -> List[str]:
# Slug aus Anzeigename rückauflösen
slug = next((s for s, n in CATEGORIES.items() if n == genre), "")
if not slug:
return []
url = _URL_CATEGORY.format(slug=slug)
return self._load_posts(url)
def capabilities(self) -> set[str]:
return {"new_titles", "genres"}

File diff suppressed because it is too large


@@ -20,7 +20,7 @@ import os
import re
import json
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from urllib.parse import urlencode, urljoin
try: # pragma: no cover - optional dependency
import requests
@@ -46,6 +46,7 @@ except ImportError: # pragma: no cover - allow running outside Kodi
from plugin_interface import BasisPlugin
from plugin_helpers import dump_response_html, get_setting_bool, log_error, log_url, notify_url
from regex_patterns import DIGITS
from search_utils import matches_query as _shared_matches_query, normalize_search_text as _shared_normalize_search_text
if TYPE_CHECKING: # pragma: no cover
from requests import Session as RequestsSession
@@ -66,12 +67,9 @@ SETTING_LOG_URLS = "log_urls_topstreamfilm"
SETTING_DUMP_HTML = "dump_html_topstreamfilm"
SETTING_SHOW_URL_INFO = "show_url_info_topstreamfilm"
SETTING_LOG_ERRORS = "log_errors_topstreamfilm"
DEFAULT_TIMEOUT = 20
DEFAULT_PREFERRED_HOSTERS = ["supervideo", "dropload", "voe"]
MEINECLOUD_HOST = "meinecloud.click"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Kodi; ViewIt) AppleWebKit/537.36 (KHTML, like Gecko)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
@@ -101,24 +99,12 @@ class SearchHit:
def _normalize_search_text(value: str) -> str:
    """Normalize text for robust, word-based search matching (delegates to the shared helper)."""
    return _shared_normalize_search_text(value)
def _matches_query(query: str, *, title: str, description: str) -> bool:
    _ = description  # kept for interface compatibility; matching uses the title only
    return _shared_matches_query(query, title=title)
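The shared `search_utils` helpers are not part of this diff; judging from the in-file implementation they replace, `normalize_search_text` presumably behaves like:

```python
import re

def normalize_search_text(value: str) -> str:
    # Casefold, map non-alphanumeric runs to single spaces, collapse whitespace.
    value = (value or "").casefold()
    value = re.sub(r"[^a-z0-9]+", " ", value)
    return re.sub(r"\s+", " ", value).strip()
```

This lets e.g. "Star Trek: Lower Decks Der Film" match cleanly on word tokens.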
def _strip_der_film_suffix(title: str) -> str:
@@ -232,8 +218,10 @@ class TopstreamfilmPlugin(BasisPlugin):
if directory and not xbmcvfs.exists(directory):
xbmcvfs.mkdirs(directory)
            handle = xbmcvfs.File(path, "w")
            try:
                handle.write(payload)
            finally:
                handle.close()
else:
with open(path, "w", encoding="utf-8") as handle:
handle.write(payload)
@@ -297,8 +285,10 @@ class TopstreamfilmPlugin(BasisPlugin):
if directory and not xbmcvfs.exists(directory):
xbmcvfs.mkdirs(directory)
            handle = xbmcvfs.File(path, "w")
            try:
                handle.write(payload)
            finally:
                handle.close()
else:
with open(path, "w", encoding="utf-8") as handle:
handle.write(payload)
@@ -347,22 +337,6 @@ class TopstreamfilmPlugin(BasisPlugin):
return urljoin(base if base.endswith("/") else base + "/", href)
return href
def _notify_url(self, url: str) -> None:
notify_url(
ADDON_ID,
@@ -401,9 +375,6 @@ class TopstreamfilmPlugin(BasisPlugin):
message=message,
)
def _popular_url(self) -> str:
return self._absolute_url("/beliebte-filme-online.html")
@@ -1192,14 +1163,83 @@ class TopstreamfilmPlugin(BasisPlugin):
return hosters.get(first_name)
def resolve_stream_link(self, link: str) -> Optional[str]:
from plugin_helpers import resolve_via_resolveurl
return resolve_via_resolveurl(link, fallback_to_link=True)
def capabilities(self) -> set[str]:
return {"genres", "popular_series", "year_filter", "new_titles"}
def years_available(self) -> List[str]:
"""Liefert verfügbare Erscheinungsjahre (aktuelles Jahr bis 1980)."""
import datetime
current_year = datetime.date.today().year
return [str(y) for y in range(current_year, 1979, -1)]
def titles_for_year(self, year: str, page: int = 1) -> List[str]:
"""Liefert Titel für ein bestimmtes Erscheinungsjahr.
URL-Muster: /xfsearch/{year}/ oder /xfsearch/{year}/page/{n}/
"""
year = (year or "").strip()
if not year or not REQUESTS_AVAILABLE or BeautifulSoup is None:
return []
page = max(1, int(page or 1))
base = self._get_base_url()
if page == 1:
url = f"{base}/xfsearch/{year}/"
else:
url = f"{base}/xfsearch/{year}/page/{page}/"
        try:
            soup = self._get_soup(url)
        except Exception:
            return []
hits = self._parse_listing_titles(soup)
titles: List[str] = []
seen: set[str] = set()
for hit in hits:
if hit.title in seen:
continue
seen.add(hit.title)
self._title_to_url[hit.title] = hit.url
self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster)
titles.append(hit.title)
if titles:
self._save_title_url_cache()
return titles
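The URL pattern documented in `titles_for_year` can be checked in isolation; a sketch using the default base URL from the settings below (the helper name is hypothetical):

```python
def xfsearch_url(base: str, year: str, page: int = 1) -> str:
    # Page 1 omits the /page/ segment; later pages append /page/{n}/.
    base = base.rstrip("/")
    if page <= 1:
        return f"{base}/xfsearch/{year}/"
    return f"{base}/xfsearch/{year}/page/{page}/"
```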
def new_titles_page(self, page: int = 1) -> List[str]:
"""Liefert neu hinzugefügte Filme.
URL-Muster: /neueste-filme/ oder /neueste-filme/page/{n}/
"""
if not REQUESTS_AVAILABLE or BeautifulSoup is None:
return []
page = max(1, int(page or 1))
base = self._get_base_url()
if page == 1:
url = f"{base}/neueste-filme/"
else:
url = f"{base}/neueste-filme/page/{page}/"
try:
soup = self._get_soup(url)
except Exception:
return []
hits = self._parse_listing_titles(soup)
titles: List[str] = []
seen: set[str] = set()
for hit in hits:
if hit.title in seen:
continue
seen.add(hit.title)
self._title_to_url[hit.title] = hit.url
self._store_title_meta(hit.title, plot=hit.description, poster=hit.poster)
titles.append(hit.title)
if titles:
self._save_title_url_cache()
return titles
def new_titles(self) -> List[str]:
return self.new_titles_page(1)
# Alias für die automatische Plugin-Erkennung.


@@ -3,6 +3,7 @@
Keep common patterns in one place to avoid accidental double-escaping (e.g. "\\d").
"""
from __future__ import annotations
SEASON_EPISODE_TAG = r"S\s*(\d+)\s*E\s*(\d+)"
SEASON_EPISODE_URL = r"/staffel-(\d+)/episode-(\d+)"


@@ -6,15 +6,71 @@ zu einer abspielbaren Media-URL (inkl. evtl. Header-Suffix) aufgelöst werden.
from __future__ import annotations
import importlib
import os
import sys
from typing import Optional
_LAST_RESOLVE_ERROR = ""
def _debug_log(message: str) -> None:
line = f"[ViewIt][ResolveURL] {message}"
try:
import xbmc # type: ignore
xbmc.log(line, xbmc.LOGDEBUG)
except Exception:
return
def _append_addon_lib_path(addon_id: str) -> bool:
try:
import xbmcaddon # type: ignore
import xbmcvfs # type: ignore
addon = xbmcaddon.Addon(addon_id)
addon_path = addon.getAddonInfo("path")
lib_path = xbmcvfs.translatePath(os.path.join(addon_path, "lib"))
if lib_path and lib_path not in sys.path:
sys.path.append(lib_path)
return bool(lib_path)
except Exception:
return False
def get_last_error() -> str:
return str(_LAST_RESOLVE_ERROR or "")
def _import_resolveurl():
try:
return importlib.import_module("resolveurl")
except Exception as exc:
_debug_log(f"import resolveurl failed (direct): {exc}")
# Kodi should load transitive deps, but some runtimes miss sys.path entries.
_append_addon_lib_path("script.module.resolveurl")
_append_addon_lib_path("script.module.kodi-six")
_append_addon_lib_path("script.module.six")
try:
return importlib.import_module("resolveurl")
except Exception as exc:
_debug_log(f"import resolveurl failed (with addon lib paths): {exc}")
return None
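`_import_resolveurl`'s retry pattern, reduced to its core: try a plain import first, extend `sys.path`, then try once more. A generic sketch (not the Kodi-specific addon path lookup; the helper name is hypothetical):

```python
import importlib
import sys

def import_with_extra_paths(module_name: str, extra_paths: list):
    # First attempt relies on the existing sys.path; on failure, append the
    # extra directories and retry once. Returns None if both attempts fail.
    try:
        return importlib.import_module(module_name)
    except Exception:
        for path in extra_paths:
            if path and path not in sys.path:
                sys.path.append(path)
        try:
            return importlib.import_module(module_name)
        except Exception:
            return None
```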
def resolve(url: str) -> Optional[str]:
global _LAST_RESOLVE_ERROR
_LAST_RESOLVE_ERROR = ""
if not url:
_debug_log("resolve() skipped (empty url)")
return None
    _debug_log(f"input: {url}")
    resolveurl = _import_resolveurl()
if resolveurl is None:
_LAST_RESOLVE_ERROR = "resolveurl missing"
_debug_log("result: <none> (resolveurl missing)")
return None
try:
@@ -23,21 +79,36 @@ def resolve(url: str) -> Optional[str]:
hmf = hosted(url)
valid = getattr(hmf, "valid_url", None)
if callable(valid) and not valid():
_LAST_RESOLVE_ERROR = "invalid url"
_debug_log("result: <none> (invalid url for HostedMediaFile)")
return None
        resolver = getattr(hmf, "resolve", None)
        if callable(resolver):
            result = resolver()
            if result:
                _debug_log(f"result: {result}")
                return str(result)
_LAST_RESOLVE_ERROR = "unresolved"
_debug_log("result: <none> (HostedMediaFile unresolved)")
return None
except Exception as exc:
_LAST_RESOLVE_ERROR = str(exc or "")
_debug_log(f"HostedMediaFile error: {_LAST_RESOLVE_ERROR}")
    try:
        resolve_fn = getattr(resolveurl, "resolve", None)
        if callable(resolve_fn):
            result = resolve_fn(url)
            if result:
                _debug_log(f"result: {result}")
                return str(result)
_LAST_RESOLVE_ERROR = "unresolved"
_debug_log("result: <none> (resolve() unresolved)")
return None
except Exception as exc:
_LAST_RESOLVE_ERROR = str(exc or "")
_debug_log(f"resolve() error: {_LAST_RESOLVE_ERROR}")
return None
_debug_log("result: <none> (no resolver path)")
return None


@@ -1,6 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<settings>
<category label="Quellen">
<setting id="serienstream_base_url" type="text" label="SerienStream Basis-URL" default="https://s.to" />
<setting id="serienstream_catalog_search" type="bool" label="SerienStream: Katalog-Suche (mehr Ergebnisse, langsamer)" default="true" />
<setting id="aniworld_base_url" type="text" label="AniWorld Basis-URL" default="https://aniworld.to" />
<setting id="topstream_base_url" type="text" label="TopStream Basis-URL" default="https://topstreamfilm.live" />
<setting id="einschalten_base_url" type="text" label="Einschalten Basis-URL" default="https://einschalten.in" />
<setting id="filmpalast_base_url" type="text" label="Filmpalast Basis-URL" default="https://filmpalast.to" />
<setting id="doku_streams_base_url" type="text" label="Doku-Streams Basis-URL" default="https://doku-streams.com" />
</category>
<category label="Metadaten">
<setting id="serienstream_metadata_source" type="enum" label="SerienStream Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="aniworld_metadata_source" type="enum" label="AniWorld Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="topstreamfilm_metadata_source" type="enum" label="TopStream Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="einschalten_metadata_source" type="enum" label="Einschalten Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="filmpalast_metadata_source" type="enum" label="Filmpalast Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="doku_streams_metadata_source" type="enum" label="Doku-Streams Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="kkiste_metadata_source" type="enum" label="KKiste Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="moflix_metadata_source" type="enum" label="Moflix Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_language" type="text" label="TMDB Sprache (z. B. de-DE)" default="de-DE" />
<setting id="tmdb_show_plot" type="bool" label="TMDB Beschreibung anzeigen" default="true" />
<setting id="tmdb_show_art" type="bool" label="TMDB Poster und Vorschaubild anzeigen" default="true" />
<setting id="tmdb_show_fanart" type="bool" label="TMDB Fanart/Backdrop anzeigen" default="true" />
<setting id="tmdb_show_rating" type="bool" label="TMDB Bewertung anzeigen" default="true" />
<setting id="tmdb_show_votes" type="bool" label="TMDB Stimmen anzeigen" default="false" />
</category>
<category label="TMDB Erweitert">
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
<setting id="tmdb_prefetch_concurrency" type="number" label="TMDB: gleichzeitige Anfragen (1-20)" default="6" />
<setting id="tmdb_show_cast" type="bool" label="TMDB Besetzung anzeigen" default="false" />
<setting id="tmdb_show_episode_cast" type="bool" label="TMDB Besetzung pro Episode anzeigen" default="false" />
<setting id="tmdb_genre_metadata" type="bool" label="TMDB Daten in Genre-Listen anzeigen" default="false" />
<setting id="tmdb_log_requests" type="bool" label="TMDB API-Anfragen loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API-Antworten loggen" default="false" />
</category>
<category label="Anzeige">
<setting id="filmpalast_max_page_items" type="number" label="Filmpalast: Max. Eintraege pro Seite" default="15" />
<setting id="topstreamfilm_max_page_items" type="number" label="TopStream: Max. Eintraege pro Seite" default="15" />
<setting id="aniworld_max_page_items" type="number" label="AniWorld: Max. Eintraege pro Seite" default="15" />
<setting id="netzkino_max_page_items" type="number" label="Netzkino: Max. Eintraege pro Seite" default="15" />
<setting id="kkiste_max_page_items" type="number" label="KKiste: Max. Eintraege pro Seite" default="15" />
<setting id="hdfilme_max_page_items" type="number" label="HDFilme: Max. Eintraege pro Seite" default="15" />
<setting id="moflix_max_page_items" type="number" label="Moflix: Max. Eintraege pro Seite" default="15" />
<setting id="einschalten_max_page_items" type="number" label="Einschalten: Max. Eintraege pro Seite" default="15" />
</category>
<category label="Wiedergabe">
<setting id="autoplay_enabled" type="bool" label="Autoplay (bevorzugten Hoster automatisch waehlen)" default="false" />
<setting id="preferred_hoster" type="text" label="Bevorzugter Hoster" default="voe" />
</category>
<category label="Updates">
<setting id="update_channel" type="enum" label="Update-Kanal" default="1" values="Main|Nightly|Custom|Dev" />
<setting id="apply_update_channel" type="action" label="Update-Kanal jetzt anwenden" action="RunPlugin(plugin://plugin.video.viewit/?action=apply_update_channel)" option="close" />
<setting id="auto_update_enabled" type="bool" label="Automatische Updates (beim Start pruefen)" default="false" />
<setting id="auto_update_interval" type="enum" label="Update-Pruefintervall" default="1" values="1 Stunde|6 Stunden|24 Stunden" />
<setting id="select_update_version" type="action" label="Version waehlen und installieren" action="RunPlugin(plugin://plugin.video.viewit/?action=select_update_version)" option="close" />
<setting id="install_resolveurl" type="action" label="ResolveURL installieren/reparieren" action="RunPlugin(plugin://plugin.video.viewit/?action=install_resolveurl)" option="close" />
<setting id="resolveurl_auto_install" type="bool" label="ResolveURL automatisch installieren (beim Start pruefen)" default="true" />
<setting id="update_installed_version" type="text" label="Installierte Version" default="-" enable="false" />
<setting id="update_available_selected" type="text" label="Verfuegbar (gewaehlter Kanal)" default="-" enable="false" />
<setting id="resolveurl_status" type="text" label="ResolveURL Status" default="-" enable="false" />
<setting id="update_active_channel" type="text" label="Aktiver Kanal" default="-" enable="false" />
<setting id="update_active_repo_url" type="text" label="Aktive Repo URL" default="-" enable="false" />
<setting id="update_repo_url_main" type="text" label="Main URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/main/addons.xml" />
<setting id="update_repo_url_nightly" type="text" label="Nightly URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/nightly/addons.xml" />
<setting id="update_repo_url_dev" type="text" label="Dev URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/dev/addons.xml" />
<setting id="update_repo_url" type="text" label="Custom URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/nightly/addons.xml" />
<setting id="auto_update_last_ts" type="text" label="Auto-Update letzte Pruefung (intern)" default="0" visible="false" />
<setting id="resolveurl_last_ts" type="text" label="ResolveURL letzte Pruefung (intern)" default="0" visible="false" />
</category>
<category label="Trakt">
<setting id="trakt_enabled" type="bool" label="Trakt aktivieren" default="false" />
<setting id="trakt_client_id" type="text" label="Trakt Client ID" default="" />
<setting id="trakt_client_secret" type="text" label="Trakt Client Secret" default="" />
<setting id="trakt_auth" type="action" label="Trakt autorisieren" action="RunPlugin(plugin://plugin.video.viewit/?action=trakt_auth)" option="close" />
<setting id="trakt_scrobble" type="bool" label="Scrobbling aktivieren" default="true" />
<setting id="trakt_access_token" type="text" label="" default="" visible="false" />
<setting id="trakt_refresh_token" type="text" label="" default="" visible="false" />
<setting id="trakt_token_expires" type="text" label="" default="0" visible="false" />
</category>
<category label="Debug Global">
<setting id="debug_log_urls" type="bool" label="URLs mitschreiben (global)" default="false" />
<setting id="debug_dump_html" type="bool" label="HTML speichern (global)" default="false" />
<setting id="debug_show_url_info" type="bool" label="Aktuelle URL anzeigen (global)" default="false" />
@@ -8,83 +94,32 @@
<setting id="log_max_mb" type="number" label="URL-Log: maximale Dateigroesse (MB)" default="5" />
<setting id="log_max_files" type="number" label="URL-Log: Anzahl alter Dateien" default="3" />
<setting id="dump_max_files" type="number" label="HTML: maximale Dateien pro Plugin" default="200" />
<setting id="log_urls_serienstream" type="bool" label="Serienstream: URLs mitschreiben" default="false" />
<setting id="dump_html_serienstream" type="bool" label="Serienstream: HTML speichern" default="false" />
<setting id="show_url_info_serienstream" type="bool" label="Serienstream: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_serienstream" type="bool" label="Serienstream: Fehler mitschreiben" default="false" />
<setting id="log_urls_aniworld" type="bool" label="Aniworld: URLs mitschreiben" default="false" />
<setting id="dump_html_aniworld" type="bool" label="Aniworld: HTML speichern" default="false" />
<setting id="show_url_info_aniworld" type="bool" label="Aniworld: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_aniworld" type="bool" label="Aniworld: Fehler mitschreiben" default="false" />
<setting id="log_urls_topstreamfilm" type="bool" label="Topstreamfilm: URLs mitschreiben" default="false" />
<setting id="dump_html_topstreamfilm" type="bool" label="Topstreamfilm: HTML speichern" default="false" />
<setting id="show_url_info_topstreamfilm" type="bool" label="Topstreamfilm: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_topstreamfilm" type="bool" label="Topstreamfilm: Fehler mitschreiben" default="false" />
</category>
<category label="Debug Quellen">
<setting id="log_urls_serienstream" type="bool" label="SerienStream: URLs mitschreiben" default="false" />
<setting id="dump_html_serienstream" type="bool" label="SerienStream: HTML speichern" default="false" />
<setting id="show_url_info_serienstream" type="bool" label="SerienStream: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_serienstream" type="bool" label="SerienStream: Fehler mitschreiben" default="false" />
<setting id="log_urls_aniworld" type="bool" label="AniWorld: URLs mitschreiben" default="false" />
<setting id="dump_html_aniworld" type="bool" label="AniWorld: HTML speichern" default="false" />
<setting id="show_url_info_aniworld" type="bool" label="AniWorld: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_aniworld" type="bool" label="AniWorld: Fehler mitschreiben" default="false" />
<setting id="log_urls_topstreamfilm" type="bool" label="TopStream: URLs mitschreiben" default="false" />
<setting id="dump_html_topstreamfilm" type="bool" label="TopStream: HTML speichern" default="false" />
<setting id="show_url_info_topstreamfilm" type="bool" label="TopStream: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_topstreamfilm" type="bool" label="TopStream: Fehler mitschreiben" default="false" />
<setting id="log_urls_einschalten" type="bool" label="Einschalten: URLs mitschreiben" default="false" />
<setting id="dump_html_einschalten" type="bool" label="Einschalten: HTML speichern" default="false" />
<setting id="show_url_info_einschalten" type="bool" label="Einschalten: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_einschalten" type="bool" label="Einschalten: Fehler mitschreiben" default="false" />
<setting id="log_urls_filmpalast" type="bool" label="Filmpalast: URLs mitschreiben" default="false" />
<setting id="dump_html_filmpalast" type="bool" label="Filmpalast: HTML speichern" default="false" />
<setting id="show_url_info_filmpalast" type="bool" label="Filmpalast: Aktuelle URL anzeigen" default="false" />
<setting id="log_errors_filmpalast" type="bool" label="Filmpalast: Fehler mitschreiben" default="false" />
</category>
<category label="TopStream">
<setting id="topstream_base_url" type="text" label="Basis-URL" default="https://topstreamfilm.live" />
<setting id="topstreamfilm_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
<setting id="topstream_genre_max_pages" type="number" label="Genres: max. Seiten laden" default="20" />
</category>
<category label="SerienStream">
<setting id="serienstream_base_url" type="text" label="Basis-URL" default="https://s.to" />
<setting id="serienstream_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
</category>
<category label="AniWorld">
<setting id="aniworld_base_url" type="text" label="Basis-URL" default="https://aniworld.to" />
<setting id="aniworld_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
</category>
<category label="Einschalten">
<setting id="einschalten_base_url" type="text" label="Basis-URL" default="https://einschalten.in" />
<setting id="einschalten_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
</category>
<category label="Filmpalast">
<setting id="filmpalast_base_url" type="text" label="Basis-URL" default="https://filmpalast.to" />
<setting id="filmpalast_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
</category>
<category label="Doku-Streams">
<setting id="doku_streams_base_url" type="text" label="Basis-URL" default="https://doku-streams.com" />
<setting id="doku_streams_metadata_source" type="enum" label="Metadatenquelle" default="0" values="Automatisch|Quelle|TMDB|Mischen" />
</category>
<category label="TMDB">
<setting id="tmdb_enabled" type="bool" label="TMDB aktivieren" default="true" />
<setting id="tmdb_api_key" type="text" label="TMDB API Key" default="" />
<setting id="tmdb_language" type="text" label="TMDB Sprache (z. B. de-DE)" default="de-DE" />
<setting id="tmdb_prefetch_concurrency" type="number" label="TMDB: gleichzeitige Anfragen (1-20)" default="6" />
<setting id="tmdb_show_plot" type="bool" label="TMDB Beschreibung anzeigen" default="true" />
<setting id="tmdb_show_art" type="bool" label="TMDB Poster und Vorschaubild anzeigen" default="true" />
<setting id="tmdb_show_fanart" type="bool" label="TMDB Fanart/Backdrop anzeigen" default="true" />
<setting id="tmdb_show_rating" type="bool" label="TMDB Bewertung anzeigen" default="true" />
<setting id="tmdb_show_votes" type="bool" label="TMDB Stimmen anzeigen" default="false" />
<setting id="tmdb_show_cast" type="bool" label="TMDB Besetzung anzeigen" default="false" />
<setting id="tmdb_show_episode_cast" type="bool" label="TMDB Besetzung pro Episode anzeigen" default="false" />
<setting id="tmdb_genre_metadata" type="bool" label="TMDB Daten in Genre-Listen anzeigen" default="false" />
<setting id="tmdb_log_requests" type="bool" label="TMDB API-Anfragen loggen" default="false" />
<setting id="tmdb_log_responses" type="bool" label="TMDB API-Antworten loggen" default="false" />
</category>
<category label="Update">
<setting id="update_channel" type="enum" label="Update-Kanal" default="0" values="Main|Nightly|Custom" />
<setting id="auto_update_enabled" type="bool" label="Automatische Updates (beim Start pruefen)" default="false" />
<setting id="update_repo_url_main" type="text" label="Main URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/main/addons.xml" />
<setting id="update_repo_url_nightly" type="text" label="Nightly URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/nightly/addons.xml" />
<setting id="update_repo_url" type="text" label="Custom URL (addons.xml)" default="https://gitea.it-drui.de/viewit/ViewIT-Kodi-Repo/raw/branch/main/addons.xml" />
<setting id="auto_update_last_ts" type="text" label="Auto-Update letzte Pruefung (intern)" default="0" visible="false" />
<setting id="run_update_check" type="action" label="Jetzt nach Updates suchen" action="RunPlugin(plugin://plugin.video.viewit/?action=check_updates)" option="close" />
<setting id="update_info" type="text" label="Updates laufen ueber den normalen Kodi-Update-Mechanismus." default="" enable="false" />
<setting id="update_version_addon" type="text" label="ViewIT Version" default="-" enable="false" />
<setting id="update_version_serienstream" type="text" label="Serienstream Version" default="-" enable="false" />
<setting id="update_version_aniworld" type="text" label="Aniworld Version" default="-" enable="false" />
<setting id="update_version_einschalten" type="text" label="Einschalten Version" default="-" enable="false" />
<setting id="update_version_topstreamfilm" type="text" label="Topstreamfilm Version" default="-" enable="false" />
<setting id="update_version_filmpalast" type="text" label="Filmpalast Version" default="-" enable="false" />
<setting id="update_version_doku_streams" type="text" label="Doku-Streams Version" default="-" enable="false" />
</category>
</settings>
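The `*_metadata_source` enum settings above store the selected option as its zero-based index (matching the order `Automatisch|Quelle|TMDB|Mischen`). A minimal sketch of how a plugin might map that index to a symbolic mode; the helper name, the symbolic names `auto`/`source`/`tmdb`/`mix`, and the `get_setting` callable are illustrative stand-ins (inside Kodi the addon reads settings through its own `plugin_helpers`, roughly `xbmcaddon.Addon().getSetting`):

```python
# Index order mirrors the enum "Automatisch|Quelle|TMDB|Mischen" declared
# in settings.xml; the symbolic names are illustrative only.
METADATA_SOURCES = {"0": "auto", "1": "source", "2": "tmdb", "3": "mix"}

def metadata_source_for(plugin_name: str, get_setting) -> str:
    """Resolve a plugin's metadata-source enum to a symbolic name.

    `get_setting` is any callable that returns the raw setting string
    (a stand-in for the Kodi settings API)."""
    raw = (get_setting(f"{plugin_name}_metadata_source") or "").strip()
    return METADATA_SOURCES.get(raw, "auto")

# Stand-in settings store for demonstration:
settings = {"moflix_metadata_source": "2"}
print(metadata_source_for("moflix", settings.get))  # tmdb
print(metadata_source_for("kkiste", settings.get))  # auto (fallback)
```

Unknown or missing values fall back to the enum's default index 0.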

29
addon/search_utils.py Normal file

@@ -0,0 +1,29 @@
from __future__ import annotations

import re


def normalize_search_text(value: str) -> str:
    """Normalize text for word-based search.

    Shared behavior:
    - lower-case
    - non-alphanumerics -> spaces
    - collapse repeated whitespace
    """
    value = (value or "").casefold()
    value = re.sub(r"[^a-z0-9]+", " ", value)
    value = re.sub(r"\s+", " ", value).strip()
    return value


def matches_query(query: str, *, title: str) -> bool:
    """True if the normalized title contains the normalized query as a whole token."""
    normalized_query = normalize_search_text(query)
    if not normalized_query:
        return False
    haystack = f" {normalize_search_text(title)} "
    return f" {normalized_query} " in haystack
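The whole-token semantics of these helpers can be seen in a short example; the two functions are copied inline below only so the snippet runs standalone:

```python
import re

# Inline copies of the search_utils helpers above, for a standalone demo.
def normalize_search_text(value: str) -> str:
    value = (value or "").casefold()
    value = re.sub(r"[^a-z0-9]+", " ", value)
    return re.sub(r"\s+", " ", value).strip()

def matches_query(query: str, *, title: str) -> bool:
    normalized_query = normalize_search_text(query)
    if not normalized_query:
        return False
    return f" {normalized_query} " in f" {normalize_search_text(title)} "

# Whole-token match: "star" hits "Star Trek" but not "Starship Troopers",
# because partial-word hits inside the same word are rejected.
print(matches_query("star", title="Star Trek"))          # True
print(matches_query("star", title="Starship Troopers"))  # False
```

Padding both sides with spaces turns the substring test into a token-boundary test without a regex per query.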

View File

@@ -454,6 +454,7 @@ def lookup_movie(
@dataclass(frozen=True)
class TmdbEpisodeMeta:
title: str
plot: str
thumb: str
runtime_minutes: int
@@ -545,6 +546,7 @@ def lookup_tv_season(
continue
if not ep_number:
continue
title = (entry.get("name") or "").strip()
plot = (entry.get("overview") or "").strip()
runtime_minutes = 0
try:
@@ -553,7 +555,55 @@ def lookup_tv_season(
runtime_minutes = 0
still_path = (entry.get("still_path") or "").strip()
thumb = f"{TMDB_IMAGE_BASE}/w300{still_path}" if still_path else ""
if not plot and not thumb and not runtime_minutes:
if not title and not plot and not thumb and not runtime_minutes:
continue
result[ep_number] = TmdbEpisodeMeta(plot=plot, thumb=thumb, runtime_minutes=runtime_minutes)
result[ep_number] = TmdbEpisodeMeta(title=title, plot=plot, thumb=thumb, runtime_minutes=runtime_minutes)
return result or None
# ---------------------------------------------------------------------------
# External IDs (IMDb, TVDb) for the Trakt integration
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class TmdbExternalIds:
    imdb_id: str  # e.g. "tt1234567"
    tvdb_id: int  # TheTVDB ID


def fetch_external_ids(
    *,
    kind: str,
    tmdb_id: int,
    api_key: str,
    timeout: int = 15,
    log: Callable[[str], None] | None = None,
    log_responses: bool = False,
) -> Optional[TmdbExternalIds]:
    """Fetch the IMDb and TVDb ids via /movie/{id}/external_ids or /tv/{id}/external_ids."""
    if requests is None or not tmdb_id:
        return None
    api_key = (api_key or "").strip()
    if not api_key:
        return None
    kind = (kind or "").strip()
    if kind not in ("movie", "tv"):
        return None
    params = {"api_key": api_key}
    url = f"{TMDB_API_BASE}/{kind}/{tmdb_id}/external_ids?{urlencode(params)}"
    status, payload, body_text = _tmdb_get_json(
        url=url, timeout=timeout, log=log, log_responses=log_responses,
    )
    if callable(log):
        log(f"TMDB RESPONSE /{kind}/{{id}}/external_ids status={status}")
    if status != 200 or not isinstance(payload, dict):
        return None
    imdb_id = (payload.get("imdb_id") or "").strip()
    tvdb_id = 0
    try:
        tvdb_id = int(payload.get("tvdb_id") or 0)
    except (ValueError, TypeError):
        tvdb_id = 0
    if not imdb_id and not tvdb_id:
        return None
    return TmdbExternalIds(imdb_id=imdb_id, tvdb_id=tvdb_id)
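The defensive coercion in `fetch_external_ids` can be exercised offline against a sample payload. The helper and payload values below are illustrative only; they mirror the strip/coerce steps without touching the network:

```python
def parse_external_ids(payload: dict) -> tuple[str, int]:
    """Mirror of the coercion in fetch_external_ids: strip the IMDb id,
    coerce tvdb_id to int, and tolerate missing/None/garbage values."""
    imdb_id = (payload.get("imdb_id") or "").strip()
    try:
        tvdb_id = int(payload.get("tvdb_id") or 0)
    except (ValueError, TypeError):
        tvdb_id = 0
    return imdb_id, tvdb_id

# Sample payloads (made-up values in the shape TMDB returns):
print(parse_external_ids({"imdb_id": " tt1234567 ", "tvdb_id": "81189"}))  # ('tt1234567', 81189)
print(parse_external_ids({"imdb_id": None, "tvdb_id": "n/a"}))            # ('', 0)
```

TMDB may return `null` or absent fields here, which is why both ids fall back to empty/zero rather than raising.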

65
docs/ARCHITECTURE.md Normal file

@@ -0,0 +1,65 @@
## ViewIT architecture overview
This document summarizes the addon's core modules and their responsibilities.
It complements the detail documents `DEFAULT_ROUTER.md` and `PLUGIN_SYSTEM.md`.
### Layers and responsibilities
- **Router (`addon/default.py`)**
  - Entry point from Kodi (parses `sys.argv`).
  - Loads and manages all plugins (discovery, instantiation, error isolation).
  - Builds the Kodi menus (title, season, and episode lists, special menus).
  - Maps UI actions (`search`, `seasons`, `episodes`, `play_*`) to plugin methods.
  - Starts playback and keeps the Kodi play state consistent (resume/watched).
- **Plugin contract (`addon/plugin_interface.py`)**
  - Defines `BasisPlugin` as the central abstract base class.
  - Core API:
    - `search_titles(query, progress_callback?)`
    - `seasons_for(title)`
    - `episodes_for(title, season)`
  - Optional capabilities:
    - stream resolution (`stream_link_for`, `resolve_stream_link`)
    - metadata (`metadata_for`, `genres`, `titles_for_genre`, `popular_series`, `capabilities`).
  - Serves as the reference for all concrete providers in `addon/plugins/`.
- **Plugin helpers (`addon/plugin_helpers.py`)**
  - Access to addon settings as string/bool/int (robust, also outside Kodi).
  - Optional URL notifications in the UI (`notify_url`).
  - Structured logging helpers (`log_url`, `log_error`, `dump_response_html`) with rotation logic.
  - Normalization of special stream URLs (`normalize_resolved_stream_url`).
  - Focus: reusable infrastructure for all plugins, without mixing in their core logic.
- **HTTP session pool (`addon/http_session_pool.py`)**
  - Manages reused `requests.Session` instances per key.
  - Goal: reuse TCP connections and cookies across multiple requests.
  - Provides `get_requests_session(key, headers?)` and `close_all_sessions()`.
  - Used by plugins or helper modules that make many HTTP calls per session.
- **TMDB integration (`addon/tmdb.py`)**
  - Encapsulates all access to the TMDB API (TV shows, seasons, episodes, movies).
  - Contains data classes for cast members, shows, seasons, and movies.
  - Uses thread-local `requests.Session` pooling for parallel metadata lookups.
  - Used by the router to optionally enrich plugin metadata with TMDB data.
- **Metadata helpers (`addon/metadata_utils.py`)**
  - Computes plugin-specific setting IDs for metadata sources.
  - Decides per plugin and user setting whether the source, TMDB, or a mix is preferred.
  - Collects metadata from plugins (`collect_plugin_metadata`) and merges it with TMDB (`merge_metadata`).
  - Signals when a TMDB fallback is needed (`needs_tmdb`).
- **ResolveURL backend (`addon/resolveurl_backend.py`)**
  - Optional backend that uses `script.module.resolveurl` when installed.
  - Tries to resolve hoster links into playable media URLs.
  - Stores the last error state (`get_last_error`) for logging or user feedback.
  - Fully optional; the addon does not break when ResolveURL is missing.
- **Regex patterns (`addon/regex_patterns.py`)**
  - Central collection of reused regular expressions (season/episode tags, digits, etc.).
  - Goal: consistency and avoidance of error-prone copy/paste in plugins.
- **Plugins (`addon/plugins/*.py`)**
  - Concrete integrations for individual providers (e.g. series/movie portals).
  - Implement `BasisPlugin` and, optionally, additional capabilities.
  - Use the helper and infrastructure modules described above.

View File

@@ -10,20 +10,26 @@ Diese Datei zeigt, wie Plugins im Projekt aufgebaut sind und wie sie mit dem Rou
## Required methods
Every plugin implements:
- `async search_titles(query: str) -> list[str]`
- `async search_titles(query: str, progress_callback: Callable[[str, Optional[int]], Any] | None = None) -> list[str]`
- `seasons_for(title: str) -> list[str]`
- `episodes_for(title: str, season: str) -> list[str]`
## Important optional methods
- `capabilities()`
- `genres()`
- `popular_series()`
- `latest_episodes(page: int = 1)`
- `titles_for_genre(genre: str)`
- `titles_for_genre_page(genre: str, page: int)`
- `titles_for_genre_group_page(...)` / `genre_has_more(...)` (paging / alphabet groups)
- `stream_link_for(...)`
- `resolve_stream_link(...)`
- `metadata_for(...)`
- `stream_link_for_url(...)`
- `available_hosters_for(...)`
- `available_hosters_for_url(...)`
- `episode_url_for(...)`
- `series_url_for_title(...)`
- `remember_series_url(...)`
- `episode_url_for(...)`
- `available_hosters_for_url(...)`
- `stream_link_for_url(...)`
- `metadata_for(...)`
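A minimal provider following this contract might look like the skeleton below. It is illustrative only: the authoritative signatures live in `addon/plugin_interface.py` (`BasisPlugin`), and the in-memory catalog stands in for real provider scraping:

```python
import asyncio

class ExamplePlugin:
    """Illustrative skeleton; method names follow the contract above."""
    name = "example"

    async def search_titles(self, query, progress_callback=None):
        # A real provider would query its site here.
        if progress_callback:
            progress_callback("searching", 50)
        catalog = ["Example Show", "Another Title"]
        return [t for t in catalog if query.casefold() in t.casefold()]

    def seasons_for(self, title):
        return ["Staffel 1"]

    def episodes_for(self, title, season):
        return ["Episode 1", "Episode 2"]

plugin = ExamplePlugin()
print(asyncio.run(plugin.search_titles("example")))  # ['Example Show']
```

The router drives these three methods in order: search, then seasons for the chosen title, then episodes for the chosen season.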
## Film Provider Standard
If no real seasons exist:
@@ -47,6 +53,12 @@ Aktuelle Regeln fuer Suchtreffer:
- No partial-word matches inside the same word
- Do not use descriptions for matching
See for reference:
- `addon/plugins/_template_plugin.py` (Minimal-Template)
- `addon/plugins/serienstream_plugin.py`
- `addon/plugins/aniworld_plugin.py`
- `addon/plugins/topstreamfilm_plugin.py`
## Settings
Each plugin usually has `*_base_url`.
Examples:

View File

@@ -1,17 +1,21 @@
# Release Flow (Main + Nightly)
# Release Flow (Main + Nightly + Dev)
This project uses two release channels:
This project uses three release channels:
- `dev`: playground for experiments
- `nightly`: integration and test channel
- `main`: stable channel
## Rules
- Feature work goes to `nightly` only.
- Experimental work goes to `dev`.
- Feature work for release goes to `nightly`.
- Promote from `nightly` to `main` with `--squash` only.
- `main` version has no suffix (`0.1.60`).
- `nightly` version uses `-nightly` and is always at least one patch higher than `main` (`0.1.61-nightly`).
- `dev` version uses `-dev` (`0.1.62-dev`).
- Keep changelogs split:
- `CHANGELOG-DEV.md`
- `CHANGELOG-NIGHTLY.md`
- `CHANGELOG.md`
@@ -40,5 +44,6 @@ Then:
## Local ZIPs (separated)
- Dev ZIP output: `dist/local_zips/dev/`
- Main ZIP output: `dist/local_zips/main/`
- Nightly ZIP output: `dist/local_zips/nightly/`

View File

@@ -18,3 +18,16 @@ omit = [
[tool.coverage.report]
show_missing = true
skip_empty = true
[tool.ruff]
line-length = 120
target-version = "py311"
extend-exclude = ["dist", ".venv"]
[tool.ruff.lint]
select = ["E", "F", "W"]
ignore = ["E501"]
[tool.black]
line-length = 120
target-version = ["py311"]

View File

@@ -1,11 +1,16 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.0" provider-name="ViewIT">
<addon id="repository.viewit" name="ViewIT Repository" version="1.0.1" provider-name="ViewIT">
<extension point="xbmc.addon.repository" name="ViewIT Repository">
<dir>
<info compressed="false">http://127.0.0.1:8080/repo/addons.xml</info>
<checksum>http://127.0.0.1:8080/repo/addons.xml.md5</checksum>
<datadir zip="true">http://127.0.0.1:8080/repo/</datadir>
</dir>
<dir>
<info compressed="false">https://raw.githubusercontent.com/Gujal00/smrzips/master/addons.xml</info>
<checksum>https://raw.githubusercontent.com/Gujal00/smrzips/master/addons.xml.md5</checksum>
<datadir zip="true">https://raw.githubusercontent.com/Gujal00/smrzips/master/zips/</datadir>
</dir>
</extension>
<extension point="xbmc.addon.metadata">
<summary lang="de_DE">Lokales Repository fuer ViewIT Updates</summary>

View File

@@ -1,2 +1,4 @@
pytest>=9,<10
pytest-cov>=5,<8
ruff>=0.8,<0.9
black>=24.0,<25.0

View File

@@ -39,4 +39,18 @@ else
find "${DEST_DIR}" -type f -name '*.pyc' -delete || true
fi
# Also deploy to ~/.kodi/addons/ if present
KODI_ADDON_DIR="${HOME}/.kodi/addons/${ADDON_ID}"
if [[ -d "${HOME}/.kodi/addons" ]]; then
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete \
--exclude '__pycache__/' \
--exclude '*.pyc' \
"${DEST_DIR}/" "${KODI_ADDON_DIR}/"
else
rm -rf "${KODI_ADDON_DIR}"
cp -a "${DEST_DIR}" "${KODI_ADDON_DIR}"
fi
fi
echo "${DEST_DIR}"

View File

@@ -118,6 +118,8 @@ md5 = hashlib.md5(addons_xml.read_bytes()).hexdigest()
md5_file.write_text(md5, encoding="ascii")
PY
python3 "${ROOT_DIR}/scripts/verify_repo_artifacts.py" "${REPO_DIR}" >/dev/null
echo "Repo built:"
echo " ${REPO_DIR}/addons.xml"
echo " ${REPO_DIR}/addons.xml.md5"

33
scripts/hooks/commit-msg Executable file

@@ -0,0 +1,33 @@
#!/bin/bash
# commit-msg: update the version in the commit message and prepend a changelog entry (dev branch only)
branch=$(git symbolic-ref --short HEAD 2>/dev/null)
[[ "$branch" != "dev" ]] && exit 0
root=$(git rev-parse --show-toplevel)
cd "$root"
# Current version from addon.xml (already incremented by the pre-commit hook)
version=$(grep -oP 'version="\K[0-9]+\.[0-9]+\.[0-9]+(\.[0-9]+)?[^"]*' addon/addon.xml | head -1)
# Commit message: replace old version numbers with the current one
msg=$(cat "$1")
updated_msg=$(echo "$msg" | sed -E "s/bump to [0-9]+\.[0-9]+\.[0-9]+(\.[0-9]+)?[^ ]*/bump to ${version}/g")
echo "$updated_msg" > "$1"
today=$(date +%Y-%m-%d)
# Build the changelog entry
{
echo "## ${version} - ${today}"
echo ""
while IFS= read -r line; do
[[ -z "$line" ]] && continue
echo "- ${line}"
done <<< "$updated_msg"
echo ""
cat CHANGELOG-DEV.md
} > /tmp/changelog_new.md
mv /tmp/changelog_new.md CHANGELOG-DEV.md
git add CHANGELOG-DEV.md

24
scripts/hooks/post-commit Executable file

@@ -0,0 +1,24 @@
#!/bin/bash
# post-commit: build the ZIP, push, publish a Gitea release (dev branch only)
branch=$(git symbolic-ref --short HEAD 2>/dev/null)
[[ "$branch" != "dev" ]] && exit 0
root=$(git rev-parse --show-toplevel)
cd "$root"
# Build the ZIP
echo "[hook] Baue ZIP..."
bash scripts/build_kodi_zip.sh
# Push
echo "[hook] Push origin dev..."
git push origin dev
# Gitea Release
if [[ -n "$GITEA_TOKEN" ]]; then
echo "[hook] Veröffentliche Gitea-Release..."
bash scripts/publish_gitea_release.sh
else
echo "[hook] GITEA_TOKEN nicht gesetzt - Gitea-Release übersprungen"
fi

54
scripts/hooks/pre-commit Executable file

@@ -0,0 +1,54 @@
#!/bin/bash
# pre-commit: automatically bump the patch version in addon.xml (dev branch only)
# Supports 3-part (x.x.xx) and 4-part (x.x.xx.x) version numbers
# 4-part: bumps the 4th part in steps of 5 (e.g. 0.1.75.5 → 0.1.76.0)
branch=$(git symbolic-ref --short HEAD 2>/dev/null)
[[ "$branch" != "dev" ]] && exit 0
root=$(git rev-parse --show-toplevel)
cd "$root"
# Read the version from addon.xml (3 or 4 parts, with optional suffix)
current=$(grep -oP 'version="\K[0-9]+\.[0-9]+\.[0-9]+(\.[0-9]+)?[^"]*' addon/addon.xml | head -1)
if [[ -z "$current" ]]; then
echo "[hook] Fehler: Version nicht gefunden in addon/addon.xml" >&2
exit 1
fi
# Extract the suffix (e.g. -dev)
suffix=$(echo "$current" | grep -oP '[-][a-zA-Z].*' || true)
version_only=$(echo "$current" | sed "s/${suffix}$//")
# Count the parts
IFS='.' read -ra parts <<< "$version_only"
num_parts=${#parts[@]}
if [[ $num_parts -eq 4 ]]; then
# 4-part: raise the 4th part by 5; at >= 10 carry into the 3rd part
major=${parts[0]}
minor=${parts[1]}
patch=${parts[2]}
sub=${parts[3]}
new_sub=$((sub + 5))
if [[ $new_sub -ge 10 ]]; then
new_sub=$((new_sub - 10))
patch=$((patch + 1))
fi
new_version="${major}.${minor}.${patch}.${new_sub}${suffix}"
elif [[ $num_parts -eq 3 ]]; then
# 3-part: bump the patch
major=${parts[0]}
minor=${parts[1]}
patch=${parts[2]}
new_version="${major}.${minor}.$((patch + 1))${suffix}"
else
echo "[hook] Fehler: Unerwartetes Versionsformat: $current" >&2
exit 1
fi
# Update addon.xml
sed -i "s/version=\"${current}\"/version=\"${new_version}\"/" addon/addon.xml
git add addon/addon.xml
echo "[hook] Version: $current → $new_version"
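The bump rule above can be mirrored in a few lines of Python for illustration and offline testing. This sketch follows the hook's logic, including its implicit assumption that the 4th part stays below 10 before the step:

```python
import re

def bump_dev_version(current: str) -> str:
    """Mirror of the bash logic above: 3-part versions bump the patch;
    4-part versions bump the 4th part in steps of 5 and carry into the
    3rd part at >= 10 (0.1.75.5 -> 0.1.76.0)."""
    m = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)(?:\.(\d+))?(-[A-Za-z].*)?", current)
    if not m:
        raise ValueError(f"unexpected version format: {current}")
    major, minor, patch = int(m[1]), int(m[2]), int(m[3])
    suffix = m[5] or ""
    if m[4] is None:
        return f"{major}.{minor}.{patch + 1}{suffix}"
    sub = int(m[4]) + 5
    if sub >= 10:
        sub -= 10
        patch += 1
    return f"{major}.{minor}.{patch}.{sub}{suffix}"

print(bump_dev_version("0.1.75.5-dev"))  # 0.1.76.0-dev
print(bump_dev_version("0.1.71-dev"))    # 0.1.72-dev
```

Keeping suffix handling in one regex avoids the separate grep/sed round-trips the hook needs in bash.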

14
scripts/install_hooks.sh Normal file

@@ -0,0 +1,14 @@
#!/bin/bash
# Installs the Git hooks for the dev workflow as symlinks
root=$(git rev-parse --show-toplevel)
hooks_src="$root/scripts/hooks"
hooks_dst="$root/.git/hooks"
for hook in pre-commit commit-msg post-commit; do
chmod +x "$hooks_src/$hook"
ln -sf "$hooks_src/$hook" "$hooks_dst/$hook"
echo "Installiert: $hook"
done
echo "Alle Hooks aktiv."

View File

@@ -128,14 +128,27 @@ print(json.load(open(sys.argv[1], encoding="utf-8"))["id"])
PY
)"
elif [[ "${http_code}" == "404" ]]; then
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" <<'PY'
# Derive branch and prerelease from the version
read -r TARGET_BRANCH IS_PRERELEASE < <(python3 - "${ADDON_VERSION}" <<'PY'
import sys
v = sys.argv[1]
if "-dev" in v:
    print("dev", "true")
elif "-nightly" in v:
    print("nightly", "true")
else:
    print("main", "false")
PY
)
payload="$(python3 - "${TAG}" "${TITLE}" "${NOTES}" "${TARGET_BRANCH}" "${IS_PRERELEASE}" <<'PY'
import json,sys
print(json.dumps({
"tag_name": sys.argv[1],
"name": sys.argv[2],
"body": sys.argv[3],
"target_commitish": sys.argv[4],
"draft": False,
"prerelease": False
"prerelease": sys.argv[5] == "true"
}))
PY
)"

147
scripts/verify_repo_artifacts.py Executable file

@@ -0,0 +1,147 @@
#!/usr/bin/env python3
"""Validate Kodi repository artifacts for ViewIT.
Usage:
verify_repo_artifacts.py <repo_dir> [--expect-branch <branch>]
"""
from __future__ import annotations
import argparse
import hashlib
import sys
import xml.etree.ElementTree as ET
import zipfile
from pathlib import Path
PLUGIN_ID = "plugin.video.viewit"
REPO_ID = "repository.viewit"
def _find_addon(root: ET.Element, addon_id: str) -> ET.Element:
    if root.tag == "addon" and (root.attrib.get("id") or "") == addon_id:
        return root
    for addon in root.findall("addon"):
        if (addon.attrib.get("id") or "") == addon_id:
            return addon
    raise ValueError(f"addon {addon_id} not found in addons.xml")


def _read_zip_addon_version(zip_path: Path, addon_id: str) -> str:
    inner_path = f"{addon_id}/addon.xml"
    with zipfile.ZipFile(zip_path, "r") as archive:
        try:
            data = archive.read(inner_path)
        except KeyError as exc:
            raise ValueError(f"{zip_path.name}: missing {inner_path}") from exc
    root = ET.fromstring(data.decode("utf-8", errors="replace"))
    version = (root.attrib.get("version") or "").strip()
    if not version:
        raise ValueError(f"{zip_path.name}: addon.xml without version")
    return version


def _check_md5(repo_dir: Path) -> list[str]:
    errors: list[str] = []
    addons_xml = repo_dir / "addons.xml"
    md5_file = repo_dir / "addons.xml.md5"
    if not addons_xml.exists() or not md5_file.exists():
        return errors
    expected = md5_file.read_text(encoding="ascii", errors="ignore").strip().lower()
    actual = hashlib.md5(addons_xml.read_bytes()).hexdigest()
    if expected != actual:
        errors.append("addons.xml.md5 does not match addons.xml")
    return errors


def _check_repo_zip_branch(zip_path: Path, expected_branch: str) -> list[str]:
    errors: list[str] = []
    inner_path = f"{REPO_ID}/addon.xml"
    with zipfile.ZipFile(zip_path, "r") as archive:
        try:
            data = archive.read(inner_path)
        except KeyError as exc:
            raise ValueError(f"{zip_path.name}: missing {inner_path}") from exc
    root = ET.fromstring(data.decode("utf-8", errors="replace"))
    info = root.find(".//dir/info")
    if info is None or not (info.text or "").strip():
        errors.append(f"{zip_path.name}: missing repository info URL")
        return errors
    info_url = (info.text or "").strip()
    marker = f"/branch/{expected_branch}/addons.xml"
    if marker not in info_url:
        errors.append(f"{zip_path.name}: info URL does not point to branch '{expected_branch}'")
    return errors


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("repo_dir", help="Path to repository root (contains addons.xml)")
    parser.add_argument("--expect-branch", default="", help="Expected branch in repository.viewit addon.xml URL")
    args = parser.parse_args()
    repo_dir = Path(args.repo_dir).resolve()
    addons_xml = repo_dir / "addons.xml"
    if not addons_xml.exists():
        print(f"Missing: {addons_xml}", file=sys.stderr)
        return 2
    errors: list[str] = []
    try:
        root = ET.parse(addons_xml).getroot()
        plugin_node = _find_addon(root, PLUGIN_ID)
        repo_node = _find_addon(root, REPO_ID)
    except Exception as exc:
        print(f"Invalid addons.xml: {exc}", file=sys.stderr)
        return 2
    plugin_version = (plugin_node.attrib.get("version") or "").strip()
    repo_version = (repo_node.attrib.get("version") or "").strip()
    if not plugin_version:
        errors.append("plugin.video.viewit has no version in addons.xml")
    if not repo_version:
        errors.append("repository.viewit has no version in addons.xml")
    plugin_zip = repo_dir / PLUGIN_ID / f"{PLUGIN_ID}-{plugin_version}.zip"
    repo_zip = repo_dir / REPO_ID / f"{REPO_ID}-{repo_version}.zip"
    if not plugin_zip.exists():
        errors.append(f"Missing plugin zip: {plugin_zip}")
    if not repo_zip.exists():
        errors.append(f"Missing repository zip: {repo_zip}")
    if plugin_zip.exists():
        try:
            zip_version = _read_zip_addon_version(plugin_zip, PLUGIN_ID)
if zip_version != plugin_version:
errors.append(
f"{plugin_zip.name}: version mismatch (zip={zip_version}, addons.xml={plugin_version})"
)
except Exception as exc:
errors.append(str(exc))
if repo_zip.exists():
try:
zip_version = _read_zip_addon_version(repo_zip, REPO_ID)
if zip_version != repo_version:
errors.append(f"{repo_zip.name}: version mismatch (zip={zip_version}, addons.xml={repo_version})")
if args.expect_branch:
errors.extend(_check_repo_zip_branch(repo_zip, args.expect_branch))
except Exception as exc:
errors.append(str(exc))
errors.extend(_check_md5(repo_dir))
if errors:
print("Repository validation failed:")
for line in errors:
print(f"- {line}")
return 1
print("Repository validation passed.")
print(f"- plugin: {plugin_version}")
print(f"- repository: {repo_version}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
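`_check_md5` above expects `addons.xml.md5` to hold the lowercase hex MD5 digest of `addons.xml`. A companion sketch for (re)generating that file, assuming the same repository layout, could look like this (the helper name is illustrative, not part of the script):

```python
import hashlib
from pathlib import Path

def write_addons_md5(repo_dir: str) -> str:
    """Write addons.xml.md5 next to addons.xml and return the digest."""
    repo = Path(repo_dir)
    digest = hashlib.md5((repo / "addons.xml").read_bytes()).hexdigest()
    (repo / "addons.xml.md5").write_text(digest, encoding="ascii")
    return digest
```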

tests/README_LOCAL.md Normal file

@@ -0,0 +1,3 @@
These tests are local (do not commit them). Run with:
pytest -q

tests/conftest.py Normal file

@@ -0,0 +1,10 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
ADDON = ROOT / "addon"
for path in (ROOT, ADDON):
value = str(path)
if value not in sys.path:
sys.path.insert(0, value)


@@ -0,0 +1,30 @@
import re
import shutil
import subprocess
import pytest
@pytest.mark.live
def test_dokustreams_embed_url_via_curl():
if shutil.which('curl') is None:
pytest.skip('curl not available')
url = 'https://doku-streams.com/verbrechen/deutsche-im-knast-japan-und-die-disziplin/'
result = subprocess.run(
['curl', '-L', '-s', '--compressed', url],
check=False,
capture_output=True,
text=False,
)
assert result.returncode == 0
html = result.stdout.decode('utf-8', errors='ignore')
assert html
iframe_match = re.search(r'<iframe[^>]+src="([^"]+)"', html, re.IGNORECASE)
if iframe_match is None:
iframe_match = re.search(r'"embedUrl"\s*:\s*"([^"]+)"', html)
assert iframe_match is not None
src = iframe_match.group(1)
assert 'youtube' in src or 'vimeo' in src


@@ -0,0 +1,41 @@
from bs4 import BeautifulSoup
from addon.plugins import filmpalast_plugin as fp
def _soup(html: str):
return BeautifulSoup(html, "html.parser")
def test_genres_parse_sidebar(monkeypatch):
html = """
<aside>
<section id="genre">
<ul>
<li><a href="https://filmpalast.to/search/genre/Action">Action</a></li>
<li><a href="https://filmpalast.to/search/genre/Drama">Drama</a></li>
</ul>
</section>
</aside>
"""
monkeypatch.setattr(fp, "_get_soup", lambda *args, **kwargs: _soup(html))
plugin = fp.FilmpalastPlugin()
genres = plugin.genres()
assert genres == ["Action", "Drama"]
def test_titles_for_genre_page_parsing(monkeypatch):
html = """
<article class="liste"><h2><a href="//filmpalast.to/stream/test-film">Test Film</a></h2></article>
<article class="liste"><h2><a href="//filmpalast.to/stream/test-show-s01e01">Test Show S01E01 Pilot</a></h2></article>
<article class="liste"><h2><a href="//filmpalast.to/stream/test-show-s01e02">Test Show S01E02 Folge 2</a></h2></article>
"""
plugin = fp.FilmpalastPlugin()
plugin._genre_to_url = {"Action": "https://filmpalast.to/search/genre/Action"}
monkeypatch.setattr(fp, "_get_soup", lambda *args, **kwargs: _soup(html))
titles = plugin.titles_for_genre_page("Action", 1)
assert titles == ["Test Film", "Test Show"]
assert plugin.seasons_for("Test Show") == ["Staffel 1"]


@@ -0,0 +1,45 @@
import asyncio
from addon.plugins.filmpalast_plugin import FilmpalastPlugin, SearchHit
def _fake_hits(_query: str) -> list[SearchHit]:
return [
SearchHit(title="Star Trek S01E01 Pilot", url="https://filmpalast.to/stream/star-trek-s01e01"),
SearchHit(title="Star Trek S01E02 Zweiter Kontakt", url="https://filmpalast.to/stream/star-trek-s01e02"),
SearchHit(title="Ein Hund namens Palma", url="https://filmpalast.to/stream/ein-hund-namens-palma"),
]
def test_search_groups_series_and_movies(monkeypatch):
plugin = FilmpalastPlugin()
monkeypatch.setattr(plugin, "_search_hits", _fake_hits)
titles = asyncio.run(plugin.search_titles("trek"))
assert titles == ["Ein Hund namens Palma", "Star Trek"]
def test_series_seasons_and_episodes(monkeypatch):
plugin = FilmpalastPlugin()
monkeypatch.setattr(plugin, "_search_hits", _fake_hits)
asyncio.run(plugin.search_titles("trek"))
assert plugin.is_movie("Star Trek") is False
assert plugin.seasons_for("Star Trek") == ["Staffel 1"]
assert plugin.episodes_for("Star Trek", "Staffel 1") == [
"Episode 1 - Pilot",
"Episode 2 - Zweiter Kontakt",
]
def test_movie_path_stays_unchanged(monkeypatch):
plugin = FilmpalastPlugin()
monkeypatch.setattr(plugin, "_search_hits", _fake_hits)
asyncio.run(plugin.search_titles("hund"))
assert plugin.is_movie("Ein Hund namens Palma") is True
assert plugin.seasons_for("Ein Hund namens Palma") == ["Film"]
assert plugin.episodes_for("Ein Hund namens Palma", "Film") == ["Stream"]

tests/test_moflix_plugin.py Normal file

@@ -0,0 +1,717 @@
"""Tests für das Moflix-Stream-Plugin.
Mockt _get_json() auf Instance-Ebene um reale HTTP-Requests zu vermeiden.
Testet u.a. den Cross-Invocation-Cache-Miss-Bug (leere Instanz ohne Vorsuche).
"""
import asyncio
from addon.plugins.moflix_plugin import MoflixPlugin, GENRE_SLUGS, COLLECTION_SLUGS, _unpack_packer
# ---------------------------------------------------------------------------
# JSON fixtures (realistic Moflix API responses)
# ---------------------------------------------------------------------------
SEARCH_RESPONSE = {
"results": [
{
"id": "123",
"name": "Breaking Bad",
"is_series": True,
"description": "Chemie-Lehrer wird Drogenboss.",
"poster": "https://cdn.example.com/bb.jpg",
"backdrop": "https://cdn.example.com/bb-bg.jpg",
"model_type": "title",
},
{
"id": "456",
"name": "Inception",
"is_series": False,
"description": "Ein Traum im Traum.",
"poster": "https://cdn.example.com/inc.jpg",
"backdrop": "https://cdn.example.com/inc-bg.jpg",
"model_type": "title",
},
# the person entry should be skipped
{"id": "789", "name": "Christopher Nolan", "model_type": "person"},
]
}
TITLE_RESPONSE_SERIES = {
"title": {
"id": "123",
"name": "Breaking Bad",
"description": "Chemie-Lehrer wird Drogenboss.",
"poster": "https://cdn.example.com/bb.jpg",
"backdrop": "https://cdn.example.com/bb-bg.jpg",
"rating": 9.5,
"release_date": "2008-01-20",
},
"seasons": {
"data": [
{"number": 2, "title_id": "1002"}, # absichtlich unsortiert
{"number": 1, "title_id": "1001"},
]
},
}
TITLE_RESPONSE_MOVIE = {
"title": {
"id": "456",
"name": "Inception",
"description": "Ein Traum im Traum.",
"poster": "https://cdn.example.com/inc.jpg",
"backdrop": "https://cdn.example.com/inc-bg.jpg",
"rating": 8.8,
"release_date": "2010-07-15",
"videos": [
# gupload.xyz is skipped (_VIDEO_SKIP_DOMAINS)
{"quality": "1080p", "src": "https://gupload.xyz/data/e/deadbeef", "name": "Mirror 1"},
# vidara.to is preferred
{"quality": "1080p", "src": "https://vidara.to/e/inc7testXYZ", "name": "Mirror 2"},
],
},
"seasons": {"data": []},
}
EPISODES_RESPONSE = {
"pagination": {
"data": [
{"episode_number": 1, "name": "Pilot", "primary_video": {"id": 1}},
{"episode_number": 2, "name": "Cat's in the Bag", "primary_video": {"id": 2}},
# primary_video=None → skip
{"episode_number": 3, "name": "Kein Video", "primary_video": None},
]
}
}
# Episode detail response (for stream_link_for; contains videos[] with src URLs)
EPISODE_DETAIL_RESPONSE = {
"episode": {
"videos": [
# gupload.xyz is skipped
{"quality": "1080p", "src": "https://gupload.xyz/data/e/ep1hash", "name": "Mirror 1"},
# vidara.to is preferred → this src is returned
{"quality": "1080p", "src": "https://vidara.to/e/ep1vidara", "name": "Mirror 2"},
# YouTube → always skip
{"quality": None, "src": "https://youtube.com/watch?v=abc", "name": "Trailer"},
]
}
}
VIDARA_STREAM_RESPONSE = {
"filecode": "ep1vidara",
"streaming_url": "https://cdn.example.com/hls/ep1/master.m3u8",
"subtitles": None,
"thumbnail": "https://cdn.example.com/thumb.jpg",
"title": "",
}
# Minimal HTML with p.a.c.k.e.r.-obfuscated JS (VidHide format).
# The packed string encodes:
# var links={"hls2":"https://cdn.example.com/hls/test/master.m3u8"};
# jwplayer("vplayer").setup({sources:[{file:links.hls2,type:"hls"}]});
# with base=36 and keywords: var|links|hls2|jwplayer|vplayer|setup|sources|file|type
VIDHIDE_HTML = (
"<html><body><script>"
"eval(function(p,a,c,k,e,d){"
"e=function(c){return c.toString(36)};"
"if(!''.replace(/^/,String)){while(c--){d[c.toString(a)]=k[c]||c.toString(a)}"
"k=[function(e){return d[e]}];e=function(){return'\\\\w+'};c=1};"
"while(c--){if(k[c]){p=p.replace(new RegExp('\\\\b'+e(c)+'\\\\b','g'),k[c])}};"
"return p}"
"('0 1={\"2\":\"https://cdn.example.com/hls/test/master.m3u8\"};3(\"4\").5({6:[{7:1.2,8:\"hls\"}]});',"
"36,9,'var|links|hls2|jwplayer|vplayer|setup|sources|file|type'.split('|'),0,0))"
"</script></body></html>"
)
CHANNEL_RESPONSE = {
"channel": {
"content": {
"data": [
{
"id": "100",
"name": "Squid Game",
"is_series": True,
"description": "Spiele.",
"poster": "https://cdn.example.com/sq.jpg",
"backdrop": "",
},
{
"id": "200",
"name": "The Crown",
"is_series": True,
"description": "",
"poster": "",
"backdrop": "",
},
]
}
}
}
# ---------------------------------------------------------------------------
# Helper: URL-based mock routing
# ---------------------------------------------------------------------------
def make_json_router(**routes):
"""Erzeugt eine _get_json-Mock, die URL-abhängig antwortet.
Schlüssel = Substring der URL, Wert = zurückzugebende JSON-Daten.
Reihenfolge: spezifischere Schlüssel zuerst übergeben (dict-Reihenfolge).
"""
def _router(url, headers=None):
for key, response in routes.items():
if key in url:
return response
return None
return _router
# ---------------------------------------------------------------------------
# Tests: search_titles
# ---------------------------------------------------------------------------
def test_search_titles_returns_names(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE)
titles = asyncio.run(plugin.search_titles("breaking"))
assert "Breaking Bad" in titles
assert "Inception" in titles
# the person entry must not appear
assert "Christopher Nolan" not in titles
def test_search_populates_cache(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE)
asyncio.run(plugin.search_titles("breaking"))
# URL cache
assert "Breaking Bad" in plugin._title_to_url
assert "/api/v1/titles/123" in plugin._title_to_url["Breaking Bad"]
# is_series cache
assert plugin._is_series["Breaking Bad"] is True
assert plugin._is_series["Inception"] is False
# metadata cache
assert plugin._title_meta["Breaking Bad"][0] == "Chemie-Lehrer wird Drogenboss."
assert plugin._title_meta["Inception"][1] == "https://cdn.example.com/inc.jpg"
def test_search_empty_query_returns_empty():
plugin = MoflixPlugin()
titles = asyncio.run(plugin.search_titles(""))
assert titles == []
# ---------------------------------------------------------------------------
# Tests: seasons_for
# ---------------------------------------------------------------------------
def test_seasons_for_series_after_search(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_SERIES,
))
asyncio.run(plugin.search_titles("breaking"))
seasons = plugin.seasons_for("Breaking Bad")
# seasons sorted correctly
assert seasons == ["Staffel 1", "Staffel 2"]
def test_seasons_for_film_returns_film(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE)
asyncio.run(plugin.search_titles("inception"))
seasons = plugin.seasons_for("Inception")
assert seasons == ["Film"]
def test_seasons_for_caches_season_api_ids(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_SERIES,
))
asyncio.run(plugin.search_titles("breaking"))
plugin.seasons_for("Breaking Bad")
assert plugin._season_api_ids[("Breaking Bad", 1)] == "1001"
assert plugin._season_api_ids[("Breaking Bad", 2)] == "1002"
def test_seasons_for_cache_miss_triggers_resolve(monkeypatch):
"""Bug-Regression: seasons_for() ohne Vorsuche (leere Instanz = Kodi-Neuaufruf).
_resolve_title() muss automatisch eine Suche starten und den Cache befüllen.
"""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_SERIES,
))
# NO asyncio.run(search_titles(...)) here: simulates a fresh instance
seasons = plugin.seasons_for("Breaking Bad")
assert seasons == ["Staffel 1", "Staffel 2"]
def test_seasons_for_unknown_title_returns_empty(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: {"results": []})
seasons = plugin.seasons_for("Unbekannter Titel XYZ")
assert seasons == []
# ---------------------------------------------------------------------------
# Tests: episodes_for
# ---------------------------------------------------------------------------
def test_episodes_for_series(monkeypatch):
plugin = MoflixPlugin()
# "/titles/123" matcht nur die Titel-Detail-URL (id=123), nicht die Episoden-URL (id=1001)
monkeypatch.setattr(plugin, "_get_json", make_json_router(
**{"search": SEARCH_RESPONSE, "/titles/123": TITLE_RESPONSE_SERIES, "episodes": EPISODES_RESPONSE}
))
asyncio.run(plugin.search_titles("breaking"))
plugin.seasons_for("Breaking Bad")
episodes = plugin.episodes_for("Breaking Bad", "Staffel 1")
assert episodes == ["Episode 1 Pilot", "Episode 2 Cat's in the Bag"]
# the episode without primary_video (no. 3) must not be included
assert len(episodes) == 2
def test_episodes_for_film_returns_title():
plugin = MoflixPlugin()
result = plugin.episodes_for("Inception", "Film")
assert result == ["Inception"]
def test_episodes_cache_hit(monkeypatch):
"""Zweiter episodes_for()-Aufruf darf keine neuen _get_json-Calls auslösen."""
call_count = {"n": 0}
def counting_router(url, headers=None):
call_count["n"] += 1
return make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_SERIES,
episodes=EPISODES_RESPONSE,
)(url)
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", counting_router)
asyncio.run(plugin.search_titles("breaking"))
plugin.seasons_for("Breaking Bad")
plugin.episodes_for("Breaking Bad", "Staffel 1")
calls_after_first = call_count["n"]
# second call: no new HTTP call
plugin.episodes_for("Breaking Bad", "Staffel 1")
assert call_count["n"] == calls_after_first
# ---------------------------------------------------------------------------
# Tests: stream_link_for
# ---------------------------------------------------------------------------
def test_stream_link_for_episode_returns_vidara_src(monkeypatch):
"""stream_link_for() für Episode gibt vidara.to-URL aus episode.videos[] zurück."""
plugin = MoflixPlugin()
# ordering: more specific keys first
# "episodes/1" matches the detail URL .../episodes/1?...
# "episodes" matches the list URL .../episodes?...
monkeypatch.setattr(plugin, "_get_json", make_json_router(
**{
"search": SEARCH_RESPONSE,
"/titles/123": TITLE_RESPONSE_SERIES,
"episodes/1": EPISODE_DETAIL_RESPONSE,
"episodes": EPISODES_RESPONSE,
}
))
asyncio.run(plugin.search_titles("breaking"))
plugin.seasons_for("Breaking Bad")
plugin.episodes_for("Breaking Bad", "Staffel 1")
link = plugin.stream_link_for("Breaking Bad", "Staffel 1", "Episode 1 Pilot")
# gupload.xyz is skipped, vidara.to preferred
assert link == "https://vidara.to/e/ep1vidara"
def test_stream_link_for_episode_cache_miss(monkeypatch):
"""stream_link_for() funktioniert auch ohne Vorsuche (leere Instanz)."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
**{
"search": SEARCH_RESPONSE,
"/titles/123": TITLE_RESPONSE_SERIES,
"episodes/1": EPISODE_DETAIL_RESPONSE,
"episodes": EPISODES_RESPONSE,
}
))
link = plugin.stream_link_for("Breaking Bad", "Staffel 1", "Episode 1 Pilot")
assert link == "https://vidara.to/e/ep1vidara"
def test_stream_link_for_movie(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_MOVIE,
))
asyncio.run(plugin.search_titles("inception"))
link = plugin.stream_link_for("Inception", "Film", "Inception")
# gupload.xyz skipped, vidara.to preferred
assert link == "https://vidara.to/e/inc7testXYZ"
def test_stream_link_for_movie_cache_miss(monkeypatch):
"""Film-Stream auch ohne Vorsuche (leere Instanz via _resolve_title)."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_MOVIE,
))
link = plugin.stream_link_for("Inception", "Film", "Inception")
assert link == "https://vidara.to/e/inc7testXYZ"
# ---------------------------------------------------------------------------
# Tests: _hosters_from_videos
# ---------------------------------------------------------------------------
def test_hosters_skips_gupload():
plugin = MoflixPlugin()
videos = [
{"src": "https://gupload.xyz/data/e/hash", "name": "GUpload"},
{"src": "https://moflix-stream.link/e/abc", "name": "Mirror-HDCloud"},
]
hosters = plugin._hosters_from_videos(videos)
assert "https://gupload.xyz/data/e/hash" not in hosters.values()
assert "https://moflix-stream.link/e/abc" in hosters.values()
def test_hosters_skips_youtube():
plugin = MoflixPlugin()
videos = [
{"src": "https://youtube.com/watch?v=xyz", "name": "YouTube"},
{"src": "https://vidara.to/e/real123", "name": "Vidara"},
]
hosters = plugin._hosters_from_videos(videos)
assert len(hosters) == 1
assert "https://vidara.to/e/real123" in hosters.values()
def test_hosters_all_skipped_returns_empty():
plugin = MoflixPlugin()
videos = [
{"src": "https://gupload.xyz/data/e/hash"},
{"src": "https://youtube.com/watch?v=xyz"},
]
assert plugin._hosters_from_videos(videos) == {}
def test_hosters_empty_returns_empty():
plugin = MoflixPlugin()
assert plugin._hosters_from_videos([]) == {}
def test_available_hosters_for_returns_names():
plugin = MoflixPlugin()
videos = [
{"src": "https://vidara.to/e/xyz", "name": "Vidara-720"},
{"src": "https://moflix-stream.click/e/abc", "name": "Mirror-HDCloud"},
]
# mock _videos_for to test directly
plugin._videos_for = lambda *a, **kw: videos # type: ignore[assignment]
names = plugin.available_hosters_for("Test", "Film", "Test")
assert len(names) == 2
# ---------------------------------------------------------------------------
# Tests: resolve_stream_link / _resolve_vidara
# ---------------------------------------------------------------------------
def test_resolve_stream_link_vidara_returns_hls(monkeypatch):
"""resolve_stream_link() ruft vidara.to-API auf und gibt streaming_url zurück."""
plugin = MoflixPlugin()
def mock_get_json(url, headers=None):
if "vidara.to" in url:
return VIDARA_STREAM_RESPONSE
return None
monkeypatch.setattr(plugin, "_get_json", mock_get_json)
result = plugin.resolve_stream_link("https://vidara.to/e/ep1vidara")
assert result == "https://cdn.example.com/hls/ep1/master.m3u8"
def test_resolve_stream_link_vidara_api_fails_returns_none(monkeypatch):
"""Wenn vidara-API None zurückgibt und ResolveURL nicht klappt → None."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: None)
result = plugin.resolve_stream_link("https://vidara.to/e/broken123")
# neither the vidara API nor ResolveURL → None (no unresolvable link)
assert result is None
def test_resolve_stream_link_non_vidhide_tries_resolveurl(monkeypatch):
"""Für sonstige URLs wird ResolveURL aufgerufen; ohne Installation → None."""
plugin = MoflixPlugin()
result = plugin.resolve_stream_link("https://moflix-stream.link/e/somefilm")
# without ResolveURL installed → None
assert result is None
# ---------------------------------------------------------------------------
# Tests: Channel-Browse (popular, genre, collection)
# ---------------------------------------------------------------------------
def test_popular_series_returns_titles(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE)
titles = plugin.popular_series()
assert titles == ["Squid Game", "The Crown"]
# the cache must be populated
assert "Squid Game" in plugin._title_to_url
def test_channel_empty_response_returns_empty(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: None)
assert plugin.popular_series() == []
assert plugin.new_titles() == []
def test_channel_malformed_response_returns_empty(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: {"channel": {}})
assert plugin.popular_series() == []
def test_titles_for_genre(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE)
titles = plugin.titles_for_genre("Action")
assert "Squid Game" in titles
def test_titles_for_unknown_genre_returns_empty():
plugin = MoflixPlugin()
assert plugin.titles_for_genre("Unbekanntes Genre XYZ") == []
def test_titles_for_collection(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: CHANNEL_RESPONSE)
titles = plugin.titles_for_collection("James Bond Collection")
assert "Squid Game" in titles
# ---------------------------------------------------------------------------
# Tests: genres / collections / capabilities
# ---------------------------------------------------------------------------
def test_genres_returns_sorted_list():
plugin = MoflixPlugin()
genres = plugin.genres()
assert genres == sorted(GENRE_SLUGS.keys())
assert "Action" in genres
assert "Horror" in genres
def test_collections_returns_sorted_list():
plugin = MoflixPlugin()
colls = plugin.collections()
assert colls == sorted(COLLECTION_SLUGS.keys())
assert "James Bond Collection" in colls
def test_capabilities():
plugin = MoflixPlugin()
caps = plugin.capabilities()
assert "popular_series" in caps
assert "new_titles" in caps
assert "genres" in caps
assert "collections" in caps
# ---------------------------------------------------------------------------
# Tests: metadata_for
# ---------------------------------------------------------------------------
def test_metadata_from_cache(monkeypatch):
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", lambda url, headers=None: SEARCH_RESPONSE)
asyncio.run(plugin.search_titles("breaking"))
# fetching metadata must not trigger a new HTTP call now
call_count = {"n": 0}
def no_call(url, headers=None):
call_count["n"] += 1
return None
monkeypatch.setattr(plugin, "_get_json", no_call)
info, art, _ = plugin.metadata_for("Breaking Bad")
assert info.get("plot") == "Chemie-Lehrer wird Drogenboss."
assert art.get("poster") == "https://cdn.example.com/bb.jpg"
assert call_count["n"] == 0 # kein HTTP-Call
def test_metadata_api_fallback(monkeypatch):
"""Metadaten werden via API geladen wenn nicht im Cache."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_json", make_json_router(
search=SEARCH_RESPONSE,
titles=TITLE_RESPONSE_SERIES,
))
asyncio.run(plugin.search_titles("breaking"))
# clear the cache to force the API fallback
plugin._title_meta.clear()
info, art, _ = plugin.metadata_for("Breaking Bad")
assert info.get("plot") == "Chemie-Lehrer wird Drogenboss."
assert "year" in info
assert info["year"] == "2008"
def test_metadata_unknown_title_returns_empty():
plugin = MoflixPlugin()
info, art, streams = plugin.metadata_for("Unbekannt")
assert info == {"title": "Unbekannt"}
assert art == {}
assert streams is None
# ---------------------------------------------------------------------------
# Tests: _unpack_packer
# ---------------------------------------------------------------------------
def test_unpack_packer_basic():
"""_unpack_packer() entpackt ein p.a.c.k.e.r.-Fragment korrekt."""
packed = (
"eval(function(p,a,c,k,e,d){return p}"
"('0 1={\"2\":\"https://cdn.example.com/hls/test/master.m3u8\"};',"
"36,3,'var|links|hls2'.split('|'),0,0))"
)
result = _unpack_packer(packed)
assert 'var links={"hls2":"https://cdn.example.com/hls/test/master.m3u8"}' in result
def test_unpack_packer_preserves_url():
"""URLs in String-Literalen werden durch den Unpacker nicht korrumpiert."""
packed = (
"eval(function(p,a,c,k,e,d){return p}"
"('0 1={\"2\":\"https://cdn.example.com/hls/test/master.m3u8\"};',"
"36,3,'var|links|hls2'.split('|'),0,0))"
)
result = _unpack_packer(packed)
assert "https://cdn.example.com/hls/test/master.m3u8" in result
def test_unpack_packer_no_match_returns_input():
"""Wenn kein p.a.c.k.e.r.-Muster gefunden wird, wird der Input unverändert zurückgegeben."""
raw = "var x = 1; console.log(x);"
assert _unpack_packer(raw) == raw
def test_unpack_packer_full_vidhide_fixture():
"""Entpackt die VIDHIDE_HTML-Fixture und findet hls2-URL."""
result = _unpack_packer(VIDHIDE_HTML)
assert '"hls2":"https://cdn.example.com/hls/test/master.m3u8"' in result
assert "jwplayer" in result
assert "links.hls2" in result
# ---------------------------------------------------------------------------
# Tests: _resolve_vidhide / resolve_stream_link (VidHide)
# ---------------------------------------------------------------------------
def test_resolve_vidhide_extracts_hls_url(monkeypatch):
"""_resolve_vidhide() gibt den hls2-Stream-Link mit Kodi-Header-Suffix zurück."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: VIDHIDE_HTML)
result = plugin._resolve_vidhide("https://moflix-stream.click/embed/kqocffe8ipcf")
assert result is not None
assert result.startswith("https://cdn.example.com/hls/test/master.m3u8|")
assert "Referer=" in result
assert "User-Agent=" in result
def test_resolve_vidhide_no_packer_returns_none(monkeypatch):
"""_resolve_vidhide() gibt None zurück wenn kein p.a.c.k.e.r. in der Seite."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: "<html>no packer here</html>")
result = plugin._resolve_vidhide("https://moflix-stream.click/embed/abc")
assert result is None
def test_resolve_vidhide_html_fetch_fails_returns_none(monkeypatch):
"""_resolve_vidhide() gibt None zurück wenn _get_html() fehlschlägt."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: None)
result = plugin._resolve_vidhide("https://moflix-stream.click/embed/abc")
assert result is None
def test_resolve_stream_link_vidhide_returns_hls(monkeypatch):
"""resolve_stream_link() ruft _resolve_vidhide() auf und gibt HLS-URL mit Header-Suffix zurück."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: VIDHIDE_HTML)
result = plugin.resolve_stream_link("https://moflix-stream.click/embed/kqocffe8ipcf")
assert result is not None
assert result.startswith("https://cdn.example.com/hls/test/master.m3u8|")
assert "Referer=" in result
assert "User-Agent=" in result
def test_resolve_stream_link_vidhide_fallback_on_failure(monkeypatch):
"""Wenn VidHide-Resolver fehlschlägt, wird None zurückgegeben (kein unauflösbarer Link)."""
plugin = MoflixPlugin()
monkeypatch.setattr(plugin, "_get_html", lambda url, headers=None, fresh_session=False: None)
result = plugin.resolve_stream_link("https://moflix-stream.click/embed/broken")
# no VidHide result → None (Kodi shows the "Kein Stream" dialog)
assert result is None
# ---------------------------------------------------------------------------
# Tests: _best_src_from_videos (moflix-stream.click no longer skipped)
# ---------------------------------------------------------------------------
def test_hosters_vidhide_not_skipped():
"""moflix-stream.click ist nicht mehr in _VIDEO_SKIP_DOMAINS."""
plugin = MoflixPlugin()
videos = [
{"src": "https://moflix-stream.click/embed/abc123", "name": "Mirror-VidHide"},
]
hosters = plugin._hosters_from_videos(videos)
assert "https://moflix-stream.click/embed/abc123" in hosters.values()
def test_hosters_vidara_present():
"""vidara.to wird korrekt als Hoster erkannt."""
plugin = MoflixPlugin()
videos = [
{"src": "https://moflix-stream.click/embed/abc123", "name": "Mirror-VidHide"},
{"src": "https://vidara.to/e/xyz789", "name": "Vidara-720"},
]
hosters = plugin._hosters_from_videos(videos)
assert len(hosters) == 2
assert "https://vidara.to/e/xyz789" in hosters.values()
def test_stream_link_for_movie_vidhide_only(monkeypatch):
"""Film mit nur moflix-stream.click Mirror: stream_link_for() gibt VidHide-src zurück."""
plugin = MoflixPlugin()
plugin._title_to_url["The Bluff"] = "https://moflix-stream.xyz/api/v1/titles/789?load=videos"
plugin._is_series["The Bluff"] = False
def mock_get_json(_url, _headers=None):
return {
"title": {
"videos": [
{"quality": "1080p", "src": "https://moflix-stream.click/embed/kqocffe8ipcf", "name": "Mirror 1"},
],
},
}
monkeypatch.setattr(plugin, "_get_json", mock_get_json)
link = plugin.stream_link_for("The Bluff", "Film", "The Bluff")
assert link == "https://moflix-stream.click/embed/kqocffe8ipcf"


@@ -0,0 +1,29 @@
import pytest
from bs4 import BeautifulSoup
from addon.plugins.serienstream_plugin import _extract_episodes
def test_extract_episodes_skips_upcoming():
html = """
<table class='episode-table'>
<tbody>
<tr class='episode-row' onclick="window.location='https://s.to/serie/x/staffel-1/episode-1'">
<th class='episode-number-cell'>1</th>
<td><strong class='episode-title-ger'>Ep1</strong></td>
<td class='episode-watch-cell'><img alt='VOE'></td>
</tr>
<tr class='episode-row upcoming' onclick="window.location='https://s.to/serie/x/staffel-1/episode-2'">
<th class='episode-number-cell'>2</th>
<td>
<strong class='episode-title-ger'></strong>
<span class='badge badge-upcoming'>DEMNÄCHST</span>
</td>
<td class='episode-watch-cell'>— TBA —</td>
</tr>
</tbody>
</table>
"""
soup = BeautifulSoup(html, "html.parser")
episodes = _extract_episodes(soup)
assert [e.number for e in episodes] == [1]
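The fixture above exercises the upcoming-row filter. The core of that filter can be sketched as a tiny pure function (hypothetical helper; the real `_extract_episodes` works directly on BeautifulSoup rows):

```python
def playable_rows(rows):
    """Filter episode rows, dropping any flagged as upcoming.

    `rows` is a list of (episode_number, css_classes) pairs, mirroring what
    the soup selector yields for each <tr class='episode-row'>.
    """
    return [number for number, classes in rows if "upcoming" not in classes]
```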


@@ -0,0 +1,48 @@
import os
import time
import pytest
from addon.plugins.serienstream_plugin import SerienstreamPlugin
@pytest.mark.live
@pytest.mark.perf
def test_live_titel_staffel_episode_timing():
if not os.getenv("LIVE_TESTS"):
pytest.skip("LIVE_TESTS not set")
title = os.getenv("LIVE_TITLE", "Star Trek: Starfleet Academy")
season = os.getenv("LIVE_SEASON", "Staffel 1")
max_title_to_season = float(os.getenv("PERF_MAX_TITLE_TO_SEASON", "6.0"))
max_season_to_episodes = float(os.getenv("PERF_MAX_SEASON_TO_EPISODES", "5.0"))
plugin = SerienstreamPlugin()
t0 = time.perf_counter()
seasons = plugin.seasons_for(title)
t1 = time.perf_counter()
assert seasons, f"No seasons found for title: {title}"
assert season in seasons, f"Requested season missing: {season}; available: {seasons}"
episodes = plugin.episodes_for(title, season)
t2 = time.perf_counter()
assert episodes, f"No episodes for {title} / {season}"
title_to_season = t1 - t0
season_to_episodes = t2 - t1
print(
f"PERF title->seasons={title_to_season:.3f}s "
f"season->episodes={season_to_episodes:.3f}s "
f"episodes={len(episodes)}"
)
assert title_to_season <= max_title_to_season, (
f"title->seasons zu langsam: {title_to_season:.3f}s > {max_title_to_season:.3f}s"
)
assert season_to_episodes <= max_season_to_episodes, (
f"season->episodes zu langsam: {season_to_episodes:.3f}s > {max_season_to_episodes:.3f}s"
)


@@ -0,0 +1,239 @@
import os
import pytest
try:
from bs4 import BeautifulSoup
except Exception: # pragma: no cover - optional in local env
BeautifulSoup = None
from addon.plugins import serienstream_plugin as sp
pytestmark = pytest.mark.skipif(BeautifulSoup is None, reason="bs4 not available")
def _soup(html: str):
return BeautifulSoup(html, "html.parser")
def test_search_series_api_first(monkeypatch):
"""search_series() kombiniert API-Treffer mit Katalog-Cache (ohne Duplikate)."""
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
monkeypatch.setattr(sp, "_search_series_api", lambda q: [
sp.SeriesResult(title="Star Trek", description="", url="https://s.to/serie/star-trek"),
])
# Catalog cache: one known URL plus one new one
cache_items = [
sp.SeriesResult(title="Star Trek", description="", url="https://s.to/serie/star-trek"),  # duplicate
sp.SeriesResult(title="Star Trek: Academy", description="", url="https://s.to/serie/star-trek-academy"),
]
monkeypatch.setattr(sp, "_load_catalog_index_from_cache", lambda: cache_items)
results = sp.search_series("trek")
titles = [r.title for r in results]
# API hits first; duplicates (same URL) are removed
assert titles[0] == "Star Trek"
assert "Star Trek: Academy" in titles
assert titles.count("Star Trek") == 1
def test_search_series_falls_back_to_catalog_cache(monkeypatch):
"""Wenn API und Server-Suche leer sind, wird der Katalog-Cache als Fallback genutzt."""
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
# API and server search return nothing
monkeypatch.setattr(sp, "_search_series_api", lambda q: [])
monkeypatch.setattr(sp, "_search_series_server", lambda q: [])
# Populate the catalog cache with test data
cache_items = [
sp.SeriesResult(title="Der Hund", description="", url="https://s.to/serie/der-hund"),
sp.SeriesResult(title="Hundeleben", description="", url="https://s.to/serie/hundeleben"),
]
monkeypatch.setattr(sp, "_load_catalog_index_from_cache", lambda: cache_items)
results = sp.search_series("hund")
titles = [r.title for r in results]
# Whole-word matches only (not "Hundeleben")
assert titles == ["Der Hund"]
def test_extract_season_links():
html = """
<ul class='nav list-items-nav'>
<a data-season-pill='1' href='/serie/x/staffel-1'>1</a>
<a data-season-pill='2' href='/serie/x/staffel-2'>2</a>
<a data-season-pill='1' href='/serie/x/staffel-1/episode-1'>skip</a>
</ul>
"""
seasons = sp._extract_season_links(_soup(html))
assert seasons == [(1, "https://s.to/serie/x/staffel-1"), (2, "https://s.to/serie/x/staffel-2")]
def test_extract_episodes_skips_upcoming_and_tba():
html = """
<table class='episode-table'>
<tbody>
<tr class='episode-row' onclick="window.location='https://s.to/serie/x/staffel-1/episode-1'">
<th class='episode-number-cell'>1</th>
<td><strong class='episode-title-ger'>Ep1</strong></td>
<td class='episode-watch-cell'><img alt='VOE'></td>
</tr>
<tr class='episode-row upcoming' onclick="window.location='https://s.to/serie/x/staffel-1/episode-2'">
<th class='episode-number-cell'>2</th>
<td>
<strong class='episode-title-ger'></strong>
<span class='badge badge-upcoming'>DEMNÄCHST</span>
</td>
<td class='episode-watch-cell'>— TBA —</td>
</tr>
</tbody>
</table>
"""
episodes = sp._extract_episodes(_soup(html))
assert [e.number for e in episodes] == [1]
def test_fetch_episode_hoster_names(monkeypatch):
html = """
<button class='link-box' data-provider-name='VOE' data-play-url='/redirect/voe'></button>
<button class='link-box' data-provider-name='Vidoza' data-play-url='/redirect/vidoza'></button>
"""
def fake_get_soup(url, session=None):
return _soup(html)
monkeypatch.setattr(sp, "_get_soup", fake_get_soup)
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
names = sp.fetch_episode_hoster_names("/serie/x/staffel-1/episode-1")
assert names == ["VOE", "Vidoza"]
def test_fetch_episode_stream_link_prefers_requested_hoster(monkeypatch):
html = """
<button class='link-box' data-provider-name='VOE' data-play-url='/redirect/voe'></button>
<button class='link-box' data-provider-name='Vidoza' data-play-url='/redirect/vidoza'></button>
"""
def fake_get_soup(url, session=None):
return _soup(html)
monkeypatch.setattr(sp, "_get_soup", fake_get_soup)
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
link = sp.fetch_episode_stream_link("/serie/x/staffel-1/episode-1", preferred_hosters=["vidoza"])
assert link == "https://s.to/redirect/vidoza"
def test_extract_latest_episodes():
html = """
<a class='latest-episode-row' href='/serie/x/staffel-1/episode-2'>
<span class='ep-title' title='Show X'>Show X</span>
<span class='ep-season'>S 1</span>
<span class='ep-episode'>E 2</span>
<span class='ep-time'>Heute</span>
</a>
"""
episodes = sp._extract_latest_episodes(_soup(html))
assert len(episodes) == 1
assert episodes[0].series_title == "Show X"
assert episodes[0].season == 1
assert episodes[0].episode == 2
def test_episode_url_for_uses_episode_cache(monkeypatch):
plugin = sp.SerienstreamPlugin()
info = sp.EpisodeInfo(
number=2,
title="Folge 2",
original_title="",
url="https://s.to/serie/x/staffel-1/episode-2",
)
plugin._episode_label_cache[("Show X", "Staffel 1")] = {"Episode 2: Folge 2": info}
called = {"lookup": False}
def _fail_lookup(*_args, **_kwargs):
called["lookup"] = True
return None
monkeypatch.setattr(plugin, "_lookup_episode", _fail_lookup)
url = plugin.episode_url_for("Show X", "Staffel 1", "Episode 2: Folge 2")
assert url == "https://s.to/serie/x/staffel-1/episode-2"
assert called["lookup"] is False
def test_parse_series_catalog_groups_and_entries():
html = """
<div class='background-1'><h3>Genre A</h3></div>
<ul class='series-list'>
<li class='series-item' data-search='desc a'>
<a href='/serie/a'>A</a>
</li>
</ul>
<div class='background-1'><h3>Genre B</h3></div>
<ul class='series-list'>
<li class='series-item' data-search='desc b'>
<a href='/serie/b'>B</a>
</li>
</ul>
"""
catalog = sp.parse_series_catalog(_soup(html))
assert list(catalog.keys()) == ["Genre A", "Genre B"]
assert [e.title for e in catalog["Genre A"]] == ["A"]
assert [e.title for e in catalog["Genre B"]] == ["B"]
def test_titles_for_genre_from_catalog(monkeypatch):
html = """
<div class='background-1'><h3>Drama</h3></div>
<ul class='series-list'>
<li class='series-item' data-search='desc'>
<a href='/serie/drama-1'>Drama 1</a>
</li>
</ul>
"""
monkeypatch.setattr(sp, "_get_soup_simple", lambda url: _soup(html))
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
plugin = sp.SerienstreamPlugin()
titles = plugin.titles_for_genre("Drama")
assert titles == ["Drama 1"]
def test_popular_series_parsing(monkeypatch):
html = """
<div class='mb-5'>
<h2>Meistgesehen</h2>
<a class='show-card' href='/serie/popular-1'>
<img alt='Popular 1' src='x.jpg'>
</a>
<a class='show-card' href='/serie/popular-2'>
<img alt='Popular 2' src='y.jpg'>
</a>
</div>
"""
monkeypatch.setattr(sp, "_get_soup_simple", lambda url: _soup(html))
monkeypatch.setattr(sp, "_get_base_url", lambda: "https://s.to")
plugin = sp.SerienstreamPlugin()
titles = plugin.popular_series()
assert titles == ["Popular 1", "Popular 2"]
@pytest.mark.live
def test_live_staffel_page_skips_upcoming():
if not os.getenv("LIVE_TESTS"):
pytest.skip("LIVE_TESTS not set")
url = "https://s.to/serie/star-trek-starfleet-academy/staffel-1"
soup = sp._get_soup_simple(url)
rows = soup.select("table.episode-table tbody tr.episode-row")
upcoming_rows = [row for row in rows if "upcoming" in (row.get("class") or [])]
episodes = sp._extract_episodes(soup)
assert len(episodes) == len(rows) - len(upcoming_rows)
@pytest.mark.live
def test_live_genres_and_titles():
if not os.getenv("LIVE_TESTS"):
pytest.skip("LIVE_TESTS not set")
plugin = sp.SerienstreamPlugin()
genres = plugin.genres()
assert isinstance(genres, list) and genres
sample = genres[0]
titles = plugin.titles_for_genre(sample)
assert isinstance(titles, list)

tests/test_smoke.py

@@ -0,0 +1,28 @@
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
ADDON_DIR = ROOT / "addon"
if str(ADDON_DIR) not in sys.path:
sys.path.insert(0, str(ADDON_DIR))
def test_import_core_modules() -> None:
"""Ein einfacher Smoke-Test, der sicherstellt, dass Kernmodule importierbar sind.
Wichtig: Die Module sind so geschrieben, dass sie auch ohne Kodi-Umgebung
(ohne xbmc/xbmcgui) importiert werden koennen.
"""
import plugin_interface # noqa: F401
import plugin_helpers # noqa: F401
import http_session_pool # noqa: F401
import tmdb # noqa: F401
import metadata_utils # noqa: F401
import resolveurl_backend # noqa: F401


@@ -0,0 +1,14 @@
from addon.plugins import aniworld_plugin as ap
from addon.plugins import topstreamfilm_plugin as tp
def test_aniworld_matches_whole_words_only():
assert ap._matches_query("hund", title="Der Hund")
assert not ap._matches_query("hund", title="Thunderstruck")
assert not ap._matches_query("hund", title="Hundeleben")
def test_topstream_matches_whole_words_only():
assert tp._matches_query("hund", title="Der Hund", description="")
assert not tp._matches_query("hund", title="Thunderstruck", description="")
assert not tp._matches_query("hund", title="Hundeleben", description="")