240 lines
7.4 KiB
Python
240 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Shared helpers for ViewIt plugins.
|
|
|
|
Focus:
|
|
- Kodi addon settings access (string/bool)
|
|
- Optional URL notifications
|
|
- Optional URL logging
|
|
- Optional HTML response dumps
|
|
|
|
Designed to work both in Kodi and outside Kodi (for linting/tests).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
import hashlib
|
|
import os
|
|
from typing import Optional
|
|
|
|
try: # pragma: no cover - Kodi runtime
|
|
import xbmcaddon # type: ignore[import-not-found]
|
|
import xbmcvfs # type: ignore[import-not-found]
|
|
import xbmcgui # type: ignore[import-not-found]
|
|
except ImportError: # pragma: no cover - allow importing outside Kodi
|
|
xbmcaddon = None
|
|
xbmcvfs = None
|
|
xbmcgui = None
|
|
|
|
|
|
def get_setting_string(addon_id: str, setting_id: str, *, default: str = "") -> str:
    """Read an addon setting and return it as a whitespace-stripped string.

    ``default`` is returned when the Kodi ``xbmcaddon`` module is not
    available or when reading the setting raises. A setting that is read
    successfully but is empty/falsy yields ``""`` (not ``default``).
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        # Use the typed accessor when the addon object offers one, otherwise
        # fall back to the generic getSetting().
        reader = getattr(addon, "getSettingString", None)
        if reader is None:
            reader = addon.getSetting
        value = reader(setting_id)
        return str(value or "").strip()
    except Exception:
        # Settings access is best effort; any failure means "use the default".
        return default
|
|
|
|
|
|
def get_setting_bool(addon_id: str, setting_id: str, *, default: bool = False) -> bool:
    """Read an addon setting and return it as a boolean.

    When the addon object has ``getSettingBool`` its result is coerced with
    ``bool()``. Otherwise the raw ``getSetting`` value is treated as true only
    if, stripped and lower-cased, it is one of ``1``, ``true``, ``yes``, ``on``.
    ``default`` is returned when ``xbmcaddon`` is unavailable or on any error.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_reader = getattr(addon, "getSettingBool", None)
        if typed_reader is None:
            # No typed accessor: interpret the raw setting text.
            text = str(addon.getSetting(setting_id)).strip().lower()
            return text in {"1", "true", "yes", "on"}
        return bool(typed_reader(setting_id))
    except Exception:
        return default
|
|
|
|
|
|
def get_setting_int(addon_id: str, setting_id: str, *, default: int = 0) -> int:
    """Read an addon setting and return it as an integer.

    Uses ``getSettingInt`` when the addon object provides it; otherwise the
    raw ``getSetting`` value is stripped and parsed with ``int()``.
    ``default`` is returned when ``xbmcaddon`` is unavailable or on any error,
    including a value that cannot be parsed as an integer.
    """
    if xbmcaddon is None:
        return default
    try:
        addon = xbmcaddon.Addon(addon_id)
        typed_reader = getattr(addon, "getSettingInt", None)
        if typed_reader is None:
            # No typed accessor: parse the raw setting text.
            return int(str(addon.getSetting(setting_id)).strip())
        return int(typed_reader(setting_id))
    except Exception:
        return default
|
|
|
|
|
|
def _is_logging_enabled(addon_id: str, *, global_setting_id: str, plugin_setting_id: Optional[str]) -> bool:
    """Tell whether a logging feature is switched on.

    The global toggle must be enabled. If ``plugin_setting_id`` is given
    (non-empty), that per-plugin toggle must be enabled as well; it is only
    consulted when the global toggle is on.
    """
    global_on = get_setting_bool(addon_id, global_setting_id, default=False)
    if global_on and plugin_setting_id:
        return get_setting_bool(addon_id, plugin_setting_id, default=False)
    return global_on
