Compare commits

..

5 Commits

11 changed files with 607 additions and 392 deletions

3
.gitignore vendored
View File

@@ -1,7 +1,5 @@
.env
__pycache__/
.agents/
.codex/
*.pyc
build/
dist/
@@ -9,4 +7,3 @@ dist/
releases/*/kx-bridge
releases/*/extract_credentials
releases/*/extract_credentials.exe
config/config.ini

View File

@@ -1,21 +1,28 @@
# Changelog
## [0.9.6.1] 2026-05-02
## [0.9.7] 2026-05-08
### Neu
- **fetch_credentials-Tool:** Ruft MQTT-Credentials direkt vom Drucker per HTTP ab — kein laufender Anycubic Slicer nötig, nur die Drucker-IP. Linux-Binary und Windows-EXE im Release enthalten. (Beitrag von bebu, PR #19)
### Fixes
- **Upload-Banner:** Banner wird nach Stopp/Abbruch nicht mehr erneut angezeigt — `file_ready` und Thumbnail werden jetzt gecleared wenn der Drucker `stoped` oder `canceled` meldet
- **Upload großer GCode-Dateien:** Dateien >1 MB wurden mit HTTP 413 abgelehnt — aiohttp `client_max_size` auf 256 MB erhöht
- **Upload-Timeout:** Socket-Timeout nach GCode-Upload von 10s auf 120s erhöht — große Dateien führten zu einem Absturz der Bridge mit leerer Antwort während der Drucker noch verarbeitete
---
## [0.9.6] 2026-05-02
### Neu
- **Fortschritts-Karte:** Verstrichen / Slicer-Schätzung / Restzeit als Mini-Cards (gleicher Stil wie Temperaturkarten)
- **Layer-Mini-Card:** Layerzahl als Mini-Card neben der Fortschrittsleiste
- **Licht-Status-Synchronisierung:** Ein/Aus-Zustand und Helligkeit des Druckerlichts werden jetzt live über `light/report` MQTT gelesen — der Licht-Toggle in der UI spiegelt den echten Druckerstatus wider
- **Zeit-Minicards:** Fortschritts-Panel zeigt jetzt drei Karten — Verstrichen, Restzeit und Slicer-Schätzung — sowie einen Layer-Badge neben dem Fortschrittsbalken
- **Slicer-Schätzzeit aus GCode:** Geschätzte Druckzeit wird direkt aus der hochgeladenen GCode-Datei gelesen (OrcaSlicer: `; total estimated time:` am Dateiende, PrusaSlicer: `; estimated printing time` im Header)
- **Erweiterte Druckerstatus-Strings:** `pausing`, `paused`, `resuming`, `resumed`, `stopping`, `stopped` hinzugefügt — fehlten bisher und ließen die UI rohe Status-Codes bei Pause/Fortsetzen/Stopp anzeigen
### Fixes
- **Slicer-Schätzzeit:** OrcaSlicer schreibt die geschätzte Zeit ans Ende der GCode-Datei — Bridge liest jetzt auch die letzten 64 KB (vorher nur die ersten 16 KB)
- **start.sh:** `config/`-Verzeichnis wird jetzt automatisch erstellt und `config.ini.example` wird beim ersten Start hineinkopiert (Issue #15)
- **file_ready-Banner:** Upload-Banner wird nach Stopp oder Abbruch eines Drucks nicht mehr angezeigt
- **Zeitanzeige bei Stopp/Abbruch:** Verstrichen-, Restzeit- und Slicer-Schätzung werden auf null zurückgesetzt wenn ein Druck gestoppt oder abgebrochen wird
- **start.sh:** `config/`-Verzeichnis und `config.ini.example` werden beim ersten Start automatisch angelegt wenn sie fehlen (Issue #15)
---

View File

@@ -1,21 +1,28 @@
# Changelog
## [0.9.6.1] 2026-05-02
## [0.9.7] 2026-05-08
### New
- **fetch_credentials tool:** Fetches and decrypts MQTT credentials directly from the printer via HTTP — no running Anycubic Slicer required, only the printer IP needed. Linux binary and Windows EXE included in release. (Contributed by bebu, PR #19)
### Fixes
- **Upload banner:** Banner is no longer shown again after print stop/cancel — `file_ready` and thumbnail are now cleared when the printer reports `stoped` or `canceled`
- **Large GCode upload:** Files >1 MB were rejected with HTTP 413 — aiohttp `client_max_size` raised to 256 MB
- **Upload timeout:** Socket timeout after GCode upload raised from 10s to 120s — large files caused the bridge to crash with an empty response while the printer was still processing
---
## [0.9.6] 2026-05-02
### New
- **Progress card:** Elapsed / Slicer estimate / Remaining time shown as mini-cards (same style as temperature cards)
- **Layer mini-card:** Layer count displayed as mini-card next to the progress bar
- **Light status sync:** Light on/off state and brightness are now read live from the printer via `light/report` MQTT message — the light toggle in the UI reflects the actual printer state
- **Time mini-cards:** Progress panel now shows three cards — Elapsed, Remaining and Slicer estimate — with a layer counter badge next to the progress bar
- **Slicer estimate from GCode:** Estimated print time is parsed directly from the uploaded GCode file (OrcaSlicer: `; total estimated time:` at end of file, PrusaSlicer: `; estimated printing time` in header)
- **Extended printer status strings:** Added `pausing`, `paused`, `resuming`, `resumed`, `stopping`, `stopped` states — previously missing, causing the UI to show raw status codes during pause/resume/stop transitions
### Fixes
- **Slicer estimate time:** OrcaSlicer writes the estimated time at the end of the GCode file — bridge now also scans the last 64 KB (previously only the first 16 KB were checked)
- **start.sh:** `config/` directory is now created automatically and `config.ini.example` is copied into it on first run (Issue #15)
- **file_ready banner:** Upload banner is no longer shown after print stop or cancel
- **Timers on stop/cancel:** Elapsed, remaining and slicer estimate times are reset to zero when a print is stopped or cancelled
- **start.sh:** `config/` directory and `config.ini.example` are now created automatically on first run if missing (Issue #15)
---

View File

@@ -2,7 +2,7 @@
# KX-Bridge Anycubic Kobra X
**Version:** 0.9.6.1
**Version:** 0.9.7
Steuere deinen **Anycubic Kobra X** mit OrcaSlicer — ohne Klipper, ohne Raspberry Pi.
KX-Bridge ist eine Moonraker-kompatible Bridge die direkt mit dem Drucker kommuniziert.
@@ -18,13 +18,23 @@ Den Kobra X in den LAN-Modus versetzen:
### Schritt 2 Credentials holen
Die MQTT-Zugangsdaten sind druckerspezifisch. So holst du sie:
Die MQTT-Zugangsdaten sind druckerspezifisch und an die Hardware gebunden.
**Option A fetch_credentials (empfohlen):**
```bash
fetch_credentials --ip 192.168.x.x --write-config
```
Holt die Credentials direkt per HTTP vom Drucker und schreibt sie automatisch in `config/config.ini`. Benötigt nur die Drucker-IP — kein Slicer nötig.
**Option B extract_credentials (wenn Drucker-IP unbekannt):**
1. **AnycubicSlicerNext** öffnen und Drucker verbinden (bis Status angezeigt wird)
2. **`extract_credentials.exe`** (Windows) oder **`extract_credentials`** (Linux) ausführen — gibt Username, Password, Device-ID und Drucker-IP aus
3. Werte merken / kopieren
2. **`extract_credentials`** ausführen — gibt Username, Password, Device-ID und Drucker-IP aus
3. Werte im Web-UI eintragen (⚙-Menü)
> **Download:** [gitea.it-drui.de/viewit/KX-Bridge-Release/releases](https://gitea.it-drui.de/viewit/KX-Bridge-Release/releases) → `extract_credentials.exe` (Windows) / `extract_credentials` (Linux) im jeweiligen Release-Asset
> **Download:** [gitea.it-drui.de/viewit/KX-Bridge-Release/releases](https://gitea.it-drui.de/viewit/KX-Bridge-Release/releases) → `fetch_credentials` / `extract_credentials` (Linux & Windows) im jeweiligen Release-Asset
### Schritt 3 Bridge starten
@@ -36,7 +46,7 @@ Das Skript baut das Docker-Image automatisch beim ersten Aufruf.
**Web-UI öffnen:** `http://BRIDGE-IP:7125`
→ Das ⚙-Menü öffnet sich beim ersten Start automatisch
→ Credentials aus Schritt 2 eintragen → **Speichern & Neustart**
Bei Option B: Credentials aus Schritt 2 eintragen → **Speichern & Neustart**
**OrcaSlicer verbinden:**
Drucker → Verbindungstyp **Moonraker** → Host: `http://BRIDGE-IP:7125`
@@ -116,7 +126,8 @@ docker-compose down
## Fehlerbehebung
**„Falsche MQTT-Zugangsdaten"** beim Start:
- AnycubicSlicerNext neu starten, Drucker verbinden, `extract_credentials` erneut ausführen
- `fetch_credentials --ip <Drucker-IP> --write-config` erneut ausführen und Bridge neu starten
- Wenn IP unbekannt: AnycubicSlicerNext neu starten, Drucker verbinden, `extract_credentials` erneut ausführen
- Nur die IP-Adresse ins Feld eintragen, keinen Port (✗ `192.168.1.102:9883` → ✓ `192.168.1.102`)
**Drucker nicht gefunden / kein LAN-Modus:**
@@ -141,3 +152,9 @@ sudo usermod -aG docker $USER # dann neu einloggen
## Lizenz & Rechtliches
Interoperabilitätsforschung gem. §69e UrhG — ausschließlich private, nicht-kommerzielle Nutzung.
<p align="center">
<a href="https://ko-fi.com/viewitde">
<img src="https://ko-fi.com/img/githubbutton_sm.svg" alt="Ko-fi Support"/>
</a>
</p>

View File

@@ -2,7 +2,7 @@
# KX-Bridge Anycubic Kobra X
**Version:** 0.9.6.1
**Version:** 0.9.7
Control your **Anycubic Kobra X** with OrcaSlicer — no Klipper, no Raspberry Pi.
KX-Bridge is a Moonraker-compatible bridge that communicates directly with the printer.
@@ -141,3 +141,9 @@ sudo usermod -aG docker $USER # then log out and back in
## License
Interoperability research under §69e UrhG — private, non-commercial use only.
<p align="center">
<a href="https://ko-fi.com/viewitde">
<img src="https://ko-fi.com/img/githubbutton_sm.svg" alt="Ko-fi Support"/>
</a>
</p>

View File

@@ -1 +1 @@
0.9.6.1
0.9.7

View File

@@ -2,7 +2,7 @@
# Bridge-Manager: start | stop | restart | status | log
SCRIPT="$(dirname "$0")/kobrax_moonraker_bridge.py"
LOGFILE="/tmp/bridge.log"
PRINTER_IP="${PRINTER_IP:-}"
PRINTER_IP="192.168.178.94"
case "${1:-restart}" in
start)
@@ -11,11 +11,7 @@ case "${1:-restart}" in
fuser -k 7125/tcp 2>/dev/null || pkill -f kobrax_moonraker_bridge 2>/dev/null
sleep 1
fi
CMD=(python3 "$SCRIPT")
if [[ -n "$PRINTER_IP" ]]; then
CMD+=(--printer-ip "$PRINTER_IP")
fi
nohup "${CMD[@]}" > "$LOGFILE" 2>&1 &
nohup python3 "$SCRIPT" --printer-ip "$PRINTER_IP" > "$LOGFILE" 2>&1 &
echo "Bridge gestartet PID=$!"
sleep 2; tail -3 "$LOGFILE"
;;

397
fetch_credentials.py Normal file
View File

@@ -0,0 +1,397 @@
#!/usr/bin/env python3
"""
fetch_credentials.py Fetches and decrypts Anycubic Kobra X credentials via HTTP API.
Original approach by bebu (PR #19, KX-Bridge-Release).
Reverse engineered from the Vue project embedded in libWorkbench.so (Anycubic Slicer Next).
No running slicer required — only the printer IP in LAN.
Algorithm: AES-256-CBC
Key: token[16:32] from /info response
IV: response token from /ctrl response
Usage:
python3 fetch_credentials.py --ip 192.168.x.x
python3 fetch_credentials.py --ip 192.168.x.x --write-config
python3 fetch_credentials.py --ip 192.168.x.x --write-config --config-file ../config/config.ini
python3 fetch_credentials.py --ctrl ctrl.json --info info.json
"""
import json
import sys
import base64
import hashlib
import argparse
import os
import time
import random
import string
import requests
from pathlib import Path
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
def evp_bytes_to_key(password, salt, key_len, iv_len):
"""
Derive key and IV from password and salt using OpenSSL EVP_BytesToKey
This mimics the CryptoJS default key derivation
"""
m = []
i = 0
while len(b''.join(m)) < (key_len + iv_len):
md5 = hashlib.md5()
data = password + salt
if i > 0:
data = m[i - 1] + password + salt
md5.update(data)
m.append(md5.digest())
i += 1
ms = b''.join(m)
return ms[:key_len], ms[key_len:key_len + iv_len]
def generate_signature(token, ts, nonce):
"""
Generate MD5 signature for /ctrl endpoint
Signature = md5(md5(token[0:16]) + ts + nonce)
"""
# First MD5: token.slice(0, 16)
first_md5 = hashlib.md5(token[:16].encode('utf-8')).hexdigest()
# Second MD5: first_md5 + ts + nonce
signature_data = first_md5 + str(ts) + nonce
signature = hashlib.md5(signature_data.encode('utf-8')).hexdigest()
return signature
def generate_nonce(length=6):
"""Generate a random alphanumeric nonce"""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
def fetch_from_http(ip, port, endpoint, token=None, did="random", verbose=False):
"""
Fetch data from HTTP endpoint on the printer
Args:
ip (str): IP address of the printer
port (int): Port number (default 18910)
endpoint (str): Either 'info' or 'ctrl'
token (str): Device token (required for /ctrl endpoint)
did (str): Device ID (required for /ctrl endpoint)
verbose (bool): Print debug information
Returns:
dict: JSON response data
"""
try:
if endpoint == 'info':
url = f"http://{ip}:{port}/info"
if verbose:
print(f"Fetching: {url}")
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
elif endpoint == 'ctrl':
if not token:
raise ValueError("Token is required for /ctrl endpoint")
# Generate signature parameters
ts = int(time.time() * 1000) # Current timestamp in ms
nonce = generate_nonce(6)
signature = generate_signature(token, ts, nonce)
url = f"http://{ip}:{port}/ctrl"
params = {
'ts': ts,
'nonce': nonce,
'sign': signature,
'did': did
}
if verbose:
print(f"Fetching: {url}")
print(f" Parameters:")
print(f" ts: {ts}")
print(f" nonce: {nonce}")
print(f" sign: {signature}")
print(f" did: {did}")
response = requests.post(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
else:
raise ValueError(f"Unknown endpoint: {endpoint}")
except requests.exceptions.RequestException as e:
raise Exception(f"HTTP request failed for {endpoint}: {e}")
except json.JSONDecodeError as e:
raise Exception(f"Invalid JSON response from {endpoint}: {e}")
def decrypt_text(encrypted_data, key, iv):
"""
Decrypt data using AES-256-CBC
Handles CryptoJS-style encrypted data (OpenSSL format with salt)
Args:
encrypted_data (str): Encrypted data string (CryptoJS format)
key (str): Decryption key string
iv (str): Initialization vector string
Returns:
dict: Decrypted JSON data
"""
try:
# Convert key and IV to bytes
key_bytes = key.encode('utf-8')
iv_bytes = iv.encode('utf-8')
# Decrypt using direct key and IV (as per the original code)
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
# The encrypted_data might be base64 or hex encoded
# Try base64 first
try:
encrypted_bytes = base64.b64decode(encrypted_data)
except:
try:
# Try as hex
encrypted_bytes = bytes.fromhex(encrypted_data)
except:
# If all else fails, encode as UTF-8
encrypted_bytes = encrypted_data.encode('utf-8')
# Decrypt
decrypted = cipher.decrypt(encrypted_bytes)
# Try to unpad
try:
unpadded = unpad(decrypted, AES.block_size)
except ValueError:
# If unpadding fails, use as-is
unpadded = decrypted
plaintext = unpadded.decode('utf-8')
# Parse JSON
return json.loads(plaintext)
except Exception as e:
return {"error": str(e), "error_type": type(e).__name__}
def main():
"""Main function to decrypt printer data"""
# Parse command-line arguments
parser = argparse.ArgumentParser(
description='Fetch and decrypt Anycubic Kobra X credentials via HTTP API',
)
# HTTP mode
parser.add_argument('--ip', help='IP address of the printer')
parser.add_argument('--port', type=int, default=18910, help='Printer HTTP port (default: 18910)')
# File mode
parser.add_argument('--ctrl', default='ctrl.json', help='Path to ctrl.json (default: ctrl.json)')
parser.add_argument('--info', default='info.json', help='Path to info.json (default: info.json)')
# Output
parser.add_argument('--output', default=None, help='Save raw decrypted JSON to file (optional)')
parser.add_argument('--write-config', action='store_true',
help='Write credentials to config.ini')
parser.add_argument('--config-file', default=None,
help='Path to config.ini (default: ../config/config.ini relative to this script)')
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
args = parser.parse_args()
# Determine mode: HTTP or file
if args.ip:
# HTTP mode: fetch from printer
if args.verbose:
print("=" * 70)
print("Fetching configuration from printer via HTTP")
print("=" * 70)
print(f"Printer IP: {args.ip}:{args.port}")
print()
try:
# Fetch info.json
if args.verbose:
print("Step 1: Fetching device info...")
info = fetch_from_http(args.ip, args.port, 'info', verbose=args.verbose)
# Get token from info
token = info.get('token')
if not token:
print("Error: No token found in /info response", file=sys.stderr)
return 1
# Fetch data.json (encrypted config) from /ctrl endpoint
if args.verbose:
print("\nStep 2: Fetching encrypted configuration from /ctrl...")
data = fetch_from_http(args.ip, args.port, 'ctrl', token=token, verbose=args.verbose)
if args.verbose:
print("\nData fetched successfully!")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
else:
# File mode: load from disk
if args.verbose:
print("=" * 70)
print("Loading configuration from files")
print("=" * 70)
# Check if input files exist
if not Path(args.ctrl).exists():
print(f"Error: {args.ctrl} not found", file=sys.stderr)
return 1
if not Path(args.info).exists():
print(f"Error: {args.info} not found", file=sys.stderr)
return 1
# Read ctrl.json
try:
with open(args.ctrl, 'r') as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error reading {args.ctrl}: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error reading {args.ctrl}: {e}", file=sys.stderr)
return 1
# Read info.json
try:
with open(args.info, 'r') as f:
info = json.load(f)
except json.JSONDecodeError as e:
print(f"Error reading {args.info}: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error reading {args.info}: {e}", file=sys.stderr)
return 1
# Extract values
try:
encrypted_info = data['data']['info']
response_token = data['data']['token']
full_token = info['token']
except KeyError as e:
print(f"Error: Missing required key {e}", file=sys.stderr)
return 1
# Generate decryption key and IV
key_part = full_token[16:32]
if args.verbose:
print("=" * 70)
print("Printer Configuration Decryption")
print("=" * 70)
print(f"Input data file: {args.ctrl}")
print(f"Input info file: {args.info}")
print(f"Output file: {args.output}")
print()
print("Decryption Parameters:")
print(f" Encrypted data length: {len(encrypted_info)} bytes")
print(f" Full token: {full_token}")
print(f" Full token length: {len(full_token)} characters")
print(f" Response token (IV): {response_token}")
print(f" Decryption key: {key_part}")
print(f" Key length: {len(key_part)} characters")
print(f" IV length: {len(response_token)} characters")
print()
# Decrypt
if args.verbose:
print("Decrypting...")
result = decrypt_text(encrypted_info, key_part, response_token)
if 'error' in result:
print(f"Error during decryption: {result.get('error')}", file=sys.stderr)
return 1
# Show result
print()
print("=" * 55)
print(" CREDENTIALS")
print("=" * 55)
print(f" {'Printer IP':12s} {result.get('ip', 'n/a')}")
print(f" {'Username':12s} {result.get('username', 'n/a')}")
print(f" {'Password':12s} {result.get('password', 'n/a')}")
print(f" {'Device-ID':12s} {result.get('deviceId', 'n/a')}")
print(f" {'Mode-ID':12s} {result.get('modeId', 'n/a')}")
print(f" {'Model':12s} {result.get('modelName', 'n/a')}")
print(f" {'Broker':12s} {result.get('broker', 'n/a')}")
print("=" * 55)
if args.verbose:
print()
print("Full decrypted config:")
# Strip certs/keys from verbose output to avoid cluttering terminal
display = {k: v for k, v in result.items() if k not in ('devicecrt', 'devicepk')}
print(json.dumps(display, indent=2))
# Optionally save raw JSON
if args.output:
try:
with open(args.output, 'w') as f:
json.dump(result, f, indent=2)
print(f"\nRaw config saved to: {args.output}")
except Exception as e:
print(f"Error writing to {args.output}: {e}", file=sys.stderr)
return 1
# Write config.ini
if args.write_config:
if args.config_file:
config_path = args.config_file
else:
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..', 'config', 'config.ini')
config_path = os.path.normpath(config_path)
_write_config_ini(result, config_path)
else:
print(f"\nTip: pass --write-config to write credentials directly to config.ini")
return 0
def _write_config_ini(result: dict, config_path: str):
"""Write fetched credentials into config.ini, preserving existing non-credential keys."""
import configparser
cfg = configparser.ConfigParser()
if os.path.isfile(config_path):
cfg.read(config_path)
if not cfg.has_section('connection'):
cfg.add_section('connection')
cfg.set('connection', 'printer_ip', result.get('ip', ''))
cfg.set('connection', 'mqtt_port', '9883')
cfg.set('connection', 'username', result.get('username', ''))
cfg.set('connection', 'password', result.get('password', ''))
cfg.set('connection', 'device_id', result.get('deviceId', ''))
cfg.set('connection', 'mode_id', result.get('modeId', '20030'))
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, 'w') as f:
cfg.write(f)
print(f"\n✓ Credentials written to '{config_path}'.")
if __name__ == '__main__':
sys.exit(main())

View File

@@ -523,7 +523,7 @@ class KobraXClient:
sock = socket.create_connection((self.host, 18910), timeout=30)
sock.sendall(headers + body)
sock.settimeout(10)
sock.settimeout(120) # große GCode-Dateien brauchen Zeit bis der Drucker antwortet
response = b""
try:
while True:

View File

@@ -26,8 +26,6 @@ import sys
import tempfile
import time
import threading
import io
import zipfile
# Bei PyInstaller-Binary liegt alles neben sys.executable, sonst neben __file__
_BASE = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
@@ -130,139 +128,10 @@ def _parse_gcode_estimated_time(data: bytes) -> int:
elif unit == "m": secs += int(val) * 60
elif unit == "s": secs += int(val)
if secs:
log.info(f"Slicer estimate: {secs}s ({m.group(1).strip()})")
log.info(f"Slicer-Schätzzeit: {secs}s ({m.group(1).strip()})")
return secs
_FILAMENT_COLOR_KEYS = (
"filament_colour", "filament_color", "filament_colours", "filament_colors",
"extruder_colour", "extruder_color",
)
_FILAMENT_MATERIAL_KEYS = (
"filament_type", "filament_types", "filament_settings_id", "filament_preset",
)
_KNOWN_MATERIALS = (
"PLA-CF", "PETG-CF", "PA-CF", "PLA SILK", "PETG", "PLA", "ABS", "ASA",
"TPU", "PA", "PC", "HIPS", "PVA",
)
def _normalize_material(value) -> str:
text = str(value or "").upper().replace("_", " ").replace("-", "-").strip()
for material in _KNOWN_MATERIALS:
if re.search(rf"(^|[^A-Z0-9]){re.escape(material)}([^A-Z0-9]|$)", text):
return material
return text.split()[0] if text else "PLA"
def _color_to_rgb(value, default=None):
if default is None:
default = [255, 255, 255]
if isinstance(value, list) and len(value) >= 3:
try:
return [max(0, min(255, int(value[0]))),
max(0, min(255, int(value[1]))),
max(0, min(255, int(value[2])))]
except Exception:
return default
if isinstance(value, str):
m = re.search(r"#?([0-9a-fA-F]{6})(?:[0-9a-fA-F]{2})?", value)
if m:
raw = m.group(1)
return [int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16)]
return default
def _rgba(color) -> list[int]:
rgb = _color_to_rgb(color)
return [rgb[0], rgb[1], rgb[2], 255]
def _rgb_distance(a, b) -> int:
ar, ag, ab = _color_to_rgb(a)
br, bg, bb = _color_to_rgb(b)
return (ar - br) ** 2 + (ag - bg) ** 2 + (ab - bb) ** 2
def _parse_colors_from_text(text: str) -> list[list[int]]:
colors = []
for match in re.finditer(r"#?([0-9a-fA-F]{6})(?:[0-9a-fA-F]{2})?", text or ""):
raw = match.group(1)
colors.append([int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16)])
return colors
def _split_config_values(value: str) -> list[str]:
value = (value or "").strip().strip("[]")
parts = re.split(r"[;,]", value)
return [p.strip().strip("\"' ") for p in parts if p.strip().strip("\"' ")]
def _extract_config_value(line: str) -> str:
attr = re.search(r"value=[\"']([^\"']+)[\"']", line)
if attr:
return attr.group(1)
if "=" in line:
return line.split("=", 1)[1].strip()
if ":" in line:
return line.split(":", 1)[1].strip()
return line
def _parse_filament_metadata_text(text: str) -> dict:
colors: list[list[int]] = []
materials: list[str] = []
for line in (text or "").splitlines():
low = line.lower()
if any(k in low for k in _FILAMENT_COLOR_KEYS):
value = _extract_config_value(line)
for color in _parse_colors_from_text(value):
if color not in colors:
colors.append(color)
if any(k in low for k in _FILAMENT_MATERIAL_KEYS):
value = _extract_config_value(line)
for part in _split_config_values(value):
mat = _normalize_material(part)
if mat and mat not in materials:
materials.append(mat)
return {"colors": colors, "materials": materials}
def _merge_filament_metadata(target: dict, source: dict):
for key in ("colors", "materials"):
for value in source.get(key, []):
if value not in target[key]:
target[key].append(value)
def _parse_uploaded_filament_metadata(data: bytes, filename: str) -> dict:
"""Best-effort extraction of slicer filament colors/materials from G-code or 3MF."""
meta = {"colors": [], "materials": [], "source": ""}
suffix = pathlib.Path(filename or "").suffix.lower()
if suffix == ".3mf" or data[:4] == b"PK\x03\x04":
try:
with zipfile.ZipFile(io.BytesIO(data)) as zf:
for name in zf.namelist():
lname = name.lower()
if not lname.endswith((".config", ".json", ".model", ".xml", ".gcode")):
continue
if zf.getinfo(name).file_size > 2_000_000:
continue
text = zf.read(name).decode("utf-8", errors="ignore")
before = len(meta["colors"]) + len(meta["materials"])
_merge_filament_metadata(meta, _parse_filament_metadata_text(text))
if len(meta["colors"]) + len(meta["materials"]) > before and not meta["source"]:
meta["source"] = name
except Exception as e:
log.warning(f"3MF metadata could not be parsed: {e}")
else:
search = (data[:131072] + data[-262144:]).decode("utf-8", errors="ignore")
_merge_filament_metadata(meta, _parse_filament_metadata_text(search))
if meta["colors"] or meta["materials"]:
meta["source"] = "gcode"
return meta
class KobraXBridge:
def __init__(self, client: KobraXClient, args=None):
self.client = client
@@ -298,7 +167,6 @@ class KobraXBridge:
self._ams_slots: list[dict] = []
self._ams_loaded_slot: int = -1
self._last_uploaded_file: str = ""
self._uploaded_filament_metadata: dict[str, dict] = {}
self._serve_dir = tempfile.TemporaryDirectory(prefix="kobrax_serve_")
self._serve_dir_path: str = self._serve_dir.name
@@ -310,6 +178,7 @@ class KobraXBridge:
client.callbacks["info/report"] = self._on_info
client.callbacks["file/report"] = self._on_file
client.callbacks["multiColorBox/report"] = self._on_multicolor_box
client.callbacks["light/report"] = self._on_light
# -------------------------------------------------------------------------
# MQTT callbacks (called from reader thread)
@@ -388,7 +257,7 @@ class KobraXBridge:
thumb = details.get("thumbnail") or details.get("png_image") or ""
if thumb:
self._thumbnail_b64 = thumb
log.info(f"Thumbnail received: {len(thumb)} base64 chars")
log.info(f"Vorschaubild empfangen: {len(thumb)} Zeichen base64")
self._push_status_update()
def _on_multicolor_box(self, payload: dict):
@@ -418,9 +287,15 @@ class KobraXBridge:
threading.Thread(target=_tip_form, daemon=True).start()
if slots:
self._ams_slots = slots
log.info(f"AMS slots received: {len(slots)}, loaded_slot={self._ams_loaded_slot}")
log.info(f"AMS-Slots empfangen: {len(slots)}, loaded_slot={self._ams_loaded_slot}")
self._push_status_update()
def _on_light(self, payload: dict):
d = payload.get("data") or {}
self._state["light_on"] = bool(d.get("status", 0))
self._state["light_brightness"] = int(d.get("brightness", 80))
self._push_status_update()
# OrcaSlicer filament preset IDs (MoonrakerPrinterAgent.cpp mapping)
_TRAY_INFO_IDX = {
"PLA": "OGFL99", "PLA-CF": "OGFL98", "PLA SILK": "OGFL96",
@@ -431,7 +306,7 @@ class KobraXBridge:
}
def _build_lane_data(self) -> dict:
"""Build BBL-AMS JSON for OrcaSlicer DevFilaSystemParser::ParseV1_0."""
"""Baut BBL-AMS-JSON für OrcaSlicer DevFilaSystemParser::ParseV1_0."""
slots = self._ams_slots
total = len(slots)
if total == 0:
@@ -462,7 +337,7 @@ class KobraXBridge:
color_hex = color_raw[:6].upper() + "FF"
else:
color_hex = "FFFFFFFF"
material = _normalize_material(slot.get("type", "PLA"))
material = slot.get("type", "PLA").upper()
tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99")
tray_array.append({
"id": str(slot_id),
@@ -489,118 +364,6 @@ class KobraXBridge:
"tray_exist_bits": format(tray_exist_bits, "X"),
}
def _uploaded_metadata_for(self, filename: str) -> dict:
if not filename:
return {"colors": [], "materials": [], "source": ""}
return (self._uploaded_filament_metadata.get(filename)
or self._uploaded_filament_metadata.get(os.path.basename(filename))
or {"colors": [], "materials": [], "source": ""})
def _loaded_ams_slots(self) -> list[tuple[int, dict]]:
return [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5]
def _filtered_loaded_ams_slots(self) -> list[tuple[int, dict]]:
loaded = self._loaded_ams_slots()
default_slot = getattr(self._args, "default_ams_slot", "auto")
if default_slot == "auto":
return loaded
try:
slot_idx = int(default_slot)
except ValueError:
return loaded
selected = [(i, s) for i, s in loaded if i == slot_idx]
if selected:
return selected
log.warning(f"Default slot {slot_idx} is empty - falling back to Auto")
return loaded
def _build_ams_assignments(self, filename: str) -> list[dict]:
loaded = self._filtered_loaded_ams_slots()
if not loaded:
return []
metadata = self._uploaded_metadata_for(filename)
colors = metadata.get("colors") or []
materials = metadata.get("materials") or []
target_count = max(len(colors), len(materials))
# Without slicer metadata, preserve the previous behavior: advertise all
# occupied slots and let the printer/firmware use its normal fallback.
if target_count == 0:
return [{
"paint_index": i,
"slot_index": i,
"paint_color": _rgba(slot.get("color", [255, 255, 255])),
"ams_color": _rgba(slot.get("color", [255, 255, 255])),
"material_type": _normalize_material(slot.get("type", "PLA")),
"reason": "loaded",
} for i, slot in loaded]
assignments = []
used_slots: set[int] = set()
for paint_index in range(target_count):
target_color = colors[paint_index] if paint_index < len(colors) else None
target_material = (_normalize_material(materials[paint_index])
if paint_index < len(materials) else "")
candidates = loaded
if target_material:
material_matches = [
(i, s) for i, s in candidates
if _normalize_material(s.get("type", "PLA")) == target_material
]
if material_matches:
candidates = material_matches
unused = [(i, s) for i, s in candidates if i not in used_slots]
if unused:
candidates = unused
def _score(item):
slot_index, slot = item
score = 0
if target_color is not None:
score += _rgb_distance(target_color, slot.get("color", [255, 255, 255]))
if target_material and _normalize_material(slot.get("type", "PLA")) != target_material:
score += 200000
return score, slot_index
slot_index, slot = min(candidates, key=_score)
used_slots.add(slot_index)
assignments.append({
"paint_index": paint_index,
"slot_index": slot_index,
"paint_color": _rgba(target_color or slot.get("color", [255, 255, 255])),
"ams_color": _rgba(slot.get("color", [255, 255, 255])),
"material_type": _normalize_material(slot.get("type", target_material or "PLA")),
"target_material": target_material,
"reason": "metadata",
})
summary = [
f"T{a['paint_index']}→S{a['slot_index']} {a['material_type']}"
for a in assignments
]
log.info(f"AMS metadata mapping for {filename}: {', '.join(summary)}")
return assignments
def _build_anycubic_ams_mapping(self, filename: str) -> list[dict]:
return [{
"paint_index": a["paint_index"],
"ams_index": a["slot_index"],
"paint_color": a["paint_color"],
"ams_color": a["ams_color"],
"material_type": a["material_type"],
} for a in self._build_ams_assignments(filename)]
def _build_simple_ams_mapping(self, filename: str) -> list[dict]:
return [{
"slot_index": a["slot_index"],
"material_type": a["material_type"],
"color": a["ams_color"][:3],
"paint_index": a["paint_index"],
"paint_color": a["paint_color"][:3],
} for a in self._build_ams_assignments(filename)]
# -------------------------------------------------------------------------
# WebSocket push
# -------------------------------------------------------------------------
@@ -789,16 +552,6 @@ class KobraXBridge:
# Slicer-Zeitschätzung aus GCode-Header auslesen
self._state["slicer_time"] = _parse_gcode_estimated_time(file_data)
filament_meta = _parse_uploaded_filament_metadata(file_data, remote_filename)
self._uploaded_filament_metadata[remote_filename] = filament_meta
self._uploaded_filament_metadata[os.path.basename(remote_filename)] = filament_meta
if filament_meta.get("colors") or filament_meta.get("materials"):
log.info(
"Upload filament metadata: "
f"colors={filament_meta.get('colors', [])} "
f"materials={filament_meta.get('materials', [])} "
f"source={filament_meta.get('source', '')}"
)
# Datei auf Disk ablegen (temp-Verzeichnis) damit Drucker sie per HTTP abrufen kann
safe_name = os.path.basename(remote_filename) # keine Pfad-Traversal
@@ -808,7 +561,7 @@ class KobraXBridge:
del file_data # RAM freigeben
self._last_uploaded_file = remote_filename
log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} -> printer")
log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} → Drucker")
# Datei per HTTP auf den Drucker hochladen (serve_path liegt bereits auf Disk)
upload_url = self._state.get("upload_url") or None
@@ -818,10 +571,10 @@ class KobraXBridge:
None, self.client.upload_gcode, serve_path, remote_filename, upload_url
)
except Exception as e:
log.error(f"Upload failed: {e}")
log.error(f"Upload fehlgeschlagen: {e}")
return web.json_response({"error": str(e)}, status=500)
log.info(f"Upload succeeded: {result}")
log.info(f"Upload erfolgreich: {result}")
# Druck starten mit vollständigem Payload (inkl. serve-URL + md5 + size)
serve_url = f"http://{request.host}/serve/{remote_filename}"
@@ -841,7 +594,7 @@ class KobraXBridge:
loop = asyncio.get_event_loop()
loop.run_in_executor(None, lambda: self._start_print(remote_filename, serve_url, file_md5, file_size))
else:
log.info(f"Uploaded only (print=false): {remote_filename}")
log.info(f"Nur hochgeladen (print=false): {remote_filename}")
self._state["file_ready"] = remote_filename
# OctoPrint-kompatibler Response (OrcaSlicer wertet refs aus)
@@ -866,12 +619,31 @@ class KobraXBridge:
def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0):
self._state["file_ready"] = ""
ams_box_mapping = self._build_anycubic_ams_mapping(filename)
use_ams = len(ams_box_mapping) > 0
log.info(
f"AMS-Slots: {len(self._loaded_ams_slots())}/{len(self._ams_slots)} belegt "
f"{[m['ams_index'] for m in ams_box_mapping]}"
)
default_slot = getattr(self._args, "default_ams_slot", "auto")
all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5]
if default_slot != "auto":
try:
slot_idx = int(default_slot)
loaded = [(i, s) for i, s in all_loaded if i == slot_idx]
if not loaded:
log.warning(f"Standard-Slot {slot_idx} ist leer fallback auf Auto")
loaded = all_loaded
except ValueError:
loaded = all_loaded
else:
loaded = all_loaded
use_ams = len(loaded) > 0
ams_box_mapping = [
{
"paint_index": i,
"ams_index": i,
"paint_color": [255, 255, 255, 255],
"ams_color": [255, 255, 255, 255],
"material_type": s.get("type", "PLA"),
}
for i, s in loaded
]
log.info(f"AMS-Slots: {len(loaded)}/{len(self._ams_slots)} belegt → {[i for i,_ in loaded]}")
auto_leveling = getattr(self._args, "auto_leveling", 1)
payload = {
"taskid": "-1",
@@ -900,9 +672,9 @@ class KobraXBridge:
log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots")
result = self.client.publish("print", "start", payload, timeout=15.0)
if result:
log.info(f"Print start confirmed: state={result.get('state')}")
log.info(f"Druckstart bestätigt: state={result.get('state')}")
else:
log.warning("Print start: no response from printer")
log.warning("Druckstart: keine Antwort vom Drucker")
async def handle_print_start(self, request):
try:
@@ -915,9 +687,38 @@ class KobraXBridge:
if not filename:
return web.json_response({"error": "no filename"}, status=400)
log.info(f"Starting print: {filename}")
log.info(f"Druck starten: {filename}")
# AMS-Mapping aus gecachtem State — leere Slots (status != 5) überspringen
default_slot = getattr(self._args, "default_ams_slot", "auto")
ams_box_mapping = []
for i, slot in enumerate(self._ams_slots):
if slot.get("status") != 5:
log.info(f"AMS-Slot {i} leer (status={slot.get('status')}) übersprungen")
continue
if default_slot != "auto":
try:
if i != int(default_slot):
continue
except ValueError:
pass
ams_box_mapping.append({
"slot_index": i,
"material_type": slot.get("type", "PLA"),
"color": slot.get("color", [255, 255, 255]),
})
# Fallback auf alle belegten Slots wenn gewählter Slot leer war
if default_slot != "auto" and not ams_box_mapping:
log.warning(f"Standard-Slot {default_slot} leer fallback auf alle belegten Slots")
for i, slot in enumerate(self._ams_slots):
if slot.get("status") != 5:
continue
ams_box_mapping.append({
"slot_index": i,
"material_type": slot.get("type", "PLA"),
"color": slot.get("color", [255, 255, 255]),
})
ams_box_mapping = self._build_simple_ams_mapping(filename)
use_ams = len(ams_box_mapping) > 0
payload = {
@@ -933,7 +734,7 @@ class KobraXBridge:
None, lambda: self.client.publish("print", "start", payload, timeout=15.0)
)
if result is None:
return web.json_response({"error": "No response from printer"}, status=504)
return web.json_response({"error": "Keine Antwort vom Drucker"}, status=504)
return web.json_response({"result": "ok"})
@@ -1564,7 +1365,7 @@ nav.bottom-nav{display:none;position:fixed;bottom:0;left:0;right:0;
<span class="slider-val" id="d-fan-val">0</span>
</div>
<div style="margin-top:12px;display:flex;gap:8px;flex-wrap:wrap">
<button class="btn btn-sm" style="background:var(--raised);color:var(--txt)" onclick="quickFan(0)"><span class="lbl-off">Aus</span></button>
<button class="btn btn-sm" style="background:var(--raised);color:var(--txt)" onclick="quickFan(0)">Aus</button>
<button class="btn btn-sm" style="background:var(--raised);color:var(--txt)" onclick="quickFan(25)">25%</button>
<button class="btn btn-sm" style="background:var(--raised);color:var(--txt)" onclick="quickFan(50)">50%</button>
<button class="btn btn-sm" style="background:var(--raised);color:var(--txt)" onclick="quickFan(75)">75%</button>
@@ -1601,23 +1402,23 @@ nav.bottom-nav{display:none;position:fixed;bottom:0;left:0;right:0;
<div class="card-title" style="display:flex;justify-content:space-between;align-items:center">
<span><span>≡</span> <span id="ptitle-console">Ereignis-Log</span></span>
<a id="btn-log-dl" href="/api/log/download" download="kx-bridge.log"
style="font-size:12px;padding:4px 10px;background:var(--raised);border-radius:6px;color:var(--txt2);text-decoration:none">⬇ <span id="lbl-log-download">Download</span></a>
style="font-size:12px;padding:4px 10px;background:var(--raised);border-radius:6px;color:var(--txt2);text-decoration:none">⬇ Download</a>
</div>
<div style="display:flex;gap:6px;margin-bottom:6px;flex-wrap:wrap;align-items:center">
<input id="log-filter" type="text" placeholder="Filter…"
oninput="renderLog()"
style="flex:1;min-width:120px;padding:5px 10px;background:var(--raised);border:1px solid var(--border);border-radius:6px;color:var(--txt);font-size:12px;font-family:var(--mono)">
<button id="btn-autoscroll" onclick="toggleAutoScroll()"
style="font-size:12px;padding:5px 10px;border-radius:6px;border:1px solid var(--border);background:var(--accent);color:#fff;cursor:pointer;white-space:nowrap">⬇ <span id="lbl-log-auto">Auto</span></button>
style="font-size:12px;padding:5px 10px;border-radius:6px;border:1px solid var(--border);background:var(--accent);color:#fff;cursor:pointer;white-space:nowrap">⬇ Auto</button>
<button onclick="consoleLogs=[];renderLog()"
style="font-size:12px;padding:5px 10px;border-radius:6px;border:1px solid var(--border);background:var(--raised);color:var(--txt2);cursor:pointer">✕ <span id="lbl-log-clear">Clear</span></button>
style="font-size:12px;padding:5px 10px;border-radius:6px;border:1px solid var(--border);background:var(--raised);color:var(--txt2);cursor:pointer">✕ Clear</button>
</div>
<div style="display:flex;gap:5px;margin-bottom:8px;flex-wrap:wrap">
<span id="lbl-log-dir" style="font-size:11px;color:var(--txt2);align-self:center;margin-right:2px">Dir:</span>
<span style="font-size:11px;color:var(--txt2);align-self:center;margin-right:2px">Dir:</span>
<button class="log-dir-btn active" id="logdir-all" onclick="setLogDir('all')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer"></button>
<button class="log-dir-btn" id="logdir-rx" onclick="setLogDir('rx')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer">RX</button>
<button class="log-dir-btn" id="logdir-tx" onclick="setLogDir('tx')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer">TX</button>
<span id="lbl-log-topic" style="font-size:11px;color:var(--txt2);align-self:center;margin-left:6px;margin-right:2px">Topic:</span>
<span style="font-size:11px;color:var(--txt2);align-self:center;margin-left:6px;margin-right:2px">Topic:</span>
<button class="log-topic-btn" data-topic="multiColorBox" onclick="setLogTopic('multiColorBox')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer">AMS</button>
<button class="log-topic-btn" data-topic="print" onclick="setLogTopic('print')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer">print</button>
<button class="log-topic-btn" data-topic="info" onclick="setLogTopic('info')" style="font-size:11px;padding:3px 9px;border-radius:5px;border:1px solid var(--border);cursor:pointer">info</button>
@@ -1675,20 +1476,16 @@ var LANG_DE={
confirm_cancel:'Druck wirklich abbrechen?',
settings_title:'Einstellungen',settings_connection:'Verbindung',settings_print:'Druckeinstellungen',settings_poll:'Poll-Intervall',settings_version:'Version',
settings_save:'Speichern & Neustart',settings_printer_ip:'Drucker-IP',settings_mqtt_port:'MQTT-Port',
settings_username:'MQTT-Benutzername',settings_password:'MQTT-Passwort',settings_device_id:'Device-ID',settings_mode_id:'Mode-ID',settings_device_placeholder:'32 Hex-Zeichen',hint_ip_no_port:'Nur IP-Adresse, kein Port (z.B. 192.168.1.102)',
settings_username:'MQTT-Benutzername',settings_password:'MQTT-Passwort',settings_device_id:'Device-ID',settings_mode_id:'Mode-ID',hint_ip_no_port:'Nur IP-Adresse, kein Port (z.B. 192.168.1.102)',
settings_default_slot:'Standard-Slot (Einfarbdruck)',settings_slot_auto:'Auto (alle belegten Slots)',settings_auto_leveling:'Auto-Leveling vor Druck',
update_check:'Auf Updates prüfen',update_checking:'Prüfe...',update_available:'verfügbar',update_none:'Bereits aktuell',
update_apply:'Jetzt installieren',update_applying:'Lade herunter...',update_restarting:'Starte neu...',update_error:'Fehler',
btn_connect:'⚡ Verbinden',btn_disconnect:'✕ Trennen',
lbl_conn_error:'Verbindungsfehler:',
settings_button_title:'Einstellungen',
slot_edit_title:'Slot bearbeiten',slot_edit_color:'Farbe',slot_edit_material:'Material',
slot_edit_save:'💾 Speichern',slot_edit_custom:'z.B. PLA, PETG, ABS…',
slot_edit_ok:'AMS Slot',
ams_slot_select:'Slot auswählen',
log_dir_all:'Alle',log_download:'Download',log_auto:'Auto',log_clear:'Leeren',log_dir:'Richtung:',log_topic:'Topic:',
log_settings_error:'Settings-Fehler:',log_axis_error:'Achsen-Fehler:',log_home_error:'Home-Fehler:',log_motors_error:'Motoren-Fehler:',log_temp_error:'Temp-Fehler:',log_light_error:'Licht-Fehler:',log_speed_error:'Speed-Fehler:',log_fan_error:'Lüfter-Fehler:',log_ams_error:'AMS-Fehler:',log_stream_unavailable:'Stream nicht verfügbar',
print_action_pause:'Pause',print_action_resume:'Fortsetzen',print_action_cancel:'Abbrechen',
log_dir_all:'Alle',
file_ready_btn:'▶ Druck starten',
file_cancel_btn:'✕ Abbrechen'
};
@@ -1713,20 +1510,16 @@ var LANG_EN={
confirm_cancel:'Really cancel the print?',
settings_title:'Settings',settings_connection:'Connection',settings_print:'Print Settings',settings_poll:'Poll Interval',settings_version:'Version',
settings_save:'Save & Restart',settings_printer_ip:'Printer IP',settings_mqtt_port:'MQTT Port',
settings_username:'MQTT Username',settings_password:'MQTT Password',settings_device_id:'Device ID',settings_mode_id:'Mode ID',settings_device_placeholder:'32 hex characters',hint_ip_no_port:'IP address only, no port (e.g. 192.168.1.102)',
settings_username:'MQTT Username',settings_password:'MQTT Password',settings_device_id:'Device ID',settings_mode_id:'Mode ID',hint_ip_no_port:'IP address only, no port (e.g. 192.168.1.102)',
settings_default_slot:'Default Slot (single color)',settings_slot_auto:'Auto (all loaded slots)',settings_auto_leveling:'Auto-Leveling before print',
update_check:'Check for Updates',update_checking:'Checking...',update_available:'available',update_none:'Already up to date',
update_apply:'Install Now',update_applying:'Downloading...',update_restarting:'Restarting...',update_error:'Error',
btn_connect:'⚡ Connect',btn_disconnect:'✕ Disconnect',
lbl_conn_error:'Connection error:',
settings_button_title:'Settings',
slot_edit_title:'Edit Slot',slot_edit_color:'Color',slot_edit_material:'Material',
slot_edit_save:'💾 Save',slot_edit_custom:'e.g. PLA, PETG, ABS…',
slot_edit_ok:'AMS Slot',
ams_slot_select:'Select slot',
log_dir_all:'All',log_download:'Download',log_auto:'Auto',log_clear:'Clear',log_dir:'Dir:',log_topic:'Topic:',
log_settings_error:'Settings error:',log_axis_error:'Axis error:',log_home_error:'Home error:',log_motors_error:'Motor error:',log_temp_error:'Temperature error:',log_light_error:'Light error:',log_speed_error:'Speed error:',log_fan_error:'Fan error:',log_ams_error:'AMS error:',log_stream_unavailable:'Stream unavailable',
print_action_pause:'Pause',print_action_resume:'Resume',print_action_cancel:'Cancel',
log_dir_all:'All',
file_ready_btn:'▶ Start Print',
file_cancel_btn:'✕ Cancel'
};
@@ -1794,14 +1587,12 @@ function applyLang(){
setText('lbl-password',T.settings_password);
setText('lbl-device-id',T.settings_device_id);
setText('lbl-mode-id',T.settings_mode_id);
var did=document.getElementById('s-device-id');if(did)did.setAttribute('placeholder',T.settings_device_placeholder);
setText('lbl-default-slot',T.settings_default_slot);
setText('opt-slot-auto',T.settings_slot_auto);
setText('lbl-auto-leveling',T.settings_auto_leveling);
setText('lbl-update-check',T.update_check);
setText('lbl-update-apply',T.update_apply);
var sb=document.getElementById('settings-btn');if(sb)sb.setAttribute('title',T.settings_button_title);
// Speed buttons
setText('d-spd-lbl-1',T.speed_silent.replace(/^\S+\s/,''));
setText('d-spd-lbl-2',T.speed_normal.replace(/^\S+\s/,''));
@@ -1809,8 +1600,6 @@ function applyLang(){
// AMS feed/unload
document.querySelectorAll('.lbl-feed').forEach(e=>e.textContent=T.lbl_feed);
document.querySelectorAll('.lbl-unload').forEach(e=>e.textContent=T.lbl_unload);
setText('ams-no-data',T.ams_no_data);
setText('ams-slot-lbl',T.ams_slot_select);
// conn-btn text (nur wenn nicht im Übergangszustand)
updateConnBtn();
// Slot-Edit-Dialog
@@ -1819,11 +1608,6 @@ function applyLang(){
setText('btn-slot-edit-save',T.slot_edit_save);
var mi=document.getElementById('slot-edit-mat');if(mi)mi.setAttribute('placeholder',T.slot_edit_custom);
setText('logdir-all',T.log_dir_all);
setText('lbl-log-download',T.log_download);
setText('lbl-log-auto',T.log_auto);
setText('lbl-log-clear',T.log_clear);
setText('lbl-log-dir',T.log_dir);
setText('lbl-log-topic',T.log_topic);
setText('file-ready-btn',T.file_ready_btn);
setText('file-cancel-btn',T.file_cancel_btn);
}
@@ -1862,7 +1646,7 @@ var logTopicFilter=''; // '' = no topic filter
function clog(msg,cls){
cls=cls||'msg-info';
var ts=new Date().toLocaleTimeString(currentLang==='de'?'de':'en',{hour:'2-digit',minute:'2-digit',second:'2-digit'});
var ts=new Date().toLocaleTimeString('de',{hour:'2-digit',minute:'2-digit',second:'2-digit'});
_appendLog({ts:ts,lvl:'',name:'ui',msg:msg},cls);
}
function _lvlCls(lvl){
@@ -2074,10 +1858,10 @@ function updateConnBtn(){
var offline=S.kobra_state==='offline';
if(offline){
btn.className='conn-btn disconnected';
btn.textContent=T.btn_connect||'Connect';
btn.textContent=T.btn_connect||'Verbinden';
} else {
btn.className='conn-btn connected';
btn.textContent=T.btn_disconnect||'Disconnect';
btn.textContent=T.btn_disconnect||'Trennen';
}
}
@@ -2219,7 +2003,7 @@ function saveSlotEdit(){
closeSlotEdit();
clog((T.slot_edit_ok||'AMS Slot')+' '+(_slotEditIndex+1)+': '+mat+' '+hex,'msg-ok');
})
.catch(function(e){clog((T.log_error||'Error:')+' '+e,'msg-err');});
.catch(function(e){clog('Fehler: '+e,'msg-err');});
}
document.addEventListener('DOMContentLoaded',function(){
document.getElementById('s-printer-ip').addEventListener('input',function(){
@@ -2258,7 +2042,7 @@ function saveSettings(){
},4000);
}).catch(function(e){
btn.disabled=false;setText('btn-save-settings',T.settings_save);
clog((T.log_settings_error||'Settings error:')+' '+e,'msg-err');
clog('Settings-Fehler: '+e,'msg-err');
});
}
function checkUpdate(){
@@ -2305,7 +2089,7 @@ async function poll(){
Object.assign(S,d);
applyState();
updateHistory();
}catch(e){clog((T.log_poll_error||'Poll error:')+' '+e,'msg-err')}
}catch(e){clog('Poll-Fehler: '+e,'msg-err')}
}
var pollTimer;
(function(){
@@ -2315,10 +2099,10 @@ var pollTimer;
// ── Print actions ──
function printAction(a){
post('/printer/print/'+a,{}).then(function(){clog((T.nav_print||'Print')+': '+(T['print_action_'+a]||a),'msg-ok');poll()})
.catch(function(e){clog((T.log_error||'Error:')+' '+e,'msg-err')});
post('/printer/print/'+a,{}).then(function(){clog('Druck: '+a,'msg-ok');poll()})
.catch(function(e){clog('Fehler: '+e,'msg-err')});
}
function confirmCancel(){if(confirm(T.confirm_cancel||'Really cancel the print?'))printAction('cancel')}
function confirmCancel(){if(confirm('Druck wirklich abbrechen?'))printAction('cancel')}
// ── Axis motion ──
// axis codes: 0=X, 1=Y, 2=Z
@@ -2334,28 +2118,28 @@ function move(axis,dir,dist){
// axis: 0=X,1=Y,2=Z → printer axis codes: 1=X,2=Y,3=Z
var axisMap={0:1,1:2,2:3};
post('/api/axis',{axis:axisMap[axis],move_type:1,distance:dir*dist})
.then(function(){clog((T.log_axis||'Axis')+' '+(axis===0?'X':axis===1?'Y':'Z')+' '+(dir>0?'+':'')+dir*dist+'mm','msg-ok')})
.catch(function(e){clog((T.log_axis_error||'Axis error:')+' '+e,'msg-err')});
.then(function(){clog('Achse '+(axis===0?'X':axis===1?'Y':'Z')+' '+(dir>0?'+':'')+dir*dist+'mm','msg-ok')})
.catch(function(e){clog('Achse-Fehler: '+e,'msg-err')});
}
function homeAll(){
post('/api/axis',{axis:5,move_type:2,distance:0})
.then(function(){clog('Home All','msg-ok')})
.catch(function(e){clog((T.log_home_error||'Home error:')+' '+e,'msg-err')});
.catch(function(e){clog('Home-Fehler: '+e,'msg-err')});
}
function homeXY(){
post('/api/axis',{axis:4,move_type:2,distance:0})
.then(function(){clog('Home XY','msg-ok')})
.catch(function(e){clog((T.log_home_error||'Home error:')+' '+e,'msg-err')});
.catch(function(e){clog('Home-Fehler: '+e,'msg-err')});
}
function homeZ(){
post('/api/axis',{axis:3,move_type:2,distance:0})
.then(function(){clog('Home Z','msg-ok')})
.catch(function(e){clog((T.log_home_error||'Home error:')+' '+e,'msg-err')});
.catch(function(e){clog('Home-Fehler: '+e,'msg-err')});
}
function disableMotors(){
post('/api/axis',{action:'turnOff'})
.then(function(){clog('Motors Off','msg-ok')})
.catch(function(e){clog((T.log_motors_error||'Motor error:')+' '+e,'msg-err')});
.catch(function(e){clog('Motors-Fehler: '+e,'msg-err')});
}
// ── Temperature ──
@@ -2363,21 +2147,21 @@ function setNozzle(){
var v=parseFloat(document.getElementById('p-nozzle-inp').value||0);
post('/api/temperature',{nozzle:v,bed:S.bed_target})
.then(function(){clog('Nozzle → '+v+'°C','msg-ok')})
.catch(function(e){clog((T.log_temp_error||'Temperature error:')+' '+e,'msg-err')});
.catch(function(e){clog('Temp-Fehler: '+e,'msg-err')});
}
function setBed(){
var v=parseFloat(document.getElementById('p-bed-inp').value||0);
post('/api/temperature',{nozzle:S.nozzle_target,bed:v})
.then(function(){clog(T.label_bed+''+v+'°C','msg-ok')})
.catch(function(e){clog((T.log_temp_error||'Temperature error:')+' '+e,'msg-err')});
.catch(function(e){clog('Temp-Fehler: '+e,'msg-err')});
}
// ── Light ──
function setLight(){
var on=document.getElementById('d-light-toggle').checked;
post('/api/light',{on:on,brightness:80})
.then(function(){clog(on?(T.log_light_on||'Light on'):(T.log_light_off||'Light off'),'msg-ok')})
.catch(function(e){clog((T.log_light_error||'Light error:')+' '+e,'msg-err')});
.then(function(){clog('Licht '+(on?'an, '+br+'%':'aus'),'msg-ok')})
.catch(function(e){clog('Licht-Fehler: '+e,'msg-err')});
}
// ── Print Speed ──
@@ -2388,7 +2172,7 @@ function setSpeed(mode){
if(b) b.classList.toggle('spd-active',m===mode);
});
post('/api/speed',{mode:mode})
.catch(function(e){clog((T.log_speed_error||'Speed error:')+' '+e,'msg-err')});
.catch(function(e){clog('Speed-Fehler: '+e,'msg-err')});
}
// ── Fan ──
@@ -2396,15 +2180,15 @@ function setFan(){
var v=parseInt(document.getElementById('d-fan').value);
document.getElementById('d-fan-val').textContent=v;
post('/api/fan',{speed:v})
.then(function(){clog((T.log_fan||'Fan →')+' '+v+'%','msg-ok')})
.catch(function(e){clog((T.log_fan_error||'Fan error:')+' '+e,'msg-err')});
.then(function(){clog('Lüfter → '+v+'%','msg-ok')})
.catch(function(e){clog('Lüfter-Fehler: '+e,'msg-err')});
}
function quickFan(v){
document.getElementById('d-fan').value=v;
document.getElementById('d-fan-val').textContent=v;
post('/api/fan',{speed:v})
.then(function(){clog((T.log_fan||'Fan →')+' '+v+'%','msg-ok')})
.catch(function(e){clog((T.log_fan_error||'Fan error:')+' '+e,'msg-err')});
.then(function(){clog('Lüfter → '+v+'%','msg-ok')})
.catch(function(e){clog('Lüfter-Fehler: '+e,'msg-err')});
}
// ── AMS ──
@@ -2412,7 +2196,7 @@ function amsFeed(type){
var slot=parseInt(document.getElementById('ams-slot-sel').value);
post('/api/ams/feed',{slot_index:slot,type:type})
.then(function(){clog((type===1?T.lbl_feed:T.lbl_unload)+' Slot '+(slot+1),'msg-ok')})
.catch(function(e){clog((T.log_ams_error||'AMS error:')+' '+e,'msg-err')});
.catch(function(e){clog('AMS-Fehler: '+e,'msg-err')});
}
// ── Camera ──
@@ -2429,13 +2213,13 @@ function camStart(){
img.style.display='none';
ph.style.display='flex';
camOn=false;
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_start||'Camera';
clog((T.log_error||'Error:')+' '+(T.log_stream_unavailable||'Stream unavailable'),'msg-err');
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_start||'Kamera';
clog((T.log_error||'Fehler:')+' Stream nicht verfügbar','msg-err');
};
img.src='/api/camera/stream?t='+Date.now();
camOn=true;
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_stop||'Camera';
clog((T.log_cam_start||'Camera started'),'msg-ok');
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_stop||'Kamera';
clog((T.log_cam_start||'Kamera gestartet'),'msg-ok');
// MJPEG liefert kein onload Spinner nach kurzem Timeout ausblenden
setTimeout(function(){
sp.style.display='none';
@@ -2444,7 +2228,7 @@ function camStart(){
}).catch(function(e){
sp.style.display='none';
ph.style.display='flex';
clog((T.log_error||'Error:')+' '+e,'msg-err');
clog((T.log_error||'Fehler:')+' '+e,'msg-err');
});
}
function camStop(){
@@ -2455,9 +2239,9 @@ function camStop(){
document.getElementById('cam-spinner').style.display='none';
document.getElementById('cam-placeholder').style.display='flex';
camOn=false;
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_start||'Camera';
clog(T.log_cam_stop||'Camera stopped','msg-ok');
}).catch(function(e){clog((T.log_error||'Error:')+' '+e,'msg-err')});
document.getElementById('cam-toggle-btn').textContent=T.btn_cam_start||'Kamera';
clog(T.log_cam_stop||'Kamera gestoppt','msg-ok');
}).catch(function(e){clog((T.log_error||'Fehler:')+' '+e,'msg-err')});
}
function toggleCam(){if(camOn)camStop();else camStart()}
</script>
@@ -2520,7 +2304,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
pass
self._state["print_state"] = "error"
self._state["kobra_state"] = "offline"
log.info("Disconnected manually")
log.info("Manuell getrennt")
return web.json_response({"result": "disconnected"})
async def handle_api_speed(self, request):
@@ -2655,7 +2439,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
"video", "startCapture", None, timeout=8.0
))
state = (result or {}).get("state", "")
log.info(f"Camera startCapture: state={state}")
log.info(f"Kamera startCapture: state={state}")
return web.json_response({"result": "ok", "state": state})
async def handle_api_camera_stop(self, request):
@@ -2669,7 +2453,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
"""Einzelner JPEG-Frame aus dem Kamera-Stream für Obico und andere Snapshot-Clients."""
url = self._state.get("camera_url", "")
if not url:
return web.Response(status=503, text="No camera URL known")
return web.Response(status=503, text="Keine Kamera-URL bekannt")
is_rtsp = url.lower().startswith("rtsp://")
input_args = ["-fflags", "nobuffer", "-flags", "low_delay"]
if is_rtsp:
@@ -2691,7 +2475,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
except Exception as e:
return web.Response(status=503, text=str(e))
if not jpeg:
return web.Response(status=503, text="No frame received")
return web.Response(status=503, text="Kein Frame empfangen")
return web.Response(body=jpeg, content_type="image/jpeg",
headers={"Cache-Control": "no-cache"})
@@ -2699,7 +2483,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
"""MJPEG proxy: FLV → MJPEG via ffmpeg, served as multipart/x-mixed-replace."""
url = self._state.get("camera_url", "")
if not url:
return web.Response(status=503, text="No camera URL known")
return web.Response(status=503, text="Keine Kamera-URL bekannt")
is_rtsp = url.lower().startswith("rtsp://")
ffmpeg_input_args = [
@@ -2728,10 +2512,10 @@ function toggleCam(){if(camOn)camStop();else camStart()}
stderr=asyncio.subprocess.DEVNULL,
)
except (FileNotFoundError, OSError) as e:
log.warning("Camera: ffmpeg not found - camera stream unavailable")
log.warning("Kamera: ffmpeg nicht gefunden Kamerastream nicht verfügbar")
return web.Response(status=503, text="ffmpeg not found")
except Exception as e:
log.warning(f"Camera: ffmpeg could not be started: {e}")
log.warning(f"Kamera: ffmpeg konnte nicht gestartet werden: {e}")
return web.Response(status=503, text=str(e))
boundary = "kobraxframe"
@@ -2771,7 +2555,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
except Exception:
return resp
except Exception as e:
log.warning(f"Camera stream interrupted: {e}")
log.warning(f"Kamera-Stream unterbrochen: {e}")
finally:
try:
proc.kill()
@@ -2787,7 +2571,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
if not os.path.isfile(serve_path):
return web.Response(status=404, text="not found")
size = os.path.getsize(serve_path)
log.info(f"Printer downloading file: {filename} ({size} bytes)")
log.info(f"Drucker lädt Datei ab: {filename} ({size} bytes)")
return web.FileResponse(serve_path, headers={
"Content-Disposition": f'attachment; filename="{filename}"'
})
@@ -2820,7 +2604,6 @@ function toggleCam(){if(camOn)camStop();else camStart()}
"thumbnail": self._thumbnail_b64,
"connection_error": s["connection_error"],
"file_ready": s["file_ready"],
"uploaded_filaments": self._uploaded_metadata_for(s["file_ready"] or self._last_uploaded_file),
"version": self._read_version(),
})
@@ -2832,7 +2615,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
if namespace == "lane_data":
await asyncio.get_event_loop().run_in_executor(None, self._get_ams_slots_fresh)
lanes = self._build_lane_data()
log.info(f"AMS sync: {len(lanes)} lanes sent to OrcaSlicer")
log.info(f"AMS-Sync: {len(lanes)} Lanes an OrcaSlicer")
return web.json_response({
"result": {
"namespace": "lane_data",
@@ -2930,7 +2713,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
return response
def _restart_bridge(self):
log.info("Restarting bridge ...")
log.info("Bridge wird neu gestartet …")
exe = sys.executable
# PyInstaller frozen binary: sys.argv[0] == sys.executable → nicht doppelt übergeben
if getattr(sys, "frozen", False):
@@ -3021,12 +2804,12 @@ function toggleCam(){if(camOn)camStop();else camStart()}
return web.json_response({"error": f"Gitea HTTP {resp.status}"}, status=502)
releases = await resp.json(content_type=None)
if not releases:
return web.json_response({"error": "No releases found"}, status=404)
return web.json_response({"error": "Keine Releases gefunden"}, status=404)
# Dev: neuestes Release mit "-dev+" im Tag suchen
if is_dev:
dev_releases = [r for r in releases if "-dev+" in r.get("tag_name", "")]
if not dev_releases:
return web.json_response({"error": "No dev releases found"}, status=404)
return web.json_response({"error": "Keine Dev-Releases gefunden"}, status=404)
data = dev_releases[0]
else:
data = releases[0]
@@ -3119,7 +2902,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
break
self.ws_clients.discard(ws)
log.info(f"WS client disconnected ({len(self.ws_clients)} remaining)")
log.info(f"WS client getrennt ({len(self.ws_clients)} verbleibend)")
return ws
async def _handle_ws_rpc(self, ws: web.WebSocketResponse, raw: str):
@@ -3179,8 +2962,11 @@ function toggleCam(){if(camOn)camStop();else camStart()}
elif method == "printer.print.start":
filename = params.get("filename", self._last_uploaded_file)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: self._start_print(filename))
result = "ok"
resp = await loop.run_in_executor(
None, lambda: self.client.publish("print", "start",
{"filename": filename, "use_ams": False}, timeout=15.0)
)
result = "ok" if resp else "timeout"
elif method == "printer.print.pause":
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.client.pause_print)
@@ -3201,7 +2987,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
log.debug(f"Unbekannte RPC-Methode: {method}")
result = {}
except Exception as e:
log.error(f"RPC error for {method}: {e}")
log.error(f"RPC-Fehler für {method}: {e}")
error = {"code": -32603, "message": str(e)}
if rpc_id is not None:
@@ -3235,7 +3021,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
# ── Offline-Modus: warten bis Drucker wieder erreichbar ──────────
if _offline:
if self._printer_reachable():
log.info("Printer reachable - connecting MQTT ...")
log.info("Drucker erreichbar stelle MQTT-Verbindung her …")
try:
self.client.connect()
_offline = False
@@ -3246,7 +3032,7 @@ function toggleCam(){if(camOn)camStop();else camStart()}
except Exception as e:
err = _mqtt_error_msg(e)
self._state["connection_error"] = err
log.warning(f"Connection attempt failed: {err}")
log.warning(f"Verbindungsaufbau fehlgeschlagen: {err}")
stop_event.wait(_probe_interval)
continue
else:
@@ -3271,10 +3057,10 @@ function toggleCam(){if(camOn)camStop();else camStart()}
if slots:
self._ams_slots = slots
except Exception as e:
log.warning(f"Poll error: {e}")
log.warning(f"Poll-Fehler: {e}")
# Prüfen ob Drucker wirklich weg ist
if not self._printer_reachable():
log.info("Printer unreachable - switching to offline mode")
log.info("Drucker nicht erreichbar wechsle in Offline-Modus")
self._state["print_state"] = "error"
self._state["kobra_state"] = "offline"
self._state["connection_error"] = f"Printer unreachable ({self._args.printer_ip})"
@@ -3298,7 +3084,7 @@ def _mqtt_error_msg(exc: Exception) -> str:
def build_app(bridge: KobraXBridge) -> web.Application:
app = web.Application()
app = web.Application(client_max_size=256 * 1024 * 1024) # 256 MB für große GCode-Dateien
r = app.router
# Moonraker API
@@ -3378,13 +3164,13 @@ async def run_bridge(args):
# Verbindungsversuch beim Start bei Fehler im Offline-Modus weiterlaufen
loop = asyncio.get_event_loop()
log.info(f"Connecting to printer {args.printer_ip}:{args.mqtt_port} ...")
log.info(f"Verbinde mit Drucker {args.printer_ip}:{args.mqtt_port} ")
try:
await loop.run_in_executor(None, client.connect)
log.info("MQTT verbunden")
except Exception as e:
err = _mqtt_error_msg(e)
log.warning(f"Connection failed: {err} - starting in offline mode")
log.warning(f"Verbindung fehlgeschlagen: {err} starte im Offline-Modus")
bridge._state["print_state"] = "error"
bridge._state["kobra_state"] = "offline"
bridge._state["connection_error"] = err
@@ -3408,7 +3194,7 @@ async def run_bridge(args):
_local_ip = _s.getsockname()[0]
except Exception:
_local_ip = args.host
log.info(f"Bridge running at http://{_local_ip}:{args.port}")
log.info(f"Bridge läuft auf http://{_local_ip}:{args.port}")
log.info(f"OrcaSlicer → Klipper → Host: {_local_ip} Port: {args.port}")
log.info("Ctrl-C zum Beenden")
@@ -3422,7 +3208,7 @@ async def run_bridge(args):
stop_event.set()
await runner.cleanup()
client.disconnect()
log.info("Bridge stopped")
log.info("Bridge beendet")
def main():

View File

@@ -1,2 +1,4 @@
aiohttp>=3.9
imageio-ffmpeg>=0.4.9
requests>=2.30.0
pycryptodome>=3.20.0