commit 9bfd455a7ca69562efdf4628292adc3cdf5d95ba Author: viewit Date: Mon Apr 20 16:07:43 2026 +0200 feat: KX-Bridge 0.9.0-beta1 – initiales Release-Repo diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6bfe60b --- /dev/null +++ b/.env.example @@ -0,0 +1,23 @@ +# KX-Bridge – Verbindungsparameter +# Kopiere diese Datei nach .env und trage deine Werte ein: +# cp .env.example .env +# .env wird NICHT ins Repository committed. +# +# Credentials mit extract_credentials.exe (Windows) oder +# extract_credentials (Linux) aus dem laufenden AnycubicSlicerNext auslesen. + +# IP-Adresse des Druckers im lokalen Netzwerk +PRINTER_IP=192.168.x.x + +# MQTT-Port (Anycubic Kobra X Standard: 9883) +MQTT_PORT=9883 + +# MQTT-Zugangsdaten (druckerspezifisch, beginnt mit "user") +MQTT_USERNAME=userXXXXXXXXXX +MQTT_PASSWORD=XXXXXXXXXXXXXXX + +# Geräte-ID (32-stelliger Hex-String, druckerspezifisch) +DEVICE_ID=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# Modell-ID (Kobra X Standard: 20030) +MODE_ID=20030 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4ceca80 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +__pycache__/ +*.pyc +build/ +dist/ +*.spec diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1400d2e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY 05_scripts/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY 05_scripts/kobrax_moonraker_bridge.py . +COPY 05_scripts/env_loader.py . +COPY 05_scripts/kobrax_client.py . + +EXPOSE 7125 + +ENTRYPOINT ["python", "kobrax_moonraker_bridge.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..b4624d0 --- /dev/null +++ b/README.md @@ -0,0 +1,187 @@ +# KX-Bridge + +Verbindet den Anycubic Kobra X mit OrcaSlicer – ohne Klipper, ohne Raspberry Pi. + +KX-Bridge läuft auf deinem PC oder NAS und stellt eine Schnittstelle bereit, über die OrcaSlicer den Drucker direkt steuern kann: Druckstart, Temperatur, Fortschritt, AMS-Farbwechsel. + +--- + +## Enthaltene Dateien + +| Datei | Beschreibung | +|-------|-------------| +| `kobrax_moonraker_bridge.py` | Bridge-Hauptprogramm | +| `extract_credentials.exe` | Zugangsdaten aus AnycubicSlicerNext auslesen (Windows) | +| `extract_credentials` | Zugangsdaten aus AnycubicSlicerNext auslesen (Linux) | +| `kobra_x_orcaslicer_preset.zip` | OrcaSlicer-Druckerprofil für den Kobra X | +| `bridge.sh` | Service-Manager für Linux | +| `Dockerfile` / `docker-compose.yml` | Docker-Deployment | +| `.env.example` | Konfigurationsvorlage | + +--- + +## Voraussetzungen + +- Anycubic Kobra X im LAN-Modus (Drucker muss über LAN erreichbar sein, nicht nur über Anycubic-Cloud) +- PC, NAS oder Server im gleichen Netzwerk (Windows oder Linux) +- Docker oder Python 3.9+ +- MQTT-Zugangsdaten des Druckers → [Schritt 1](#schritt-1-zugangsdaten-ermitteln) + +--- + +## Schnellstart + +### Schritt 1: Zugangsdaten ermitteln + +Die Bridge benötigt druckerspezifische MQTT-Zugangsdaten. + +> Wichtig: Der Drucker muss sich im LAN-Modus befinden. Nur wenn der Drucker direkt über LAN (nicht ausschließlich über die Anycubic-Cloud) erreichbar ist, können die Zugangsdaten ermittelt und die Bridge genutzt werden. + +Die Zugangsdaten werden mit `extract_credentials` aus dem laufenden AnycubicSlicerNext ausgelesen. + +Vorbereitung: AnycubicSlicerNext starten und mit dem Drucker verbinden (bis der Drucker-Status angezeigt wird). + +Windows: +``` +extract_credentials.exe --write-env +``` + +Linux: +```bash +chmod +x extract_credentials +./extract_credentials --write-env +``` + +Die Zugangsdaten werden automatisch in `.env` gespeichert. + +> Falls das Ergebnis unsicher wirkt: `--verbose` zeigt alle gefundenen Kandidaten. Den richtigen Wert manuell in `.env` eintragen. + +--- + +### Schritt 2: Konfiguration prüfen + +```bash +cp .env.example .env +# .env öffnen und Werte kontrollieren +``` + +--- + +### Schritt 3: Bridge starten + +Option A – Docker (empfohlen): +```bash +docker compose up -d +``` +Läuft im Hintergrund, startet automatisch nach Systemneustart. + +Option B – Linux Binary: +```bash +chmod +x kx-bridge +./kx-bridge +# Oder mit Service-Manager: +./bridge.sh start +``` + +Option C – Python direkt: +```bash +pip install aiohttp +python kobrax_moonraker_bridge.py +``` + +--- + +### Schritt 4: OrcaSlicer-Profil installieren + +1. `kobra_x_orcaslicer_preset.zip` in OrcaSlicer importieren: + Datei → Konfigurationen importieren → ZIP auswählen +2. Anycubic Kobra X als Drucker auswählen + +--- + +### Schritt 5: OrcaSlicer verbinden + +1. Drucker-Einstellungen öffnen +2. Verbindungstyp: Moonraker +3. Adresse: `http://IP-DES-BRIDGE-PC:7125` eintragen +4. Auf "Test" klicken – bei erfolgreicher Verbindung erscheint eine Bestätigungsmeldung + +--- + +## bridge.sh – Service-Manager (Linux) + +```bash +./bridge.sh start # Im Hintergrund starten +./bridge.sh stop # Beenden +./bridge.sh restart # Neustarten +./bridge.sh status # Status anzeigen +./bridge.sh log 50 # Letzte 50 Log-Zeilen +``` + +--- + +## Docker – Nützliche Befehle + +```bash +docker compose up -d # Starten +docker compose down # Stoppen +docker compose logs -f # Logs verfolgen +docker compose pull && docker compose up -d # Update +``` + +--- + +## Fehlerbehebung + +Port 7125 belegt: +```bash +./bridge.sh stop +./bridge.sh start +``` + +Verbindungstest in OrcaSlicer schlägt fehl: +- Firewall prüfen: Port 7125 muss erreichbar sein +- Bridge-Log prüfen: `./bridge.sh log` oder `docker compose logs` +- Drucker-IP in `.env` korrekt? + +Zugangsdaten werden abgelehnt: +- AnycubicSlicerNext starten, mit Drucker verbinden +- `extract_credentials --verbose` ausführen und alle Kandidaten prüfen +- Richtigen Wert manuell in `.env` eintragen, Bridge neu starten + +Docker: Permission denied: +```bash +sudo usermod -aG docker $USER +# Neu einloggen, dann erneut versuchen +``` + +--- + +## Konfigurationsreferenz (.env) + +| Parameter | Beschreibung | Beispiel | +|-----------|-------------|---------| +| `PRINTER_IP` | IP-Adresse des Druckers | `192.168.1.100` | +| `MQTT_PORT` | MQTT-Port (nicht ändern) | `9883` | +| `MQTT_USERNAME` | Benutzername (beginnt mit „user") | `userXXXXXXXXXX` | +| `MQTT_PASSWORD` | Passwort (~15 Zeichen) | `***` | +| `DEVICE_ID` | Geräte-ID (32 Hex-Zeichen) | `xxxxxxxx...` | +| `MODE_ID` | Modell-ID (Kobra X Standard) | `20030` | + +--- + +## Sicherheitshinweise + +- Die Bridge bindet auf Port 7125 – nur im lokalen Netzwerk betreiben +- `.env` enthält Drucker-Zugangsdaten – nicht teilen oder ins Repo committen +- Alle Zugangsdaten werden ausschließlich lokal verarbeitet + +--- + +## Hinweis zur Nutzung + +Dieses Projekt dient der privaten Nutzung und der Herstellung von Interoperabilität zwischen dem Anycubic Kobra X und freier Software (OrcaSlicer). + +`extract_credentials` liest ausschließlich den Arbeitsspeicher des auf deinem eigenen PC laufenden AnycubicSlicerNext-Prozesses. Es werden keine Daten übertragen oder gespeichert, außer in die lokale `.env`-Datei. Das Tool funktioniert nur für den Prozess des Druckers, dem du selbst gehörst. + +Das Projekt steht in keiner Verbindung zu Anycubic und wird nicht kommerziell betrieben. diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..8238e26 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.9.0-beta1 diff --git a/bridge.sh b/bridge.sh new file mode 100755 index 0000000..0a9c2ca --- /dev/null +++ b/bridge.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Bridge-Manager: start | stop | restart | status | log +SCRIPT="$(dirname "$0")/kobrax_moonraker_bridge.py" +LOGFILE="/tmp/bridge.log" +PRINTER_IP="192.168.178.94" + +case "${1:-restart}" in + start) + if ss -tlnp | grep -q 7125; then + echo "Port 7125 belegt – beende alte Instanz..." + fuser -k 7125/tcp 2>/dev/null || pkill -f kobrax_moonraker_bridge 2>/dev/null + sleep 1 + fi + nohup python3 "$SCRIPT" --printer-ip "$PRINTER_IP" > "$LOGFILE" 2>&1 & + echo "Bridge gestartet PID=$!" + sleep 2; tail -3 "$LOGFILE" + ;; + stop) + pkill -f kobrax_moonraker_bridge && echo "Bridge beendet" || echo "Nicht aktiv" + fuser -k 7125/tcp 2>/dev/null + ;; + restart) + "$0" stop + sleep 1 + "$0" start + ;; + status) + if ss -tlnp | grep -q 7125; then + echo "Bridge läuft (Port 7125 aktiv)" + ps aux | grep kobrax_moonraker | grep -v grep + else + echo "Bridge nicht aktiv" + fi + ;; + log) + tail -${2:-20} "$LOGFILE" + ;; + *) + echo "Usage: $0 {start|stop|restart|status|log [N]}" + exit 1 + ;; +esac diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2a9f35c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +services: + kx-bridge: + image: kx-bridge:latest + build: . + env_file: .env + ports: + - "7125:7125" + restart: unless-stopped + logging: + driver: json-file + options: + max-size: "10m" + max-file: "3" diff --git a/env_loader.py b/env_loader.py new file mode 100644 index 0000000..304d4a6 --- /dev/null +++ b/env_loader.py @@ -0,0 +1,46 @@ +""" +env_loader.py – lädt Verbindungsparameter aus .env (Repo-Root oder Arbeitsverzeichnis). +Umgebungsvariablen haben Vorrang vor .env-Werten. +""" +import os +import pathlib + + +def _find_env_file() -> pathlib.Path | None: + # Suche .env im selben Verzeichnis, dann im Parent (Repo-Root) + for base in (pathlib.Path(__file__).parent, pathlib.Path(__file__).parent.parent): + p = base / ".env" + if p.is_file(): + return p + return None + + +def _load_env_file(path: pathlib.Path): + with open(path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + key = key.strip() + val = val.strip() + if key and key not in os.environ: + os.environ[key] = val + + +_env_path = _find_env_file() +if _env_path: + _load_env_file(_env_path) + + +def get(key: str, default: str = "") -> str: + return os.environ.get(key, default) + + +# Häufig verwendete Shortcuts +PRINTER_IP = get("PRINTER_IP", "") +MQTT_PORT = int(get("MQTT_PORT", "9883")) +USERNAME = get("MQTT_USERNAME", "") +PASSWORD = get("MQTT_PASSWORD", "") +MODE_ID = get("MODE_ID", "") +DEVICE_ID = get("DEVICE_ID", "") diff --git a/extract_credentials.py b/extract_credentials.py new file mode 100644 index 0000000..5c47ed2 --- /dev/null +++ b/extract_credentials.py @@ -0,0 +1,454 @@ +""" +extract_credentials.py – Extrahiert Anycubic LAN-MQTT-Credentials aus dem RAM +des laufenden AnycubicSlicerNext-Prozesses. + +Voraussetzungen: + - AnycubicSlicerNext läuft und ist mit dem Drucker verbunden + - Gleiches Benutzerkonto wie der Slicer-Prozess (kein Admin nötig) + +Verwendung: + python3 extract_credentials.py [--write-env] [--env-file ../.env] + +Funktionsweise: + 1. Prozess "AnycubicSlicer.exe" (Windows) bzw. "AnycubicSlicer" (Linux) finden + 2. Speicherseiten des Prozesses durchsuchen (nur r/rw, keine Exec-Pages) + 3. Nach MQTT-Credential-Patterns suchen: + Username: user[A-Za-z0-9]{8,12} + Password: [A-Za-z0-9+/]{13,18} + Drucker-IP: d{1,3}.d{1,3}.d{1,3}.d{1,3} + 4. Kandidaten nach Plausibilität filtern und ausgeben + 5. Optional: .env-Datei schreiben +""" + +import argparse +import os +import re +import struct +import sys +import platform + +# --------------------------------------------------------------------------- +# Plattform-Erkennung +# --------------------------------------------------------------------------- + +IS_WINDOWS = platform.system() == "Windows" +IS_LINUX = platform.system() == "Linux" + +# --------------------------------------------------------------------------- +# Pattern +# --------------------------------------------------------------------------- + +# Username: "user" + 8–12 alphanumerische Zeichen (drucker-generiert) +RE_USERNAME = re.compile(rb'user[A-Za-z0-9]{8,12}(?=[^A-Za-z0-9]|$)') + +# Password: 13–20 alphanumerische Zeichen (kein / da kein RTSP-Pfad) +# Anycubic-Passwörter: gemischte Groß/Klein/Ziffern, kein Slash +RE_PASSWORD = re.compile(rb'[A-Za-z0-9]{13,20}(?=[^A-Za-z0-9]|$)') + +# Kontext-Pattern: sucht Passwort das direkt nach "password" im Speicher steht +RE_PASSWORD_CTX = re.compile(rb'(?:password|passwd|Password)\x00{0,4}([A-Za-z0-9]{10,25})(?=[^A-Za-z0-9]|$)', re.IGNORECASE) + +# Proximity-Pattern: Username gefolgt von Passwort in naher Umgebung (<512 Bytes) +RE_USER_PASS_PROXIMITY = re.compile( + rb'(user[A-Za-z0-9]{8,12}).{1,512}?([A-Za-z0-9]{13,20})(?=[^A-Za-z0-9]|$)', + re.DOTALL +) + +# IPv4-Adresse (kein localhost, kein Broadcast) +RE_IP = re.compile(rb'(? "int | None": + """Findet die PID eines Prozesses anhand des Namens (case-insensitive).""" + import ctypes + import ctypes.wintypes + + TH32CS_SNAPPROCESS = 0x00000002 + + class PROCESSENTRY32(ctypes.Structure): + _fields_ = [ + ("dwSize", ctypes.wintypes.DWORD), + ("cntUsage", ctypes.wintypes.DWORD), + ("th32ProcessID", ctypes.wintypes.DWORD), + ("th32DefaultHeapID", ctypes.POINTER(ctypes.c_ulong)), + ("th32ModuleID", ctypes.wintypes.DWORD), + ("cntThreads", ctypes.wintypes.DWORD), + ("th32ParentProcessID", ctypes.wintypes.DWORD), + ("pcPriClassBase", ctypes.c_long), + ("dwFlags", ctypes.wintypes.DWORD), + ("szExeFile", ctypes.c_char * 260), + ] + + snap = ctypes.windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) + if snap == ctypes.wintypes.HANDLE(-1).value: + return None + + entry = PROCESSENTRY32() + entry.dwSize = ctypes.sizeof(PROCESSENTRY32) + + try: + if not ctypes.windll.kernel32.Process32First(snap, ctypes.byref(entry)): + return None + while True: + if entry.szExeFile.decode("utf-8", errors="replace").lower() == name.lower(): + return entry.th32ProcessID + if not ctypes.windll.kernel32.Process32Next(snap, ctypes.byref(entry)): + break + finally: + ctypes.windll.kernel32.CloseHandle(snap) + return None + + +def _win_read_memory(pid: int, chunk_size: int = 0x10000) -> "list[bytes]": + """Liest alle lesbaren Speicherseiten eines Windows-Prozesses.""" + import ctypes + import ctypes.wintypes + + PROCESS_VM_READ = 0x0010 + PROCESS_QUERY_INFORMATION = 0x0400 + MEM_COMMIT = 0x1000 + PAGE_NOACCESS = 0x01 + PAGE_GUARD = 0x100 + + class MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", ctypes.wintypes.DWORD), + ("RegionSize", ctypes.c_size_t), + ("State", ctypes.wintypes.DWORD), + ("Protect", ctypes.wintypes.DWORD), + ("Type", ctypes.wintypes.DWORD), + ] + + k32 = ctypes.windll.kernel32 + handle = k32.OpenProcess(PROCESS_VM_READ | PROCESS_QUERY_INFORMATION, False, pid) + if not handle: + raise PermissionError(f"OpenProcess fehlgeschlagen (PID {pid}): {ctypes.GetLastError()}") + + chunks = [] + addr = 0 + mbi = MEMORY_BASIC_INFORMATION() + + try: + while k32.VirtualQueryEx(handle, ctypes.c_void_p(addr), + ctypes.byref(mbi), ctypes.sizeof(mbi)): + skip = ( + mbi.State != MEM_COMMIT or + mbi.Protect & PAGE_NOACCESS or + mbi.Protect & PAGE_GUARD or + mbi.RegionSize > 256 * 1024 * 1024 # >256 MB überspringen + ) + if not skip: + buf = ctypes.create_string_buffer(mbi.RegionSize) + read = ctypes.c_size_t(0) + if k32.ReadProcessMemory(handle, ctypes.c_void_p(addr), + buf, mbi.RegionSize, ctypes.byref(read)): + chunks.append(bytes(buf[:read.value])) + addr += mbi.RegionSize + if addr >= 0x7FFFFFFFFFFF: # Ende des User-Space (64-bit) + break + finally: + k32.CloseHandle(handle) + + return chunks + + +# --------------------------------------------------------------------------- +# Linux – Speicher lesen via /proc/{pid}/mem +# --------------------------------------------------------------------------- + +def _linux_find_pid(name: str) -> "int | None": + """Findet PID anhand des Prozessnamens in /proc.""" + for entry in os.listdir("/proc"): + if not entry.isdigit(): + continue + try: + cmdline = open(f"/proc/{entry}/cmdline", "rb").read() + if name.lower().encode() in cmdline.lower(): + return int(entry) + except (PermissionError, FileNotFoundError): + continue + return None + + +def _linux_read_memory(pid: int) -> "list[bytes]": + """Liest lesbare Speichersegmente aus /proc/{pid}/mem.""" + chunks = [] + maps_path = f"/proc/{pid}/maps" + mem_path = f"/proc/{pid}/mem" + + try: + maps = open(maps_path).readlines() + mem = open(mem_path, "rb") + except PermissionError: + raise PermissionError( + f"Kein Zugriff auf /proc/{pid}/mem — " + "Script als gleicher Benutzer wie der Slicer starten." + ) + + for line in maps: + parts = line.split() + if len(parts) < 2: + continue + perms = parts[1] + if "r" not in perms: # nur lesbare Seiten + continue + if "x" in perms: # Code-Seiten überspringen (keine Strings) + continue + try: + start, end = [int(x, 16) for x in parts[0].split("-")] + except ValueError: + continue + size = end - start + if size > 256 * 1024 * 1024: # >256 MB überspringen + continue + try: + mem.seek(start) + data = mem.read(size) + if data: + chunks.append(data) + except (OSError, ValueError): + continue + + mem.close() + return chunks + + +# --------------------------------------------------------------------------- +# Pattern-Suche +# --------------------------------------------------------------------------- + +def _is_valid_ip(ip_bytes: bytes) -> bool: + parts = ip_bytes.split(b".") + if len(parts) != 4: + return False + try: + nums = [int(p) for p in parts] + except ValueError: + return False + if nums[0] in (0, 127, 255): + return False + if nums == [0, 0, 0, 0]: + return False + return all(0 <= n <= 255 for n in nums) + + +def search_chunks(chunks: "list[bytes]") -> dict: + """Durchsucht Speicher-Chunks nach Credential-Patterns.""" + usernames = {} # value → count + passwords = {} + ips = {} + device_ids = {} + + total = len(chunks) + for i, chunk in enumerate(chunks): + if i % 50 == 0 or i == total - 1: + pct = (i + 1) * 100 // total + mb_done = sum(len(c) for c in chunks[:i+1]) / 1024 / 1024 + print(f"\r[*] Analysiere ... {pct:3d}% ({mb_done:.0f} MB)", end="", flush=True) + for m in RE_USERNAME.finditer(chunk): + v = m.group().decode("ascii", errors="replace") + usernames[v] = usernames.get(v, 0) + 1 + + # Proximity: Passwort das innerhalb von 512 Bytes nach einem Username steht + for m in RE_USER_PASS_PROXIMITY.finditer(chunk): + pw = m.group(2).decode("ascii", errors="replace") + has_upper = any(c.isupper() for c in pw) + has_lower = any(c.islower() for c in pw) + has_digit = any(c.isdigit() for c in pw) + if has_upper and has_lower and has_digit: + passwords[pw] = passwords.get(pw, 0) + 500 + + for m in RE_PASSWORD.finditer(chunk): + v = m.group().decode("ascii", errors="replace") + # Filter: mindestens 2 Großbuchstaben + 2 Kleinbuchstaben + 1 Ziffer + has_upper = sum(1 for c in v if c.isupper()) >= 2 + has_lower = sum(1 for c in v if c.islower()) >= 2 + has_digit = sum(1 for c in v if c.isdigit()) >= 1 + if has_upper and has_lower and has_digit: + passwords[v] = passwords.get(v, 0) + 1 + + for m in RE_IP.finditer(chunk): + ip = m.group(1) + if _is_valid_ip(ip): + v = ip.decode("ascii") + ips[v] = ips.get(v, 0) + 1 + + for m in RE_DEVICE_ID.finditer(chunk): + v = m.group().decode("ascii") + device_ids[v] = device_ids.get(v, 0) + 1 + + print() # Zeilenumbruch nach Fortschrittszeile + + # Nach Häufigkeit sortieren, häufigste zuerst + def top(d, n=10): + return sorted(d.items(), key=lambda x: -x[1])[:n] + + return { + "usernames": top(usernames), + "passwords": top(passwords), + "ips": top(ips, 10), + "device_ids": top(device_ids), + } + + +# --------------------------------------------------------------------------- +# Hauptprogramm +# --------------------------------------------------------------------------- + +SLICER_NAMES = [ + "AnycubicSlicerNext.exe", # Windows + "AnycubicSlicer.exe", + "AnycubicSlicerNext", # Linux + "AnycubicSlicer", +] + + +def find_slicer_pid() -> "tuple[int, str] | tuple[None, None]": + for name in SLICER_NAMES: + if IS_WINDOWS: + pid = _win_find_pid(name) + else: + pid = _linux_find_pid(name) + if pid: + return pid, name + return None, None + + +def read_process(pid: int) -> "list[bytes]": + if IS_WINDOWS: + return _win_read_memory(pid) + else: + return _linux_read_memory(pid) + + +def write_env(results: dict, env_path: str, + username: str, password: str, ip: str, + mode_id: str = "20030", device_id: str = ""): + lines = [] + if os.path.isfile(env_path): + with open(env_path) as f: + lines = f.readlines() + + def set_val(key, val): + nonlocal lines + for i, line in enumerate(lines): + if line.startswith(f"{key}="): + lines[i] = f"{key}={val}\n" + return + lines.append(f"{key}={val}\n") + + set_val("PRINTER_IP", ip) + set_val("MQTT_USERNAME", username) + set_val("MQTT_PASSWORD", password) + if device_id: + set_val("DEVICE_ID", device_id) + if mode_id: + set_val("MODE_ID", mode_id) + + with open(env_path, "w") as f: + f.writelines(lines) + print(f"\n✓ Credentials in '{env_path}' gespeichert.") + + +def main(): + parser = argparse.ArgumentParser( + description="Extrahiert MQTT-Credentials aus dem RAM des AnycubicSlicer-Prozesses" + ) + parser.add_argument("--write-env", action="store_true", + help="Gefundene Credentials in .env schreiben") + parser.add_argument("--env-file", default=None, + help="Pfad zur .env-Datei (Standard: ../. env relativ zu diesem Script)") + parser.add_argument("--pid", type=int, default=None, + help="Prozess-PID direkt angeben (überspringt Auto-Erkennung)") + parser.add_argument("--verbose", action="store_true", + help="Alle Kandidaten ausgeben, nicht nur den besten") + args = parser.parse_args() + + # .env-Pfad bestimmen + if args.env_file: + env_path = args.env_file + else: + env_path = os.path.join(os.path.dirname(__file__), "..", ".env") + env_path = os.path.normpath(env_path) + + # Prozess finden + if args.pid: + pid, proc_name = args.pid, f"PID {args.pid}" + else: + print("[*] Suche AnycubicSlicer-Prozess ...") + pid, proc_name = find_slicer_pid() + if not pid: + print("✗ AnycubicSlicer nicht gefunden. Bitte den Slicer starten und " + "mit dem Drucker verbinden, dann erneut ausführen.") + sys.exit(1) + + print(f"[*] Prozess gefunden: {proc_name} (PID {pid})") + print(f"[*] Lese Prozess-Speicher ...") + + try: + chunks = read_process(pid) + except PermissionError as e: + print(f"✗ Zugriffsfehler: {e}") + sys.exit(1) + + total_mb = sum(len(c) for c in chunks) / 1024 / 1024 + print(f"[*] {len(chunks)} Speichersegmente gelesen ({total_mb:.1f} MB)") + print(f"[*] Durchsuche nach Credentials ...") + + results = search_chunks(chunks) + + # Ausgabe + print("\n" + "="*55) + print(" ERGEBNISSE") + print("="*55) + + def show(label, items, verbose): + if not items: + print(f" {label:12s} — nicht gefunden") + return items[0][0] if items else "" + best = items[0][0] + print(f" {label:12s} {best} (Treffer: {items[0][1]})") + if verbose and len(items) > 1: + for val, cnt in items[1:]: + print(f" {'':12s} {val} (Treffer: {cnt})") + return best + + best_user = show("Username", results["usernames"], args.verbose) + best_pass = show("Password", results["passwords"], args.verbose) + best_device = show("Device-ID", results["device_ids"], args.verbose) + + # IP: 192.168.x.x bevorzugen + lan_ips = [(ip, cnt) for ip, cnt in results["ips"] + if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.")] + if not lan_ips: + lan_ips = results["ips"] + best_ip = show("Drucker-IP", lan_ips, args.verbose) + + print("="*55) + + if not best_user or not best_pass: + print("\n⚠ Keine vollständigen Credentials gefunden.") + print(" Stelle sicher dass der Slicer MIT dem Drucker verbunden ist.") + sys.exit(1) + + if args.write_env: + write_env(results, env_path, best_user, best_pass, best_ip, + device_id=best_device) + else: + print(f"\nHinweis: --write-env übergeben um Credentials in '{env_path}' zu speichern.") + + +if __name__ == "__main__": + main() diff --git a/kobra_x_orcaslicer_preset.zip b/kobra_x_orcaslicer_preset.zip new file mode 100644 index 0000000..7310e61 Binary files /dev/null and b/kobra_x_orcaslicer_preset.zip differ diff --git a/kobrax_client.py b/kobrax_client.py new file mode 100644 index 0000000..4303b3e --- /dev/null +++ b/kobrax_client.py @@ -0,0 +1,554 @@ +""" +kobrax_client.py – Anycubic Kobra X LAN-MQTT-Client + +Protokoll vollständig rekonstruiert via Sniffer 2026-04-17 (953 Nachrichten). + +Voraussetzungen: + - /tmp/anycubic_slicer.crt und .key (aus cloud_mqtt.dll @ 0x2ed5b0 / 0x2edce0) + - Drucker im LAN-Modus erreichbar auf Port 9883 + +Verwendung: + client = KobraXClient(env_loader.PRINTER_IP, mode_id=env_loader.MODE_ID, + device_id=env_loader.DEVICE_ID) + client.connect() + info = client.query_info() + print(info["data"]["temp"]) + client.disconnect() +""" + +import json +import os +import socket +import ssl +import threading +import time +import uuid +from datetime import datetime + +import env_loader + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +CERT_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.crt") +KEY_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.key") + + +# --------------------------------------------------------------------------- +# Low-level MQTT framing +# --------------------------------------------------------------------------- + +def _enc_str(s: str) -> bytes: + b = s.encode("utf-8") + return len(b).to_bytes(2, "big") + b + + +def _enc_len(n: int) -> bytes: + out = bytearray() + while True: + d = n % 128 + n //= 128 + if n > 0: + d |= 0x80 + out.append(d) + if n == 0: + break + return bytes(out) + + +def _build_connect(client_id: str, username: str, password: str) -> bytes: + proto = b"\x00\x04MQTT\x04" + ka = b"\x00\x3c" # keepalive = 60s + flags = 0xC2 # username + password, clean session + payload = _enc_str(client_id) + _enc_str(username) + _enc_str(password) + body = proto + bytes([flags]) + ka + payload + return bytes([0x10]) + _enc_len(len(body)) + body + + +def _build_subscribe(topic: str, pid: int) -> bytes: + p = pid.to_bytes(2, "big") + _enc_str(topic) + b"\x00" + return bytes([0x82]) + _enc_len(len(p)) + p + + +def _build_publish(topic: str, payload: str) -> bytes: + body = _enc_str(topic) + payload.encode("utf-8") + return bytes([0x30]) + _enc_len(len(body)) + body + + +def _build_pingreq() -> bytes: + return bytes([0xC0, 0x00]) + + +def _parse_publish(pkt: bytes): + if len(pkt) < 2: + return None, None + tlen = (pkt[0] << 8) | pkt[1] + if 2 + tlen > len(pkt): + return None, None + topic = pkt[2:2 + tlen].decode("utf-8", errors="replace") + payload = pkt[2 + tlen:] + return topic, payload + + +# --------------------------------------------------------------------------- +# KobraXClient +# --------------------------------------------------------------------------- + +class KobraXClient: + def __init__(self, host: str, username: str, password: str, + mode_id: str, device_id: str, + port: int = 9883, client_id: str = "kobrax_py"): + self.host = host + self.port = port + self.username = username + self.password = password + self.mode_id = mode_id + self.device_id = device_id + self.client_id = client_id + + self._sock = None + self._buf = b"" + self._pid = 1 + self._lock = threading.Lock() + self._running = False + + # Pending requests by msgid (for response ACK) + self._pending_msgid: dict[str, dict] = {} + # Pending requests by msg_type/report topic suffix + self._pending_report: dict[str, dict] = {} + + # Optional callbacks: topic_suffix → callable(payload_dict) + self.callbacks: dict[str, callable] = {} + + # -- Topics -------------------------------------------------------------- + + def _pub_topic(self, msg_type: str) -> str: + return (f"anycubic/anycubicCloud/v1/slicer/printer/" + f"{self.mode_id}/{self.device_id}/{msg_type}") + + def _sub_topic(self) -> str: + return (f"anycubic/anycubicCloud/v1/printer/public/" + f"{self.mode_id}/{self.device_id}/#") + + # -- Connection ---------------------------------------------------------- + + def _do_connect(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_ciphers("DEFAULT:@SECLEVEL=0") + ctx.load_cert_chain(CERT_FILE, KEY_FILE) + + raw = socket.create_connection((self.host, self.port), timeout=5) + self._sock = ctx.wrap_socket(raw) + print(f"[kobrax] TLS: {self._sock.cipher()[0]}") + + self._sock.sendall(_build_connect(self.client_id, self.username, self.password)) + self._sock.settimeout(3) + r = self._sock.recv(64) + if len(r) < 4 or r[0] != 0x20 or r[3] != 0: + raise RuntimeError(f"CONNACK failed: {r.hex()}") + print(f"[kobrax] CONNACK rc=0") + + self._sock.settimeout(0.2) + self._buf = b"" + self._subscribe(self._sub_topic()) + + def connect(self): + self._do_connect() + self._running = True + t = threading.Thread(target=self._read_loop, daemon=True) + t.start() + time.sleep(0.3) + + def disconnect(self): + self._running = False + try: + self._sock.close() + except Exception: + pass + + def _reconnect(self): + print("[kobrax] Verbindung verloren – reconnect…") + try: + self._sock.close() + except Exception: + pass + for delay in [2, 4, 8, 15, 30]: + try: + self._do_connect() + print("[kobrax] Reconnect erfolgreich") + return True + except Exception as e: + print(f"[kobrax] Reconnect fehlgeschlagen ({e}), warte {delay}s…") + time.sleep(delay) + return False + + def _subscribe(self, topic: str): + with self._lock: + pid = self._pid + self._pid += 1 + self._sock.sendall(_build_subscribe(topic, pid)) + print(f"[kobrax] SUB {topic}") + + # -- Read loop ----------------------------------------------------------- + + def _read_loop(self): + last_ping = time.time() + while self._running: + if time.time() - last_ping > 30: + with self._lock: + try: + self._sock.sendall(_build_pingreq()) + except Exception: + if self._running and not self._reconnect(): + break + last_ping = time.time() + continue + last_ping = time.time() + try: + data = self._sock.recv(65536) + if not data: + raise ConnectionResetError("EOF") + self._buf += data + self._drain() + except ssl.SSLWantReadError: + continue + except socket.timeout: + continue + except Exception as e: + if self._running: + print(f"[kobrax] reader error: {e}") + if not self._reconnect(): + break + last_ping = time.time() + else: + break + + def _drain(self): + buf = self._buf + idx = 0 + while idx < len(buf): + ptype = buf[idx] & 0xF0 + i = idx + 1 + mul = 1 + rem = 0 + while i < len(buf): + b = buf[i] + rem += (b & 0x7F) * mul + mul *= 128 + i += 1 + if not (b & 0x80): + break + if i + rem > len(buf): + break + pkt = buf[i:i + rem] + idx = i + rem + + if ptype == 0x30: + topic, raw_payload = _parse_publish(pkt) + if topic is None: + continue + try: + payload = json.loads(raw_payload) + except Exception: + payload = {"_raw": raw_payload.decode("utf-8", errors="replace")} + self._dispatch(topic, payload) + + self._buf = buf[idx:] + + def _dispatch(self, topic: str, payload: dict): + # Resolve by report topic suffix (e.g. "info/report") + suffix = "/".join(topic.split("/")[-2:]) + if suffix in self._pending_report: + entry = self._pending_report[suffix] + entry["result"] = payload + entry["event"].set() + + # Resolve by msgid (for generic response ACK) + msgid = payload.get("msgid") + if msgid and msgid in self._pending_msgid: + entry = self._pending_msgid[msgid] + entry["result"] = payload + entry["event"].set() + + # User callbacks by topic suffix (last two path components) + suffix = "/".join(topic.split("/")[-2:]) + if suffix in self.callbacks: + try: + self.callbacks[suffix](payload) + except Exception as e: + print(f"[kobrax] callback error for {suffix}: {e}") + + # Generic wildcard callback + if "*" in self.callbacks: + try: + self.callbacks["*"](topic, payload) + except Exception as e: + print(f"[kobrax] wildcard callback error: {e}") + + # -- Publish + request/response ------------------------------------------ + + def publish(self, msg_type: str, action: str, data=None, timeout: float = 5.0) -> dict | None: + msgid = str(uuid.uuid4()) + payload = json.dumps({ + "type": msg_type, + "action": action, + "msgid": msgid, + "timestamp": int(time.time() * 1000), + "data": data, + }, separators=(",", ":")) + + # Wait by msgid only — avoids collisions when multiple threads + # call publish() for the same msg_type concurrently. + # Also register by report topic as fallback for responses without msgid. + report_key = f"{msg_type}/report" + event = threading.Event() + entry = {"event": event, "result": None} + self._pending_msgid[msgid] = entry + # Only register report-key waiter if nobody else is waiting on it + report_registered = False + if report_key not in self._pending_report: + self._pending_report[report_key] = entry + report_registered = True + + topic = self._pub_topic(msg_type) + try: + with self._lock: + self._sock.sendall(_build_publish(topic, payload)) + except Exception as e: + print(f"[kobrax] send error: {e}, reconnecting…") + self._pending_msgid.pop(msgid, None) + if report_registered: + self._pending_report.pop(report_key, None) + if not self._reconnect(): + return None + # retry once after reconnect + try: + with self._lock: + self._sock.sendall(_build_publish(topic, payload)) + self._pending_msgid[msgid] = entry + if report_registered: + self._pending_report[report_key] = entry + except Exception: + return None + + if timeout <= 0: + self._pending_msgid.pop(msgid, None) + if report_registered: + self._pending_report.pop(report_key, None) + return None + + received = event.wait(timeout) + self._pending_msgid.pop(msgid, None) + if report_registered: + self._pending_report.pop(report_key, None) + if not received: + return None + return entry["result"] + + # -- High-level commands ------------------------------------------------- + + def query_info(self) -> dict | None: + return self.publish("info", "query") + + def query_status(self) -> dict | None: + return self.publish("status", "query") + + def query_multicolor_box(self) -> dict | None: + return self.publish("multiColorBox", "getInfo") + + def set_temperature(self, nozzle: int, bed: int) -> dict | None: + return self.publish("tempature", "set", + {"target_nozzle_temp": nozzle, "target_hotbed_temp": bed}) + + def set_fan(self, pct: int) -> dict | None: + return self.publish("fan", "set", {"fan_speed_pct": pct}) + + def set_light(self, on: bool, brightness: int = 80) -> dict | None: + return self.publish("light", "control", + {"type": 2, "status": 1 if on else 0, "brightness": brightness}) + + def start_camera(self) -> dict | None: + return self.publish("video", "startCapture") + + def stop_camera(self) -> dict | None: + return self.publish("video", "stopCapture") + + def pause_print(self) -> dict | None: + return self.publish("print", "pause") + + def resume_print(self) -> dict | None: + return self.publish("print", "resume") + + def stop_print(self) -> dict | None: + return self.publish("print", "stop") + + # -- G-Code Upload ------------------------------------------------------- + + def upload_gcode(self, filepath: str, remote_filename: str | None = None, + upload_url: str | None = None) -> dict: + """Upload a G-Code or .3mf file via HTTP POST to port 18910. + + Returns the parsed JSON response from the printer. + Raises RuntimeError on HTTP or connection errors. + + Protocol captured via Wireshark 2026-04-18: + POST /gcode_upload?s={session_token} + Multipart fields: 'filename' (text) + 'gcode' (file bytes) + Required headers: X-File-Length, X-BBL-* (BambuLab heritage) + """ + if not upload_url: + info = self.query_info() + if not info: + raise RuntimeError("Could not get info/report for upload URL") + upload_url = info["data"]["urls"]["fileUploadurl"] + # parse token from URL query string + token = upload_url.split("?s=")[1] if "?s=" in upload_url else "" + + with open(filepath, "rb") as f: + file_data = f.read() + + if remote_filename is None: + remote_filename = os.path.basename(filepath) + + boundary = "------------------------a3a050b927d92a4c" + sep = f"--{boundary}\r\n".encode() + end = f"--{boundary}--\r\n".encode() + + part_filename = ( + sep + + f'Content-Disposition: form-data; name="filename"\r\n\r\n'.encode() + + remote_filename.encode() + b"\r\n" + ) + part_gcode = ( + sep + + f'Content-Disposition: form-data; name="gcode"; filename="{remote_filename}"\r\n' + f'Content-Type: application/octet-stream\r\n\r\n'.encode() + + file_data + b"\r\n" + ) + body = part_filename + part_gcode + end + + headers = ( + f"POST /gcode_upload?s={token} HTTP/1.1\r\n" + f"Host: {self.host}:18910\r\n" + f"User-Agent: AnycubicSlicerNext/1.3.9.4\r\n" + f"Accept: */*\r\n" + f"X-BBL-Client-Name: AnycubicSlicerNext\r\n" + f"X-BBL-Client-Type: slicer\r\n" + f"X-BBL-Client-Version: 01.03.09.04\r\n" + f"X-BBL-Device-ID: {str(uuid.uuid4())}\r\n" + f"X-BBL-Language: de-DE\r\n" + f"X-BBL-OS-Type: windows\r\n" + f"X-BBL-OS-Version: 10.0.26200\r\n" + f"X-File-Length: {len(file_data)}\r\n" + f"Content-Type: multipart/form-data; boundary={boundary}\r\n" + f"Content-Length: {len(body)}\r\n" + f"Connection: close\r\n\r\n" + ).encode() + + sock = socket.create_connection((self.host, 18910), timeout=30) + sock.sendall(headers + body) + sock.settimeout(10) + response = b"" + try: + while True: + chunk = sock.recv(65536) + if not chunk: + break + response += chunk + except socket.timeout: + pass + sock.close() + + # parse HTTP response body + if b"\r\n\r\n" in response: + body_start = response.index(b"\r\n\r\n") + 4 + resp_body = response[body_start:] + else: + resp_body = response + try: + return json.loads(resp_body) + except Exception: + raise RuntimeError(f"Upload: unerwartete Antwort: {resp_body[:200]}") + + def move_axis(self, axis: int, move_type: int = 2, distance: int = 0) -> dict | None: + return self.publish("axis", "move", + {"axis": axis, "move_type": move_type, "distance": distance}) + + def home_all(self) -> dict | None: + # axis=4 move_type=2 = Home all axes (~4-15s) + return self.publish("axis", "move", {"axis": 4, "move_type": 2, "distance": 0}, timeout=30.0) + + def home_axis(self, axis: int) -> dict | None: + # axis: 1=Y, 2=X, 3=Z + return self.publish("axis", "move", {"axis": axis, "move_type": 2, "distance": 0}, timeout=30.0) + + def jog(self, axis: int, direction: int, distance_mm: int = 1) -> dict | None: + # axis: 1=Y, 2=X, 3=Z direction: 0=neg, 1=pos + return self.move_axis(axis=axis, move_type=direction, distance=distance_mm) + + +# --------------------------------------------------------------------------- +# CLI Demo +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Anycubic Kobra X LAN-Client") + parser.add_argument("--ip", default=env_loader.PRINTER_IP) + parser.add_argument("--port", type=int, default=env_loader.MQTT_PORT) + parser.add_argument("--username", default=env_loader.USERNAME) + parser.add_argument("--password", default=env_loader.PASSWORD) + parser.add_argument("--mode-id", default=env_loader.MODE_ID) + parser.add_argument("--device-id", default=env_loader.DEVICE_ID) + parser.add_argument("--monitor", action="store_true", + help="Dauerhaft mithören und alle Reports ausgeben") + args = parser.parse_args() + + client = KobraXClient( + host=args.ip, port=args.port, + username=args.username, password=args.password, + mode_id=args.mode_id, device_id=args.device_id, + ) + + if args.monitor: + def on_msg(topic, payload): + suffix = "/".join(topic.split("/")[-2:]) + ts = datetime.now().strftime("%H:%M:%S") + state = payload.get("state", "") + data = payload.get("data") or {} + if "progress" in data: + print(f"[{ts}] {suffix:25} state={state:12} progress={data['progress']}% layer={data.get('curr_layer','?')}/{data.get('total_layers','?')}") + elif "curr_nozzle_temp" in data: + print(f"[{ts}] {suffix:25} nozzle={data['curr_nozzle_temp']}°C/{data.get('target_nozzle_temp',0)}°C bed={data['curr_hotbed_temp']}°C/{data.get('target_hotbed_temp',0)}°C") + else: + print(f"[{ts}] {suffix:25} state={state}") + + client.callbacks["*"] = on_msg + client.connect() + print("[kobrax] Monitor-Modus aktiv (Ctrl-C zum Beenden)") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + client.disconnect() + else: + client.connect() + + print("\n--- query_info ---") + info = client.query_info() + if info: + d = info.get("data", {}) + print(f" Drucker: {d.get('printerName')} FW {d.get('version')}") + print(f" Status: {d.get('state')}") + t = d.get("temp", {}) + print(f" Nozzle: {t.get('curr_nozzle_temp')}°C → {t.get('target_nozzle_temp')}°C") + print(f" Bett: {t.get('curr_hotbed_temp')}°C → {t.get('target_hotbed_temp')}°C") + urls = d.get("urls", {}) + print(f" Upload: {urls.get('fileUploadurl')}") + print(f" Kamera: {urls.get('rtspUrl')}") + else: + print(" Keine Antwort") + + client.disconnect() diff --git a/kobrax_moonraker_bridge.py b/kobrax_moonraker_bridge.py new file mode 100644 index 0000000..fdb0e2f --- /dev/null +++ b/kobrax_moonraker_bridge.py @@ -0,0 +1,2067 @@ +""" +kobrax_moonraker_bridge.py – Moonraker-kompatibler HTTP/WebSocket-Bridge für Anycubic Kobra X + +Emuliert die Moonraker/Klipper-API damit OrcaSlicer den Kobra X direkt ansteuern kann. + +Verwendung: + python kobrax_moonraker_bridge.py --printer-ip 192.168.178.94 + +OrcaSlicer-Konfiguration: + Drucker-Typ: Klipper | Host: 127.0.0.1 | Port: 7125 +""" + +import argparse +import env_loader +import asyncio +import hashlib +import json +import logging +import os +import sys +import tempfile +import time +import threading + +# kobrax_client aus dem selben Verzeichnis importieren +sys.path.insert(0, os.path.dirname(__file__)) +from kobrax_client import KobraXClient + +try: + from aiohttp import web + import aiohttp +except ImportError: + print("Fehler: aiohttp nicht installiert. Bitte: pip install aiohttp") + sys.exit(1) + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)s %(message)s", + datefmt="%H:%M:%S") +log = logging.getLogger("bridge") + +KOBRA_TO_KLIPPER_STATE = { + "free": "standby", + "busy": "printing", + "printing": "printing", + "preheating": "printing", + "auto_leveling": "printing", + "checking": "printing", + "updated": "printing", + "init": "printing", + "finished": "complete", + "failed": "error", + "canceled": "standby", +} + +MOONRAKER_VERSION = "v0.9.3-1" +KLIPPER_VERSION = "v0.12.0-1" + + +class KobraXBridge: + def __init__(self, client: KobraXClient): + self.client = client + self.ws_clients: set[web.WebSocketResponse] = set() + self._last_state: dict = {} + self._state = { + "nozzle_temp": 0.0, + "nozzle_target": 0.0, + "bed_temp": 0.0, + "bed_target": 0.0, + "print_state": "standby", + "filename": "", + "progress": 0.0, + "print_duration": 0, + "curr_layer": 0, + "total_layers": 0, + "printer_name": "Anycubic Kobra X", + "firmware_version": "unknown", + "upload_url": "", + "camera_url": "", + "fan_speed": 0, + "light_on": False, + "light_brightness": 80, + } + self._ams_slots: list[dict] = [] + self._ams_loaded_slot: int = -1 + self._last_uploaded_file: str = "" + self._serve_dir = tempfile.TemporaryDirectory(prefix="kobrax_serve_") + self._serve_dir_path: str = self._serve_dir.name + + self._thumbnail_b64: str = "" # base64-PNG aus file/report + + # Register MQTT push callbacks + client.callbacks["tempature/report"] = self._on_temp + client.callbacks["print/report"] = self._on_print + client.callbacks["info/report"] = self._on_info + client.callbacks["file/report"] = self._on_file + client.callbacks["multiColorBox/report"] = self._on_multicolor_box + + # ------------------------------------------------------------------------- + # MQTT callbacks (called from reader thread) + # ------------------------------------------------------------------------- + + def _on_temp(self, payload: dict): + d = payload.get("data") or {} + self._state["nozzle_temp"] = float(d.get("curr_nozzle_temp", 0)) + self._state["nozzle_target"] = float(d.get("target_nozzle_temp", 0)) + self._state["bed_temp"] = float(d.get("curr_hotbed_temp", 0)) + self._state["bed_target"] = float(d.get("target_hotbed_temp", 0)) + self._push_status_update() + + def _on_print(self, payload: dict): + d = payload.get("data") or {} + kobra_state = payload.get("state", "") + self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "printing") + self._state["filename"] = d.get("filename", self._state["filename"]) + if "progress" in d: + self._state["progress"] = float(d["progress"]) / 100.0 + if "print_time" in d: + self._state["print_duration"] = int(d["print_time"]) * 60 + if "curr_layer" in d: + self._state["curr_layer"] = d["curr_layer"] + if "total_layers" in d: + self._state["total_layers"] = d["total_layers"] + self._push_status_update() + + def _on_info(self, payload: dict): + d = payload.get("data") or {} + self._state["printer_name"] = d.get("printerName", self._state["printer_name"]) + self._state["firmware_version"] = d.get("version", self._state["firmware_version"]) + kobra_state = d.get("state", "") + if kobra_state: + self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "standby") + t = d.get("temp") or {} + if t: + self._state["nozzle_temp"] = float(t.get("curr_nozzle_temp", 0)) + self._state["nozzle_target"] = float(t.get("target_nozzle_temp", 0)) + self._state["bed_temp"] = float(t.get("curr_hotbed_temp", 0)) + self._state["bed_target"] = float(t.get("target_hotbed_temp", 0)) + urls = d.get("urls") or {} + if urls.get("fileUploadurl"): + self._state["upload_url"] = urls["fileUploadurl"] + if urls.get("rtspUrl"): + self._state["camera_url"] = urls["rtspUrl"] + fan = d.get("fan_speed_pct") + if fan is not None: + self._state["fan_speed"] = int(fan) + self._push_status_update() + + def _on_file(self, payload: dict): + d = payload.get("data") or {} + details = d.get("file_details") or {} + thumb = details.get("thumbnail") or details.get("png_image") or "" + if thumb: + self._thumbnail_b64 = thumb + log.info(f"Vorschaubild empfangen: {len(thumb)} Zeichen base64") + self._push_status_update() + + def _on_multicolor_box(self, payload: dict): + boxes = (payload.get("data") or {}).get("multi_color_box") or [] + if not boxes: + return + box = boxes[0] + slots = box.get("slots") or [] + loaded = box.get("loaded_slot", -1) + if loaded != -1: + self._ams_loaded_slot = loaded + # Tip-Forming: nach Einziehen (status=10) oder Ausziehen (status=11) + # schickt der originale Slicer automatisch type=3 (Extruder-Rückzug) + fs = box.get("feed_status") or {} + current_status = fs.get("current_status") + slot_index = fs.get("slot_index", 0) + if current_status in (10, 11): + import threading + def _tip_form(): + import time; time.sleep(2) + self.client.publish( + "multiColorBox", "feedFilament", + {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": 3}}]}, + timeout=0 + ) + log.info(f"Tip-Forming (type=3) nach status={current_status} slot={slot_index}") + threading.Thread(target=_tip_form, daemon=True).start() + if slots: + self._ams_slots = slots + log.info(f"AMS-Slots empfangen: {len(slots)}, loaded_slot={self._ams_loaded_slot}") + self._push_status_update() + + # ------------------------------------------------------------------------- + # WebSocket push + # ------------------------------------------------------------------------- + + def _push_status_update(self): + if not self.ws_clients: + return + msg = { + "jsonrpc": "2.0", + "method": "notify_status_update", + "params": [self._build_printer_objects(), time.time()], + } + text = json.dumps(msg) + dead = set() + for ws in self.ws_clients: + try: + asyncio.run_coroutine_threadsafe(ws.send_str(text), ws._loop) + except Exception: + dead.add(ws) + self.ws_clients -= dead + + def _build_printer_objects(self) -> dict: + s = self._state + return { + "extruder": { + "temperature": s["nozzle_temp"], + "target": s["nozzle_target"], + "power": 0.0, + }, + "heater_bed": { + "temperature": s["bed_temp"], + "target": s["bed_target"], + "power": 0.0, + }, + "print_stats": { + "state": s["print_state"], + "filename": s["filename"], + "print_duration": s["print_duration"], + "total_duration": s["print_duration"], + "info": { + "current_layer": s["curr_layer"], + "total_layer": s["total_layers"], + }, + }, + "display_status": { + "progress": s["progress"], + "message": "", + }, + "virtual_sdcard": { + "progress": s["progress"], + "is_active": s["print_state"] == "printing", + "file_path": s["filename"], + }, + "toolhead": { + "position": [0, 0, 0, 0], + "homed_axes": "xyz", + "print_time": s["print_duration"], + "estimated_print_time": s["print_duration"], + }, + } + + # ------------------------------------------------------------------------- + # HTTP handlers + # ------------------------------------------------------------------------- + + async def handle_server_info(self, request): + return web.json_response({ + "result": { + "klippy_connected": True, + "klippy_state": "ready", + "components": ["file_manager", "job_state", "virtual_sdcard"], + "failed_components":[], + "registered_directories": ["gcodes"], + "warnings": [], + "websocket_count": len(self.ws_clients), + "moonraker_version": MOONRAKER_VERSION, + "api_version": [1, 3, 0], + "api_version_string": "1.3.0", + } + }) + + async def handle_printer_info(self, request): + s = self._state + return web.json_response({ + "result": { + "state": "ready", + "state_message": "Printer is ready", + "hostname": "kobrax-bridge", + "klipper_path": "/home/pi/klipper", + "python_path": "/home/pi/klippy-env/bin/python", + "log_file": "/tmp/klippy.log", + "config_file": "/home/pi/printer.cfg", + "software_version": KLIPPER_VERSION, + "cpu_info": s["printer_name"], + } + }) + + async def handle_machine_system_info(self, request): + return web.json_response({ + "result": { + "system_info": { + "cpu_info": {"cpu_count": 4, "bits": "64bit", "processor": "armv7l", + "cpu_desc": "Anycubic Kobra X Bridge", "serial_number": "", + "hardware_desc": "", "model": "Kobra X Bridge", + "total_memory": 524288, "memory_units": "kB"}, + "sd_info": {}, + "distribution": {"name": "Linux", "id": "linux", "version": "1.0", + "version_parts": {}, "like": "", "codename": ""}, + "available_services": [], + "service_state": {}, + "python": {"version": list(sys.version_info[:3]), "version_string": sys.version}, + "network": {}, + "canbus": {}, + } + } + }) + + async def handle_objects_query(self, request): + objects = self._build_printer_objects() + # filter by requested objects if specified + requested = dict(request.rel_url.query) + if requested: + filtered = {k: objects[k] for k in requested if k in objects} + else: + filtered = objects + return web.json_response({"result": {"status": filtered, "eventtime": time.time()}}) + + async def handle_objects_list(self, request): + return web.json_response({ + "result": { + "objects": list(self._build_printer_objects().keys()) + } + }) + + async def handle_objects_subscribe(self, request): + return web.json_response({ + "result": { + "status": self._build_printer_objects(), + "eventtime": time.time(), + } + }) + + async def handle_files_list(self, request): + filename = self._state.get("filename", "") + files = [] + if filename: + files.append({ + "path": filename, + "modified": time.time(), + "size": 0, + "permissions": "rw", + }) + return web.json_response({"result": files}) + + async def handle_file_upload(self, request): + log.info(f"Upload-Request: {request.method} {request.path_qs} CT={request.headers.get('Content-Type','')[:60]}") + ct = request.headers.get("Content-Type", "") + if "multipart" not in ct: + return web.json_response({"error": "expected multipart"}, status=400) + reader = await request.multipart() + file_data = None + remote_filename = self._last_uploaded_file or "upload.gcode" + + async for part in reader: + if part.name in ("file", "gcode", "upload_file"): + remote_filename = part.filename or remote_filename + file_data = await part.read() + log.info(f"Multipart-Feld '{part.name}': {remote_filename} ({len(file_data)} bytes)") + elif part.name == "path": + val = (await part.read()).decode("utf-8", errors="replace").strip() + if val: + remote_filename = val + else: + log.debug(f"Unbekanntes Multipart-Feld: {part.name}") + + if not file_data: + return web.json_response({"error": "no file received"}, status=400) + + file_md5 = hashlib.md5(file_data).hexdigest() + file_size = len(file_data) + + # Datei auf Disk ablegen (temp-Verzeichnis) damit Drucker sie per HTTP abrufen kann + safe_name = os.path.basename(remote_filename) # keine Pfad-Traversal + serve_path = os.path.join(self._serve_dir_path, safe_name) + with open(serve_path, "wb") as f: + f.write(file_data) + del file_data # RAM freigeben + + self._last_uploaded_file = remote_filename + log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} → Drucker") + + # Datei per HTTP auf den Drucker hochladen (serve_path liegt bereits auf Disk) + upload_url = self._state.get("upload_url") or None + loop = asyncio.get_event_loop() + try: + result = await loop.run_in_executor( + None, self.client.upload_gcode, serve_path, remote_filename, upload_url + ) + except Exception as e: + log.error(f"Upload fehlgeschlagen: {e}") + return web.json_response({"error": str(e)}, status=500) + + log.info(f"Upload erfolgreich: {result}") + + # Druck starten mit vollständigem Payload (inkl. serve-URL + md5 + size) + serve_url = f"http://{request.host}/serve/{remote_filename}" + log.info(f"Starte Druck automatisch: {remote_filename}") + loop = asyncio.get_event_loop() + loop.run_in_executor(None, lambda: self._start_print(remote_filename, serve_url, file_md5, file_size)) + + # OctoPrint-kompatibler Response (OrcaSlicer wertet refs aus) + return web.json_response({ + "done": True, + "files": { + "local": { + "name": remote_filename, + "origin": "local", + "path": remote_filename, + "refs": { + "download": f"http://{request.host}/api/files/local/{remote_filename}", + "resource": f"http://{request.host}/api/files/local/{remote_filename}", + } + } + }, + "result": { + "item": {"path": remote_filename, "root": "gcodes"}, + "action": "create_file", + } + }, status=201) + + def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0): + use_ams = len(self._ams_slots) > 0 + ams_box_mapping = [ + { + "paint_index": i, + "ams_index": i, + "paint_color": [255, 255, 255, 255], + "ams_color": [255, 255, 255, 255], + "material_type": s.get("material_type", "PLA"), + } + for i, s in enumerate(self._ams_slots) + ] + payload = { + "taskid": "-1", + "url": url, + "filename": filename, + "md5": md5, + "filepath": None, + "filetype": 1, + "project_type": 1, + "filesize": filesize, + "ams_settings": { + "use_ams": use_ams, + "ams_box_mapping": ams_box_mapping, + }, + "task_settings": { + "auto_leveling": 1, + "vibration_compensation": 0, + "flow_calibration": 0, + "dry_mode": 0, + "ai_settings": {"status": 0, "count": 0, "type": 1}, + "timelapse": {"status": 0, "count": 0, "type": 64}, + "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0}, + "model_objects_skip_parts": [], + }, + } + # Thumbnail vorab anfordern (Drucker antwortet async auf file/report) + self._thumbnail_b64 = "" + self.client.publish("file", "fileDetails", + {"root": "local", "filename": filename}, timeout=0) + + log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots") + result = self.client.publish("print", "start", payload, timeout=15.0) + if result: + log.info(f"Druckstart bestätigt: state={result.get('state')}") + else: + log.warning("Druckstart: keine Antwort vom Drucker") + + async def handle_print_start(self, request): + try: + body = await request.json() + except Exception: + body = {} + filename = body.get("filename") or self._last_uploaded_file + if not filename: + return web.json_response({"error": "no filename"}, status=400) + + log.info(f"Druck starten: {filename}") + + # AMS-Mapping aus gecachtem State + ams_box_mapping = [] + for i, slot in enumerate(self._ams_slots): + ams_box_mapping.append({ + "slot_index": i, + "material_type": slot.get("material_type", "PLA"), + "color": slot.get("color", "FFFFFF"), + }) + + use_ams = len(self._ams_slots) > 0 + + payload = { + "filename": filename, + "taskid": str(int(time.time())), + "use_ams": use_ams, + } + if ams_box_mapping: + payload["ams_box_mapping"] = ams_box_mapping + + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, lambda: self.client.publish("print", "start", payload, timeout=15.0) + ) + if result is None: + return web.json_response({"error": "Keine Antwort vom Drucker"}, status=504) + + return web.json_response({"result": "ok"}) + + async def handle_print_pause(self, request): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.pause_print) + return web.json_response({"result": "ok"}) + + async def handle_print_resume(self, request): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.resume_print) + return web.json_response({"result": "ok"}) + + async def handle_print_cancel(self, request): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.stop_print) + return web.json_response({"result": "ok"}) + + async def handle_octoprint_version(self, request): + return web.json_response({ + "api": "0.1", + "server": "1.9.0", + "text": "OctoPrint (Kobra X Bridge)", + }) + + async def handle_index(self, request): + html = r""" + + + + +Kobra X + + + + +
+ +
Anycubic Kobra X
+
Standby
+ + +
+ +
+ + +
+ +
+
+
+
+
+
📷 Kamera nicht gestartet
+
+ + + +
+
+ +
+
Fortschritt
+
0%
+
+
+
+ + +
+
+
+ + + +
+
+
+
+ + +
+
Temperaturen
+
+
+
Nozzle
+
+
+
°C
+
+
0°C
+ + + + +
+
+
Bett
+
+
+
°C
+
+
0°C
+ + + + +
+
+
+ + +
+
Licht & Lüfter
+
+ 💡 Licht + +
+
+ 🌀 Lüfter + + 0 +
+
+
+
+ + +
+
+
+
Drucksteuerung
+
+
0%
+
+
+ + +
+
+
+
+ + + +
+
+
+
Temperaturen (Live)
+ +
+
+
Nozzle
+
+
°C
+
+
0°C
+
+ + +
+
+
+
Bett
+
+
°C
+
+
0°C
+
+ + +
+
+
+
+
+
+ + +
+
+
+
Nozzle
+
+
+
°C
+
+
Ziel: 0°C
+
+
+
+
+ + + +
+
+
+
Heizbett
+
+
+
°C
+
+
Ziel: 0°C
+
+
+
+
+ + + +
+
+
+
Verlauf (letzte 60 Messungen)
+ +
+
+
+ + +
+
+
+
XY-Achsen
+
+
+ +
+ + + +
+ +
+
+
+ + + + +
+
+ + + + +
+
+
+
Z-Achse
+
+ + +
+
Schrittweite: 1 mm
+
+
+
+ + +
+
+
AMS / Filamentbox
+
+
+ Keine AMS-Daten empfangen +
+
+
+
Slot auswählen
+
+ + Slot 1 +
+
+ + +
+
+
+
+ + +
+
+
+
💡 Licht
+
+ Ein / Aus + +
+
+
+
🌀 Lüfter
+
+ Geschwindigkeit + + 0 +
+
+ + + + + +
+
+
+
📷 Kamera
+
+ + +
+
+
+
+
+ + +
+
+
Ereignis-Log
+
+
+
+
+
+ + + + + +""" + return web.Response(text=html, content_type="text/html", + headers={"Cache-Control": "no-store, no-cache, must-revalidate"}) + + async def handle_api_light(self, request): + try: + body = await request.json() + except Exception: + body = {} + on = bool(body.get("on", True)) + brightness = int(body.get("brightness", self._state["light_brightness"])) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: self.client.publish( + "light", "control", + {"type": 3, "status": 1 if on else 0, "brightness": brightness}, + timeout=0 + )) + self._state["light_on"] = on + self._state["light_brightness"] = brightness + return web.json_response({"result": "ok"}) + + async def handle_api_fan(self, request): + try: + body = await request.json() + except Exception: + body = {} + speed = int(body.get("speed", 0)) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: self.client.publish( + "fan", "setSpeed", {"fan_speed_pct": speed}, timeout=0 + )) + self._state["fan_speed"] = speed + return web.json_response({"result": "ok"}) + + async def handle_api_ams_feed(self, request): + try: + body = await request.json() + except Exception: + body = {} + slot_index = int(body.get("slot_index", 0)) + feed_type = int(body.get("type", 1)) + # Ausziehen (type=2): wenn kein Slot explizit gewählt, den zuletzt geladenen nehmen + if feed_type == 2 and self._ams_loaded_slot >= 0: + slot_index = self._ams_loaded_slot + loop = asyncio.get_event_loop() + def _send(): + resp = self.client.publish( + "multiColorBox", "feedFilament", + {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": feed_type}}]}, + timeout=5 + ) + log.info(f"feedFilament type={feed_type} slot={slot_index} loaded_slot={self._ams_loaded_slot} → {resp}") + await loop.run_in_executor(None, _send) + return web.json_response({"result": "ok"}) + + async def handle_api_axis(self, request): + try: + body = await request.json() + except Exception: + body = {} + axis = int(body.get("axis", 4)) + move_type = int(body.get("move_type", 2)) + distance = float(body.get("distance", 0)) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: self.client.publish( + "axis", "move", + {"axis": axis, "move_type": move_type, "distance": distance}, + timeout=0 + )) + return web.json_response({"result": "ok"}) + + async def handle_api_temperature(self, request): + try: + body = await request.json() + except Exception: + body = {} + nozzle = body.get("nozzle") + bed = body.get("bed") + loop = asyncio.get_event_loop() + if nozzle is not None: + n = int(float(nozzle)) + await loop.run_in_executor(None, lambda: self.client.publish( + "tempature", "set", + {"type": 0, "target_nozzle_temp": n, "target_hotbed_temp": 0}, + timeout=0 + )) + if bed is not None: + b = int(float(bed)) + await loop.run_in_executor(None, lambda: self.client.publish( + "tempature", "set", + {"type": 1, "target_hotbed_temp": b, "target_nozzle_temp": 0}, + timeout=0 + )) + return web.json_response({"result": "ok"}) + + async def handle_api_camera(self, request): + return web.json_response({"url": self._state["camera_url"]}) + + async def handle_api_camera_start(self, request): + loop = asyncio.get_event_loop() + # Wait for pushStarted confirmation before returning + result = await loop.run_in_executor(None, lambda: self.client.publish( + "video", "startCapture", None, timeout=8.0 + )) + state = (result or {}).get("state", "") + log.info(f"Kamera startCapture: state={state}") + return web.json_response({"result": "ok", "state": state}) + + async def handle_api_camera_stop(self, request): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: self.client.publish( + "video", "stopCapture", None, timeout=0 + )) + return web.json_response({"result": "ok"}) + + async def handle_camera_stream(self, request): + """MJPEG proxy: FLV → MJPEG via ffmpeg, served as multipart/x-mixed-replace.""" + url = self._state.get("camera_url", "") + if not url: + return web.Response(status=503, text="Keine Kamera-URL bekannt") + + boundary = "kobraxframe" + resp = web.StreamResponse(headers={ + "Content-Type": f"multipart/x-mixed-replace;boundary={boundary}", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }) + await resp.prepare(request) + + proc = await asyncio.create_subprocess_exec( + "ffmpeg", "-loglevel", "quiet", + "-i", url, + "-vf", "fps=10,scale=640:-1", + "-f", "image2pipe", + "-vcodec", "mjpeg", + "-q:v", "5", + "pipe:1", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + + buf = b"" + try: + while True: + chunk = await proc.stdout.read(65536) + if not chunk: + break + buf += chunk + # Extract complete JPEG frames (SOI=FFD8, EOI=FFD9) + while True: + start = buf.find(b"\xff\xd8") + if start == -1: + buf = b"" + break + end = buf.find(b"\xff\xd9", start + 2) + if end == -1: + buf = buf[start:] + break + frame = buf[start:end + 2] + buf = buf[end + 2:] + header = ( + f"--{boundary}\r\n" + f"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(frame)}\r\n\r\n" + ).encode() + try: + await resp.write(header + frame + b"\r\n") + except Exception: + return resp + except Exception as e: + log.warning(f"Kamera-Stream unterbrochen: {e}") + finally: + try: + proc.kill() + except Exception: + pass + + return resp + + async def handle_serve_file(self, request): + """Liefert hochgeladene G-Code-Dateien vom Temp-Verzeichnis (für Drucker-Download).""" + filename = os.path.basename(request.match_info.get("filename", "")) + serve_path = os.path.join(self._serve_dir_path, filename) + if not os.path.isfile(serve_path): + return web.Response(status=404, text="not found") + size = os.path.getsize(serve_path) + log.info(f"Drucker lädt Datei ab: {filename} ({size} bytes)") + return web.FileResponse(serve_path, headers={ + "Content-Disposition": f'attachment; filename="{filename}"' + }) + + async def handle_api_state(self, request): + s = self._state + return web.json_response({ + "printer_name": s["printer_name"], + "firmware_version": s["firmware_version"], + "print_state": s["print_state"], + "nozzle_temp": s["nozzle_temp"], + "nozzle_target": s["nozzle_target"], + "bed_temp": s["bed_temp"], + "bed_target": s["bed_target"], + "progress": s["progress"], + "print_duration": s["print_duration"], + "curr_layer": s["curr_layer"], + "total_layers": s["total_layers"], + "filename": s["filename"], + "camera_url": s["camera_url"], + "fan_speed": s["fan_speed"], + "light_on": s["light_on"], + "light_brightness": s["light_brightness"], + "ams_slots": self._ams_slots, + "ams_loaded_slot": self._ams_loaded_slot, + "thumbnail": self._thumbnail_b64, + }) + + async def handle_moonraker_database(self, request): + """OrcaSlicer 'Synchronize filament list from AMS' liest /server/database/item?namespace=lane_data""" + namespace = request.rel_url.query.get("namespace", "") + if namespace != "lane_data": + return web.json_response({"result": {"namespace": namespace, "value": {}}}) + loop = asyncio.get_event_loop() + slots = await loop.run_in_executor(None, lambda: self._get_ams_slots_fresh()) + lane_data = {} + for i, slot in enumerate(slots): + rgb = slot.get("color", [128, 128, 128]) + if isinstance(rgb, list) and len(rgb) >= 3: + alpha = rgb[3] if len(rgb) == 4 else 255 + color_hex = f"{rgb[0]:02X}{rgb[1]:02X}{rgb[2]:02X}{alpha:02X}" + else: + color_hex = "808080FF" + material = slot.get("type", "") + default_temps = { + "PLA": {"nozzle": 220, "bed": 60}, + "PETG": {"nozzle": 240, "bed": 70}, + "ABS": {"nozzle": 250, "bed": 100}, + "TPU": {"nozzle": 230, "bed": 40}, + } + temps = default_temps.get(material.upper(), {"nozzle": 220, "bed": 60}) + lane_data[f"lane{i}"] = { + "vendor_name": "Anycubic", + "name": material, + "color": color_hex, + "material": material, + "bed_temp": temps["bed"], + "nozzle_temp": temps["nozzle"], + "scan_time": None, + "td": None, + "lane": str(i), + "spool_id": None, + "filament_id": None, + } + log.info(f"AMS-Sync: {len(lane_data)} Slots an OrcaSlicer") + return web.json_response({"result": {"namespace": "lane_data", "value": lane_data}}) + + def _get_ams_slots_fresh(self): + """Frische Slot-Daten per getInfo holen, Fallback auf gecachte.""" + resp = self.client.publish("multiColorBox", "getInfo", None, timeout=5) + if resp and resp.get("data"): + boxes = resp["data"].get("multi_color_box") or [] + if boxes: + slots = boxes[0].get("slots") or [] + if slots: + self._ams_slots = slots + return self._ams_slots + + async def handle_catchall(self, request): + body = await request.read() + log.warning(f"UNBEKANNT {request.method} {request.path_qs} body={body[:200]}") + return web.json_response({"result": {}}, status=200) + + async def handle_favicon(self, request): + # Minimales 1x1 ICO damit der Browser nicht 404 loggt + ico = bytes([ + 0,0,1,0,1,0,1,1,0,0,1,0,24,0,40,0,0,0,22,0,0,0,40,0,0,0, + 1,0,0,0,2,0,0,0,1,0,24,0,0,0,0,0,4,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,255,102,0,0,0,0,0,0 + ]) + return web.Response(body=ico, content_type="image/x-icon") + + # ------------------------------------------------------------------------- + # WebSocket handler + # ------------------------------------------------------------------------- + + async def handle_websocket(self, request): + ws = web.WebSocketResponse(heartbeat=30) + await ws.prepare(request) + ws._loop = asyncio.get_event_loop() + self.ws_clients.add(ws) + log.info(f"WS client verbunden ({len(self.ws_clients)} gesamt)") + + # Send klippy_ready notification + await ws.send_str(json.dumps({ + "jsonrpc": "2.0", + "method": "notify_klippy_ready", + "params": [], + })) + # Send initial status + await ws.send_str(json.dumps({ + "jsonrpc": "2.0", + "method": "notify_status_update", + "params": [self._build_printer_objects(), time.time()], + })) + + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self._handle_ws_rpc(ws, msg.data) + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE): + break + + self.ws_clients.discard(ws) + log.info(f"WS client getrennt ({len(self.ws_clients)} verbleibend)") + return ws + + async def _handle_ws_rpc(self, ws: web.WebSocketResponse, raw: str): + try: + req = json.loads(raw) + except Exception: + return + rpc_id = req.get("id") + method = req.get("method", "") + log.info(f"WS RPC: {method} params={str(req.get('params',''))[:120]}") + params = req.get("params") or {} + if isinstance(params, list): + params = params[0] if params else {} + + result = None + error = None + + try: + if method in ("printer.info", "printer_info"): + result = { + "state": "ready", + "state_message": "Printer is ready", + "hostname": "kobrax-bridge", + "software_version": KLIPPER_VERSION, + "cpu_info": self._state["printer_name"], + "klipper_path": "/home/pi/klipper", + "python_path": "/home/pi/klippy-env/bin/python", + } + elif method in ("server.info", "server_info"): + result = { + "klippy_connected": True, + "klippy_state": "ready", + "moonraker_version": MOONRAKER_VERSION, + "components": [], + "failed_components": [], + "registered_directories": ["gcodes"], + "warnings": [], + } + elif method in ("printer.objects.list",): + result = {"objects": list(self._build_printer_objects().keys())} + elif method in ("printer.objects.query", "printer.objects.get"): + objects = params.get("objects", {}) + all_objs = self._build_printer_objects() + if objects: + filtered = {k: all_objs.get(k, {}) for k in objects} + else: + filtered = all_objs + result = {"status": filtered, "eventtime": time.time()} + elif method == "printer.objects.subscribe": + objects = params.get("objects", {}) + all_objs = self._build_printer_objects() + if objects: + filtered = {k: all_objs.get(k, {}) for k in objects} + else: + filtered = all_objs + result = {"status": filtered, "eventtime": time.time()} + elif method == "printer.print.start": + filename = params.get("filename", self._last_uploaded_file) + loop = asyncio.get_event_loop() + resp = await loop.run_in_executor( + None, lambda: self.client.publish("print", "start", + {"filename": filename, "use_ams": False}, timeout=15.0) + ) + result = "ok" if resp else "timeout" + elif method == "printer.print.pause": + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.pause_print) + result = "ok" + elif method == "printer.print.resume": + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.resume_print) + result = "ok" + elif method == "printer.print.cancel": + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.client.stop_print) + result = "ok" + elif method == "machine.system_info": + result = {"system_info": {"cpu_info": {"cpu_desc": "Kobra X Bridge"}}} + elif method == "server.files.list": + result = [] + else: + log.debug(f"Unbekannte RPC-Methode: {method}") + result = {} + except Exception as e: + log.error(f"RPC-Fehler für {method}: {e}") + error = {"code": -32603, "message": str(e)} + + if rpc_id is not None: + response = {"jsonrpc": "2.0", "id": rpc_id} + if error: + response["error"] = error + else: + response["result"] = result + await ws.send_str(json.dumps(response)) + + # ------------------------------------------------------------------------- + # Poll loop (sync, runs in executor) + # ------------------------------------------------------------------------- + + def _poll_loop(self, stop_event: threading.Event): + while not stop_event.is_set(): + try: + info = self.client.query_info() + if info: + self._on_info(info) + # Während Druck: print/report direkt abfragen + if self._state["print_state"] in ("printing", "preheating", + "auto_leveling", "checking", "init"): + print_r = self.client.publish("print", "query", timeout=3.0) + if print_r: + self._on_print(print_r) + box = self.client.query_multicolor_box() + if box: + boxes = (box.get("data") or {}).get("multi_color_box") or [] + slots = boxes[0].get("slots") or [] if boxes else [] + if slots: + self._ams_slots = slots + except Exception as e: + log.warning(f"Poll-Fehler: {e}") + stop_event.wait(3.0) + + +# --------------------------------------------------------------------------- +# App factory + main +# --------------------------------------------------------------------------- + +def build_app(bridge: KobraXBridge) -> web.Application: + app = web.Application() + r = app.router + + # Moonraker API + r.add_get("/server/info", bridge.handle_server_info) + r.add_get("/printer/info", bridge.handle_printer_info) + r.add_get("/machine/system_info", bridge.handle_machine_system_info) + r.add_get("/printer/objects/list", bridge.handle_objects_list) + r.add_get("/printer/objects/query", bridge.handle_objects_query) + r.add_get("/printer/objects/subscribe", bridge.handle_objects_subscribe) + r.add_post("/printer/objects/subscribe", bridge.handle_objects_subscribe) + r.add_get("/server/files/list", bridge.handle_files_list) + r.add_post("/server/files/upload", bridge.handle_file_upload) + r.add_post("/printer/print/start", bridge.handle_print_start) + r.add_post("/printer/print/pause", bridge.handle_print_pause) + r.add_post("/printer/print/resume", bridge.handle_print_resume) + r.add_post("/printer/print/cancel", bridge.handle_print_cancel) + + # OctoPrint compatibility (OrcaSlicer probes this + uploads here) + r.add_get("/api/version", bridge.handle_octoprint_version) + r.add_post("/api/files/local", bridge.handle_file_upload) + r.add_post("/api/files/{path:.*}", bridge.handle_file_upload) + + # Moonraker database (OrcaSlicer AMS-Sync) + r.add_get("/server/database/item", bridge.handle_moonraker_database) + + # New API endpoints + r.add_post("/api/light", bridge.handle_api_light) + r.add_post("/api/fan", bridge.handle_api_fan) + r.add_post("/api/ams/feed", bridge.handle_api_ams_feed) + r.add_post("/api/axis", bridge.handle_api_axis) + r.add_post("/api/temperature", bridge.handle_api_temperature) + r.add_get("/api/camera", bridge.handle_api_camera) + r.add_get("/api/camera/stream", bridge.handle_camera_stream) + r.add_post("/api/camera/start", bridge.handle_api_camera_start) + r.add_post("/api/camera/stop", bridge.handle_api_camera_stop) + r.add_get("/api/state", bridge.handle_api_state) + r.add_get("/serve/{filename}", bridge.handle_serve_file) + + # Root + favicon (OrcaSlicer öffnet / in eingebettetem Browser) + r.add_get("/", bridge.handle_index) + r.add_get("/favicon.ico", bridge.handle_favicon) + + # WebSocket + r.add_get("/websocket", bridge.handle_websocket) + + # Catch-all: alle unbekannten Requests loggen statt 404 + r.add_route("*", "/{path:.*}", bridge.handle_catchall) + + return app + + +async def run_bridge(args): + log.info(f"Verbinde mit Drucker {args.printer_ip}:{args.mqtt_port} …") + client = KobraXClient( + host=args.printer_ip, + port=args.mqtt_port, + username=args.username, + password=args.password, + mode_id=args.mode_id, + device_id=args.device_id, + client_id="kobrax_bridge", + ) + + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, client.connect) + log.info("MQTT verbunden") + + bridge = KobraXBridge(client) + app = build_app(bridge) + + stop_event = threading.Event() + poll_thread = threading.Thread( + target=bridge._poll_loop, args=(stop_event,), daemon=True, name="poll" + ) + poll_thread.start() + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, args.host, args.port) + await site.start() + + log.info(f"Bridge läuft auf http://{args.host}:{args.port}") + log.info(f"OrcaSlicer → Klipper → Host: {args.host} Port: {args.port}") + log.info("Ctrl-C zum Beenden") + + try: + while True: + await asyncio.sleep(3600) + except (KeyboardInterrupt, asyncio.CancelledError): + pass + finally: + stop_event.set() + await runner.cleanup() + client.disconnect() + log.info("Bridge beendet") + + +def main(): + parser = argparse.ArgumentParser(description="Moonraker-Bridge für Anycubic Kobra X") + parser.add_argument("--printer-ip", default=env_loader.PRINTER_IP, + help="IP-Adresse des Druckers") + parser.add_argument("--mqtt-port", type=int, default=env_loader.MQTT_PORT) + parser.add_argument("--username", default=env_loader.USERNAME) + parser.add_argument("--password", default=env_loader.PASSWORD) + parser.add_argument("--mode-id", default=env_loader.MODE_ID) + parser.add_argument("--device-id", default=env_loader.DEVICE_ID) + parser.add_argument("--host", default="0.0.0.0", + help="Bind-Adresse für den Bridge-Server") + parser.add_argument("--port", type=int, default=7125, + help="HTTP/WS-Port (Moonraker-Standard: 7125)") + args = parser.parse_args() + + asyncio.run(run_bridge(args)) + + +if __name__ == "__main__": + main() diff --git a/releases/0.9.0-beta1/SHA256SUMS.txt b/releases/0.9.0-beta1/SHA256SUMS.txt new file mode 100644 index 0000000..b6818cd --- /dev/null +++ b/releases/0.9.0-beta1/SHA256SUMS.txt @@ -0,0 +1,3 @@ +fb4bf06b0cfb5bcac81e2faf99d8ace1c15771ea009837802a08a4dd5ba77a8f /home/coding/Source/kobrax/releases/0.9.0-beta1/extract_credentials +68f9bf800d1df0e71423edd35e90a8f5f7fb6e9e5220a8c12ed98cc6c4fb4833 /home/coding/Source/kobrax/releases/0.9.0-beta1/extract_credentials.exe +7c1a99953e21fc3881f60df444940d66a4689b009e9a17ec936396857a6b9dc0 /home/coding/Source/kobrax/releases/0.9.0-beta1/kx-bridge diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6a37f7b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +aiohttp>=3.9