|
|
|
|
|
|
def notify_url(
    addon_id: str,
    *,
    heading: str,
    url: str,
    enabled_setting_id: str,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Show ``url`` in a Kodi info notification when the feature is enabled.

    Does nothing when ``xbmcgui`` is unavailable or when the global/plugin
    toggles (see ``enabled_setting_id`` / ``plugin_setting_id``) are off.
    The notification is requested with a display time of 3000. Errors raised
    while showing it are ignored.
    """
    if xbmcgui is None:
        return
    allowed = _is_logging_enabled(
        addon_id,
        global_setting_id=enabled_setting_id,
        plugin_setting_id=plugin_setting_id,
    )
    if allowed:
        try:
            dialog = xbmcgui.Dialog()
            dialog.notification(heading, url, xbmcgui.NOTIFICATION_INFO, 3000)
        except Exception:
            # Notifications are purely informational; never propagate.
            pass
|
|
|
|
|
|
def _profile_logs_dir(addon_id: str) -> Optional[str]:
    """Return the ``logs`` directory inside the addon's profile folder.

    The directory is created through ``xbmcvfs`` when it does not exist yet.
    Returns ``None`` when the Kodi modules are unavailable or when any step
    raises.
    """
    if xbmcaddon is None or xbmcvfs is None:
        return None
    try:
        profile_uri = xbmcaddon.Addon(addon_id).getAddonInfo("profile")
        # Translate Kodi's profile location into a path usable with os.path.
        logs_path = os.path.join(xbmcvfs.translatePath(profile_uri), "logs")
        if not xbmcvfs.exists(logs_path):
            xbmcvfs.mkdirs(logs_path)
    except Exception:
        return None
    return logs_path
|
|
|
|
|
|
def _append_text_file(path: str, content: str) -> None:
|
|
try:
|
|
with open(path, "a", encoding="utf-8") as handle:
|
|
handle.write(content)
|
|
return
|
|
except Exception:
|
|
pass
|
|
if xbmcvfs is None:
|
|
return
|
|
try:
|
|
handle = xbmcvfs.File(path, "a")
|
|
handle.write(content)
|
|
handle.close()
|
|
except Exception:
|
|
return
|
|
|
|
|
|
def _rotate_log_file(path: str, *, max_bytes: int, max_files: int) -> None:
|
|
if max_bytes <= 0 or max_files <= 0:
|
|
return
|
|
try:
|
|
if not os.path.exists(path) or os.path.getsize(path) <= max_bytes:
|
|
return
|
|
except Exception:
|
|
return
|
|
try:
|
|
for index in range(max_files - 1, 0, -1):
|
|
older = f"{path}.{index}"
|
|
newer = f"{path}.{index + 1}"
|
|
if os.path.exists(older):
|
|
if index + 1 > max_files:
|
|
os.remove(older)
|
|
else:
|
|
os.replace(older, newer)
|
|
os.replace(path, f"{path}.1")
|
|
except Exception:
|
|
return
|
|
|
|
|
|
def _prune_dump_files(directory: str, *, prefix: str, max_files: int) -> None:
|
|
if not directory or max_files <= 0:
|
|
return
|
|
try:
|
|
entries = [
|
|
os.path.join(directory, name)
|
|
for name in os.listdir(directory)
|
|
if name.startswith(prefix) and name.endswith(".html")
|
|
]
|
|
if len(entries) <= max_files:
|
|
return
|
|
entries.sort(key=lambda path: os.path.getmtime(path))
|
|
for path in entries[: len(entries) - max_files]:
|
|
try:
|
|
os.remove(path)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
return
|
|
|
|
|
|
def log_url(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    url: str,
    kind: str = "VISIT",
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_mb_setting_id: str = "log_max_mb",
    max_files_setting_id: str = "log_max_files",
) -> None:
    """Append one tab-separated line describing ``url`` to a log file.

    Line layout: ``<UTC timestamp>Z<TAB><kind>[<TAB><request_id>]<TAB><url>``.

    The entry is written only when the global toggle ``enabled_setting_id``
    (and ``plugin_setting_id``, if given) is on. The file lives in the addon
    profile's ``logs`` directory, or next to this module when that directory
    cannot be determined. Before writing, the file is rotated according to the
    ``max_mb_setting_id`` (default 5) and ``max_files_setting_id`` (default 3)
    settings.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop the tzinfo so the text stays "YYYY-MM-DDTHH:MM:SS" + "Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    timestamp = now_utc.isoformat(timespec="seconds") + "Z"
    request_part = f"\t{request_id}" if request_id else ""
    line = f"{timestamp}\t{kind}{request_part}\t{url}\n"

    log_dir = _profile_logs_dir(addon_id)
    # Without a profile directory (e.g. outside Kodi) log beside this module.
    path = os.path.join(log_dir, log_filename) if log_dir else os.path.join(os.path.dirname(__file__), log_filename)

    max_mb = get_setting_int(addon_id, max_mb_setting_id, default=5)
    max_files = get_setting_int(addon_id, max_files_setting_id, default=3)
    _rotate_log_file(path, max_bytes=max_mb * 1024 * 1024, max_files=max_files)
    _append_text_file(path, line)
|
|
|
|
|
|
def log_error(
    addon_id: str,
    *,
    enabled_setting_id: str,
    log_filename: str,
    message: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
) -> None:
    """Record ``message`` in the log file as an ``ERROR`` entry.

    Delegates to ``log_url``: the message takes the place of the URL, and the
    same enable toggles and default rotation settings apply.
    """
    log_url(
        addon_id,
        kind="ERROR",
        url=message,
        request_id=request_id,
        log_filename=log_filename,
        enabled_setting_id=enabled_setting_id,
        plugin_setting_id=plugin_setting_id,
    )
|
|
|
|
|
|
def dump_response_html(
    addon_id: str,
    *,
    enabled_setting_id: str,
    url: str,
    body: str,
    filename_prefix: str,
    request_id: Optional[str] = None,
    plugin_setting_id: Optional[str] = None,
    max_files_setting_id: str = "dump_max_files",
) -> None:
    """Write an HTML response body to its own dump file.

    The file is named ``<filename_prefix>_<UTC timestamp>_<md5 of url>.html``
    and starts with an HTML comment holding the URL (and ``request_id`` when
    given). It is written only when the global toggle ``enabled_setting_id``
    (and ``plugin_setting_id``, if given) is on. The file goes to the addon
    profile's ``logs`` directory, or next to this module when that directory
    cannot be determined.

    When the profile ``logs`` directory is in use, old dumps with the same
    prefix are pruned so that at most ``max_files_setting_id`` (default 200)
    remain.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    if not _is_logging_enabled(addon_id, global_setting_id=enabled_setting_id, plugin_setting_id=plugin_setting_id):
        return

    # datetime.utcnow() is deprecated since Python 3.12; the formatted text is
    # the same for an aware UTC datetime.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()  # nosec - filename only
    filename = f"{filename_prefix}_{timestamp}_{digest}.html"

    log_dir = _profile_logs_dir(addon_id)
    # Without a profile directory (e.g. outside Kodi) dump beside this module.
    path = os.path.join(log_dir, filename) if log_dir else os.path.join(os.path.dirname(__file__), filename)

    request_line = f" request_id={request_id}" if request_id else ""
    content = f"<!-- {url}{request_line} -->\n{body or ''}"
    _append_text_file(path, content)

    if log_dir:
        # Prune after writing so the new dump counts toward the limit;
        # pruning first left max_files + 1 dumps on disk.
        max_files = get_setting_int(addon_id, max_files_setting_id, default=200)
        _prune_dump_files(log_dir, prefix=filename_prefix, max_files=max_files)
|