""" kobrax_moonraker_bridge.py – Moonraker-kompatibler HTTP/WebSocket-Bridge für Anycubic Kobra X Emuliert die Moonraker/Klipper-API damit OrcaSlicer den Kobra X direkt ansteuern kann. Verwendung: python kobrax_moonraker_bridge.py --printer-ip 192.168.178.94 OrcaSlicer-Konfiguration: Drucker-Typ: Klipper | Host: 127.0.0.1 | Port: 7125 """ import argparse import sqlite3 import uuid try: import config_loader as env_loader except ImportError: import env_loader import asyncio import hashlib import json import logging import os import pathlib import re import subprocess import sys import tempfile import time import threading # Bei PyInstaller-Binary liegt alles neben sys.executable, sonst neben __file__ _BASE = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, _BASE) from kobrax_client import KobraXClient try: import imageio_ffmpeg def _find_ffmpeg() -> str: return imageio_ffmpeg.get_ffmpeg_exe() except ImportError: def _find_ffmpeg() -> str: exe_name = "ffmpeg.exe" if sys.platform == "win32" else "ffmpeg" local = os.path.join(_BASE, exe_name) if os.path.isfile(local): return local return "ffmpeg" try: from aiohttp import web import aiohttp except ImportError: print("Fehler: aiohttp nicht installiert. Bitte: pip install aiohttp") sys.exit(1) try: import base64 as _base64 from Crypto.Cipher import AES as _AES from Crypto.Util.Padding import unpad as _unpad _HAS_CRYPTO = True except ImportError: _HAS_CRYPTO = False def _kx_generate_signature(token: str, ts: int, nonce: str) -> str: first = hashlib.md5(token[:16].encode()).hexdigest() return hashlib.md5((first + str(ts) + nonce).encode()).hexdigest() def _kx_decrypt_info(encrypted_b64: str, key: str, iv: str) -> dict: cipher = _AES.new(key.encode(), _AES.MODE_CBC, iv.encode()) raw = _base64.b64decode(encrypted_b64) return json.loads(_unpad(cipher.decrypt(raw), _AES.block_size).decode()) async def _kx_fetch_credentials(ip: str, port: int = 18910) -> dict: """Holt + entschlüsselt Drucker-Credentials via HTTP /info + /ctrl. Wirft eine Exception bei Netzwerk-/Decrypt-Fehlern. Algorithmus aus tools/fetch_credentials.py (AES-256-CBC, Key=token[16:32], IV=ctrl-token). """ if not _HAS_CRYPTO: raise RuntimeError("pycryptodome nicht installiert") import random, string nonce = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(6)) timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession() as s: async with s.get(f"http://{ip}:{port}/info", timeout=timeout) as r: r.raise_for_status() info = await r.json() token = info["token"] ts = int(time.time() * 1000) sign = _kx_generate_signature(token, ts, nonce) params = {"ts": ts, "nonce": nonce, "sign": sign, "did": "random"} async with s.post(f"http://{ip}:{port}/ctrl", params=params, timeout=timeout) as r: r.raise_for_status() data = await r.json() result = _kx_decrypt_info(data["data"]["info"], token[16:32], data["data"]["token"]) if "error" in result: raise RuntimeError(result.get("error", "decrypt failed")) return { "printer_ip": result.get("ip", ip), "username": result.get("username", ""), "password": result.get("password", ""), "device_id": result.get("deviceId", ""), "mode_id": str(result.get("modeId", "20030")), "model": result.get("modelName", "Anycubic Kobra"), } logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)-5s %(name)s: %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("bridge") # Ring-Buffer für Browser-Log-Stream (letzte 200 Einträge) import collections as _collections _log_buffer: "_collections.deque[dict]" = _collections.deque(maxlen=500) _log_sse_queues: "list[asyncio.Queue]" = [] class _BrowserLogHandler(logging.Handler): """Sendet Log-Records in den Ring-Buffer und alle offenen SSE-Queues.""" _fmt = logging.Formatter(datefmt="%H:%M:%S") def emit(self, record: logging.LogRecord): entry = { "ts": self._fmt.formatTime(record, "%H:%M:%S"), "lvl": record.levelname, "name": record.name, "msg": record.getMessage(), } _log_buffer.append(entry) for q in list(_log_sse_queues): try: q.put_nowait(entry) except Exception: pass _browser_handler = _BrowserLogHandler() logging.getLogger().addHandler(_browser_handler) KOBRA_TO_KLIPPER_STATE = { "free": "standby", "busy": "printing", "printing": "printing", "preheating": "printing", "auto_leveling": "printing", "checking": "printing", "updated": "printing", "init": "printing", "pausing": "paused", "paused": "paused", "resuming": "printing", "resumed": "printing", "stopping": "printing", "stoped": "standby", "finished": "complete", "failed": "error", "canceled": "standby", } MOONRAKER_VERSION = "v0.9.3-1" KLIPPER_VERSION = "v0.12.0-1" def _parse_gcode_estimated_time(data: bytes) -> int: """Liest geschätzte Druckzeit aus GCode (OrcaSlicer + PrusaSlicer). Gibt Sekunden zurück, 0 wenn nicht gefunden. PrusaSlicer schreibt die Zeit ins Header (erste 16KB), OrcaSlicer schreibt sie ans Ende der Datei (letzte 16KB).""" import re # Anfang + Ende der Datei durchsuchen (OrcaSlicer schreibt Zeit am Ende) search_text = (data[:16384] + data[-65536:]).decode("utf-8", errors="ignore") # OrcaSlicer: ; total estimated time: 9m 20s # PrusaSlicer: ; estimated printing time (normal mode) = 1h 9m 20s m = (re.search(r";\s*total estimated time:\s*(.*)", search_text) or re.search(r";\s*estimated printing time \(normal mode\)\s*=\s*(.*)", search_text)) if not m: return 0 parts = re.findall(r"(\d+)\s*([hms])", m.group(1)) secs = 0 for val, unit in parts: if unit == "h": secs += int(val) * 3600 elif unit == "m": secs += int(val) * 60 elif unit == "s": secs += int(val) if secs: log.info(f"Slicer-Schätzzeit: {secs}s ({m.group(1).strip()})") return secs def _extract_thumbnail(data: bytes) -> str: """Extrahiert Base64-PNG-Thumbnail aus GCode (OrcaSlicer-Format).""" try: marker = b"; thumbnail begin" end_marker = b"; thumbnail end" start = data.find(marker) if start == -1: return "" start = data.find(b"\n", start) + 1 end = data.find(end_marker, start) if end == -1: return "" lines = data[start:end].split(b"\n") b64 = b"".join( line[2:].strip() if line.startswith(b"; ") else line.strip() for line in lines ) return b64.decode("ascii") except Exception: return "" def _extract_filament_info(data: bytes) -> list[dict]: """Liest filament_colour + filament_type aus GCode-Header (OrcaSlicer/PrusaSlicer). Gibt Liste von {color_hex, material} pro Slot zurück, leer wenn nicht gefunden. Liest nur die ersten 8KB (Header-Bereich). """ try: header = data[:8192].decode("utf-8", errors="ignore") colors, materials = [], [] for line in header.splitlines(): if line.startswith("; filament_colour"): val = line.split("=", 1)[-1].strip() colors = [c.strip().lstrip("#") for c in val.split(";") if c.strip()] elif line.startswith("; filament_type"): val = line.split("=", 1)[-1].strip() materials = [m.strip() for m in val.split(";") if m.strip()] if not colors: return [] result = [] for i, hex_color in enumerate(colors): result.append({ "slot_index": i, "color_hex": "#" + hex_color.upper() if hex_color else "#FFFFFF", "material": materials[i] if i < len(materials) else "PLA", }) return result except Exception: return [] class GCodeStore: """Persistenter GCode-Store pro Bridge-Instanz (SQLite).""" def __init__(self, data_dir: str): os.makedirs(data_dir, exist_ok=True) self._gcode_dir = os.path.join(data_dir, "gcodes") os.makedirs(self._gcode_dir, exist_ok=True) db_path = os.path.join(data_dir, "kx-bridge.db") self._conn = sqlite3.connect(db_path, check_same_thread=False) self._conn.row_factory = sqlite3.Row self._lock = threading.Lock() self._init_schema() def _init_schema(self): with self._lock: self._conn.executescript(""" CREATE TABLE IF NOT EXISTS gcode_files ( id TEXT PRIMARY KEY, filename TEXT NOT NULL, path TEXT NOT NULL, size_bytes INTEGER NOT NULL, uploaded_at TEXT NOT NULL, thumbnail_b64 TEXT, est_print_time_sec INTEGER, filament_used_mm REAL, layer_count INTEGER, gcode_filaments TEXT, objects_skip_parts TEXT, svg_image TEXT ); CREATE TABLE IF NOT EXISTS print_jobs ( id TEXT PRIMARY KEY, gcode_file_id TEXT NOT NULL, printer_id TEXT NOT NULL, started_at TEXT NOT NULL, ended_at TEXT, status TEXT NOT NULL, duration_sec INTEGER, filament_assignments TEXT, abort_reason TEXT ); """) # Migration: Spalte gcode_filaments nachrüsten falls DB älter try: self._conn.execute("ALTER TABLE gcode_files ADD COLUMN gcode_filaments TEXT") self._conn.commit() except Exception: pass # Migration: Spalten objects_skip_parts + svg_image (Part-Skip-Feature, v0.9.10) for col, typ in (("objects_skip_parts", "TEXT"), ("svg_image", "TEXT")): try: self._conn.execute(f"ALTER TABLE gcode_files ADD COLUMN {col} {typ}") self._conn.commit() except Exception: pass def save_file(self, file_id: str, filename: str, data: bytes, est_time_sec: int = 0, thumbnail_b64: str = "", gcode_filaments: list | None = None) -> str: """Speichert GCode-Datei auf Disk und in DB. Gibt Pfad zurück.""" safe_name = os.path.basename(filename) path = os.path.join(self._gcode_dir, safe_name) with open(path, "wb") as f: f.write(data) now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) with self._lock: filaments_json = json.dumps(gcode_filaments) if gcode_filaments else None self._conn.execute( """INSERT OR REPLACE INTO gcode_files (id, filename, path, size_bytes, uploaded_at, thumbnail_b64, est_print_time_sec, gcode_filaments) VALUES (?,?,?,?,?,?,?,?)""", (file_id, filename, path, len(data), now, thumbnail_b64 or None, est_time_sec or None, filaments_json) ) self._conn.commit() return path def list_files(self) -> list: with self._lock: rows = self._conn.execute( "SELECT * FROM gcode_files ORDER BY uploaded_at DESC" ).fetchall() return [dict(r) for r in rows] def get_file(self, file_id: str) -> dict | None: with self._lock: row = self._conn.execute( "SELECT * FROM gcode_files WHERE id=?", (file_id,) ).fetchone() return dict(row) if row else None def get_file_by_name(self, filename: str) -> dict | None: with self._lock: row = self._conn.execute( "SELECT * FROM gcode_files WHERE filename=? ORDER BY uploaded_at DESC LIMIT 1", (filename,) ).fetchone() return dict(row) if row else None def update_file_objects(self, filename: str, objects: list, svg: str = "") -> None: """Speichert Objekt-Liste + optionales SVG zu einer Datei (matcht via filename).""" if not filename: return with self._lock: self._conn.execute( "UPDATE gcode_files SET objects_skip_parts=?, svg_image=? " "WHERE filename=?", (json.dumps(objects), svg or "", filename), ) self._conn.commit() def delete_file(self, file_id: str) -> bool: row = self.get_file(file_id) if not row: return False try: os.remove(row["path"]) except OSError: pass with self._lock: self._conn.execute("DELETE FROM gcode_files WHERE id=?", (file_id,)) self._conn.commit() return True def start_job(self, gcode_file_id: str, printer_id: str, filament_assignments: list | None = None) -> str: job_id = str(uuid.uuid4()) now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) assignments_json = json.dumps(filament_assignments) if filament_assignments else None with self._lock: self._conn.execute( """INSERT INTO print_jobs (id, gcode_file_id, printer_id, started_at, status, filament_assignments) VALUES (?,?,?,?,'printing',?)""", (job_id, gcode_file_id, printer_id, now, assignments_json) ) self._conn.commit() return job_id def finish_job(self, job_id: str, status: str = "completed", abort_reason: str = "") -> None: now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) with self._lock: row = self._conn.execute( "SELECT started_at FROM print_jobs WHERE id=?", (job_id,) ).fetchone() duration = None if row: try: import calendar start = time.strptime(row["started_at"], "%Y-%m-%dT%H:%M:%SZ") duration = int(time.time() - calendar.timegm(start)) except Exception: pass self._conn.execute( """UPDATE print_jobs SET ended_at=?, status=?, duration_sec=?, abort_reason=? WHERE id=?""", (now, status, duration, abort_reason or None, job_id) ) self._conn.commit() def list_jobs(self, limit: int = 50, offset: int = 0) -> list: with self._lock: rows = self._conn.execute( """SELECT j.*, f.filename, f.thumbnail_b64 FROM print_jobs j LEFT JOIN gcode_files f ON j.gcode_file_id = f.id ORDER BY j.started_at DESC LIMIT ? OFFSET ?""", (limit, offset) ).fetchall() return [dict(r) for r in rows] class KobraXBridge: def __init__(self, client: KobraXClient, args=None, store=None, printer_id: str = "1", all_bridges=None): self.client = client self._args = args self._printer_id = printer_id self._all_bridges = all_bridges if all_bridges is not None else {} self.ws_clients: set[web.WebSocketResponse] = set() self._last_state: dict = {} self._state = { "nozzle_temp": 0.0, "nozzle_target": 0.0, "bed_temp": 0.0, "bed_target": 0.0, "print_state": "standby", "kobra_state": "free", "filename": "", "slicer_time": 0, "progress": 0.0, "print_duration": 0, "remain_time": 0, "curr_layer": 0, "total_layers": 0, "printer_name": env_loader.get("BRIDGE_PRINTER_NAME", "Anycubic Kobra X"), "firmware_version": "unknown", "upload_url": "", "camera_url": "", "fan_speed": 0, "light_on": False, "light_brightness": 80, "taskid": "-1", "print_speed_mode": 2, "connection_error": "", "file_ready": "", } self._ams_slots: list[dict] = [] self._ams_loaded_slot: int = -1 self._last_uploaded_file: str = "" self._store = store if store is not None else GCodeStore(args.data_dir) self._serve_dir_path: str = self._store._gcode_dir self._current_job_id: str = "" self._thumbnail_b64: str = "" # Part-Skip: zuletzt vom Drucker gemeldete Skip-Liste (v0.9.10) self._skip_state: dict = {"objects": [], "skipped": [], "ts": 0} # Register MQTT push callbacks client.callbacks["tempature/report"] = self._on_temp client.callbacks["print/report"] = self._on_print client.callbacks["info/report"] = self._on_info client.callbacks["file/report"] = self._on_file client.callbacks["multiColorBox/report"] = self._on_multicolor_box client.callbacks["light/report"] = self._on_light client.callbacks["skip/report"] = self._on_skip # ------------------------------------------------------------------------- # MQTT callbacks (called from reader thread) # ------------------------------------------------------------------------- def _on_temp(self, payload: dict): d = payload.get("data") or {} self._state["nozzle_temp"] = float(d.get("curr_nozzle_temp", 0)) self._state["nozzle_target"] = float(d.get("target_nozzle_temp", 0)) self._state["bed_temp"] = float(d.get("curr_hotbed_temp", 0)) self._state["bed_target"] = float(d.get("target_hotbed_temp", 0)) self._push_status_update() def _on_print(self, payload: dict): d = payload.get("data") or {} kobra_state = payload.get("state", "") self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "printing") if kobra_state: self._state["kobra_state"] = kobra_state # Job-History: Druckstart erkennen if kobra_state == "printing" and not self._current_job_id: filename = d.get("filename", self._state.get("filename", "")) if filename: gf = self._store.get_file_by_name(filename) if gf: self._current_job_id = self._store.start_job( gcode_file_id=gf["id"], printer_id=self._printer_id, ) log.info(f"Job gestartet: {self._current_job_id} für {filename}") # Job-History: Druckende erkennen if kobra_state in ("finished",) and self._current_job_id: self._store.finish_job(self._current_job_id, status="completed") log.info(f"Job abgeschlossen: {self._current_job_id}") self._current_job_id = "" elif kobra_state in ("stoped", "canceled") and self._current_job_id: self._store.finish_job(self._current_job_id, status="cancelled") log.info(f"Job abgebrochen: {self._current_job_id}") self._current_job_id = "" if kobra_state in ("stoped", "canceled"): self._state["progress"] = 0.0 self._state["filename"] = "" self._state["file_ready"] = "" self._state["print_duration"] = 0 self._state["remain_time"] = 0 self._state["slicer_time"] = 0 self._thumbnail_b64 = "" self._state["filename"] = d.get("filename", self._state["filename"]) if "progress" in d: self._state["progress"] = float(d["progress"]) / 100.0 if "print_time" in d: self._state["print_duration"] = int(d["print_time"]) * 60 if "remain_time" in d: self._state["remain_time"] = int(d["remain_time"]) * 60 if "curr_layer" in d: self._state["curr_layer"] = d["curr_layer"] if "total_layers" in d: self._state["total_layers"] = d["total_layers"] if "taskid" in d: self._state["taskid"] = str(d["taskid"]) settings = d.get("settings") or {} if "print_speed_mode" in settings: self._state["print_speed_mode"] = int(settings["print_speed_mode"]) self._push_status_update() def _on_info(self, payload: dict): d = payload.get("data") or {} # MQTT-Name nur übernehmen wenn kein eigener Name gesetzt (env oder per-Drucker config) if not env_loader.get("BRIDGE_PRINTER_NAME") and not getattr(self, "_name_locked", False): self._state["printer_name"] = d.get("printerName", self._state["printer_name"]) self._state["firmware_version"] = d.get("version", self._state["firmware_version"]) kobra_state = d.get("state", "") if kobra_state: self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "standby") self._state["kobra_state"] = kobra_state t = d.get("temp") or {} if t: self._state["nozzle_temp"] = float(t.get("curr_nozzle_temp", 0)) self._state["nozzle_target"] = float(t.get("target_nozzle_temp", 0)) self._state["bed_temp"] = float(t.get("curr_hotbed_temp", 0)) self._state["bed_target"] = float(t.get("target_hotbed_temp", 0)) urls = d.get("urls") or {} if urls.get("fileUploadurl"): self._state["upload_url"] = urls["fileUploadurl"] if urls.get("rtspUrl"): self._state["camera_url"] = urls["rtspUrl"] fan = d.get("fan_speed_pct") if fan is not None: self._state["fan_speed"] = int(fan) speed_mode = d.get("print_speed_mode") if speed_mode is not None: self._state["print_speed_mode"] = int(speed_mode) self._push_status_update() def _on_skip(self, payload: dict): """skip/report-Callback (Part-Skip-Feature, v0.9.10). Drucker meldet hier IMMER die Liste der bereits geskippten Objekte zurück (objects_skip_parts), egal ob auf query_obj oder nach skip/start. Die Gesamt-Objektliste kommt aus file/report. """ d = payload.get("data") or {} skipped = d.get("objects_skip_parts") or d.get("skipped") or d.get("skipped_parts") or [] # Liste immer (auch leer) übernehmen – sonst bleibt sie auf alten Stand self._skip_state = { "skipped": list(skipped), "ts": int(time.time()), } if payload.get("state") == "done" or payload.get("code") == 200: log.info(f"Skip-Antwort: state={payload.get('state')} code={payload.get('code')} skipped={skipped}") def _on_file(self, payload: dict): d = payload.get("data") or {} details = d.get("file_details") or {} thumb = details.get("thumbnail") or details.get("png_image") or "" if thumb: self._thumbnail_b64 = thumb log.info(f"Vorschaubild empfangen: {len(thumb)} Zeichen base64") # Part-Skip: Objekt-Liste + optionales SVG (v0.9.10) objs = details.get("objects_skip_parts") or [] svg = details.get("svg_image") or "" if objs: filename = d.get("filename") or details.get("filename") or self._last_uploaded_file if filename: try: self._store.update_file_objects(filename, objs, svg) log.info(f"Skip-Objekte für {filename}: {len(objs)} ({'mit SVG' if svg else 'ohne SVG'})") except Exception as e: log.warning(f"update_file_objects fehlgeschlagen: {e}") self._push_status_update() def _on_multicolor_box(self, payload: dict): boxes = (payload.get("data") or {}).get("multi_color_box") or [] if not boxes: return box = boxes[0] slots = box.get("slots") or [] loaded = box.get("loaded_slot", -1) if loaded != -1: self._ams_loaded_slot = loaded # Tip-Forming: nach Einziehen (status=10) oder Ausziehen (status=11) # schickt der originale Slicer automatisch type=3 (Extruder-Rückzug) fs = box.get("feed_status") or {} current_status = fs.get("current_status") slot_index = fs.get("slot_index", 0) if current_status in (10, 11): import threading def _tip_form(): import time; time.sleep(2) self.client.publish( "multiColorBox", "feedFilament", {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": 3}}]}, timeout=0 ) log.info(f"Tip-Forming (type=3) nach status={current_status} slot={slot_index}") threading.Thread(target=_tip_form, daemon=True).start() if slots: self._ams_slots = slots log.info(f"AMS-Slots empfangen: {len(slots)}, loaded_slot={self._ams_loaded_slot}") self._push_status_update() def _on_light(self, payload: dict): d = payload.get("data") or {} self._state["light_on"] = bool(d.get("status", 0)) self._state["light_brightness"] = int(d.get("brightness", 80)) self._push_status_update() # OrcaSlicer filament preset IDs (MoonrakerPrinterAgent.cpp mapping) _TRAY_INFO_IDX = { "PLA": "OGFL99", "PLA-CF": "OGFL98", "PLA SILK": "OGFL96", "PETG": "OGFG99", "PETG-CF": "OGFG98", "ABS": "OGFB99", "ASA": "OGFB98", "TPU": "OGFT99", "PA": "OGFP99", "PA-CF": "OGFP98", "PC": "OGFC99", "HIPS": "OGFH99", "PVA": "OGFV99", } def _build_lane_data(self) -> dict: """Baut BBL-AMS-JSON für OrcaSlicer DevFilaSystemParser::ParseV1_0.""" slots = self._ams_slots total = len(slots) if total == 0: return {"ams": [], "ams_exist_bits": "0", "tray_exist_bits": "0"} ams_count = (total + 3) // 4 ams_exist_bits = 0 tray_exist_bits = 0 ams_array = [] for ams_id in range(ams_count): ams_exist_bits |= (1 << ams_id) tray_array = [] max_slot = min(3, total - ams_id * 4 - 1) for slot_id in range(max_slot + 1): slot_index = ams_id * 4 + slot_id slot = slots[slot_index] if slot_index < total else {} occupied = slot.get("status") == 5 if occupied: tray_exist_bits |= (1 << slot_index) color_raw = slot.get("color", [255, 255, 255]) if isinstance(color_raw, list) and len(color_raw) >= 3: color_hex = "{:02X}{:02X}{:02X}FF".format( int(color_raw[0]), int(color_raw[1]), int(color_raw[2]) ) elif isinstance(color_raw, str) and len(color_raw) >= 6: color_hex = color_raw[:6].upper() + "FF" else: color_hex = "FFFFFFFF" material = slot.get("type", "PLA").upper() tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99") tray_array.append({ "id": str(slot_id), "tag_uid": "0000000000000000", "tray_info_idx": tray_info_idx, "tray_type": material, "tray_color": color_hex, }) else: tray_array.append({ "id": str(slot_id), "tag_uid": "0000000000000000", "tray_info_idx": "", "tray_type": "", "tray_color": "00000000", "tray_slot_placeholder": "1", }) ams_array.append({"id": str(ams_id), "info": "0002", "tray": tray_array}) return { "ams": ams_array, "ams_exist_bits": format(ams_exist_bits, "X"), "tray_exist_bits": format(tray_exist_bits, "X"), } # ------------------------------------------------------------------------- # WebSocket push # ------------------------------------------------------------------------- def _push_status_update(self): if not self.ws_clients: return msg = { "jsonrpc": "2.0", "method": "notify_status_update", "params": [self._build_printer_objects(), time.time()], } text = json.dumps(msg) dead = set() for ws in self.ws_clients: try: asyncio.run_coroutine_threadsafe(ws.send_str(text), ws._loop) except Exception: dead.add(ws) self.ws_clients -= dead def _build_mmu_object(self) -> dict: slots = self._ams_slots if not slots: return {} _TEMP = {"PLA": 210, "PETG": 230, "ABS": 240, "ASA": 250, "TPU": 220, "PA": 260, "PC": 270, "HIPS": 220} num_gates = len(slots) gate_status, gate_material, gate_color, gate_temperature, gate_color_rgb = [], [], [], [], [] for slot in slots: occupied = slot.get("status") == 5 gate_status.append(1 if occupied else 0) material = slot.get("type", "PLA").upper() if occupied else "" gate_material.append(material) c = slot.get("color", [0, 0, 0]) if occupied: gate_color.append("#{:02X}{:02X}{:02X}".format(*c[:3])) gate_color_rgb.append([round(c[0]/255, 3), round(c[1]/255, 3), round(c[2]/255, 3)]) else: gate_color.append("") gate_color_rgb.append([0.0, 0.0, 0.0]) gate_temperature.append(_TEMP.get(material, 210) if occupied else 0) return { "num_gates": num_gates, "enabled": True, "gate_status": gate_status, "gate_material": gate_material, "gate_color": gate_color, "gate_temperature": gate_temperature, "gate_color_rgb": gate_color_rgb, "gate_filament_name": [""] * num_gates, "gate_spool_id": [-1] * num_gates, "ttg_map": list(range(num_gates)), "tool": max(self._ams_loaded_slot, 0), "gate": max(self._ams_loaded_slot, 0), } def _build_printer_objects(self) -> dict: s = self._state return { "extruder": { "temperature": s["nozzle_temp"], "target": s["nozzle_target"], "power": 0.0, }, "heater_bed": { "temperature": s["bed_temp"], "target": s["bed_target"], "power": 0.0, }, "print_stats": { "state": s["print_state"], "filename": s["filename"], "print_duration": s["print_duration"], "total_duration": s["print_duration"], "remain_time": s["remain_time"], "info": { "current_layer": s["curr_layer"], "total_layer": s["total_layers"], }, }, "display_status": { "progress": s["progress"], "message": "", }, "virtual_sdcard": { "progress": s["progress"], "is_active": s["print_state"] == "printing", "file_path": s["filename"], }, "toolhead": { "position": [0, 0, 0, 0], "homed_axes": "xyz", "print_time": s["print_duration"], "estimated_print_time": s["print_duration"], }, "mmu": self._build_mmu_object(), } # ------------------------------------------------------------------------- # /kx/ API handlers (GCode Store, History, Filament) # ------------------------------------------------------------------------- _CORS = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", } def _json_cors(self, data, status=200): return web.json_response(data, status=status, headers=self._CORS) async def handle_kx_options(self, request): return web.Response(status=204, headers=self._CORS) async def handle_kx_files(self, request): files = self._store.list_files() # Letzten Job-Status + Dauer pro Datei ergänzen jobs = self._store.list_jobs(limit=500) last_job: dict = {} for j in reversed(jobs): last_job[j["gcode_file_id"]] = j for f in files: lj = last_job.get(f["id"]) f["last_print_status"] = lj["status"] if lj else None f["last_print_duration"] = lj["duration_sec"] if lj else None f["last_print_at"] = lj["started_at"] if lj else None return self._json_cors({"result": files}) async def handle_kx_file_delete(self, request): file_id = request.match_info["file_id"] if self._store.delete_file(file_id): return self._json_cors({"result": "ok"}) return self._json_cors({"error": "not found"}, status=404) async def handle_kx_filament_slots(self, request): slots = [] for i, s in enumerate(self._ams_slots): slots.append({ "slot_index": i, "material": s.get("type", ""), "color_hex": "#{:02X}{:02X}{:02X}".format(*s.get("color", [0,0,0])[:3]), "status": "loaded" if s.get("status") == 5 else "empty", "nozzle_temp": 0, }) return self._json_cors({"result": slots}) async def handle_kx_history(self, request): limit = int(request.rel_url.query.get("limit", 50)) offset = int(request.rel_url.query.get("offset", 0)) jobs = self._store.list_jobs(limit=limit, offset=offset) return self._json_cors({"result": jobs}) async def handle_kx_file_objects(self, request): """Liefert die Objekt-Liste + optionales SVG für eine Datei. GET /kx/files/{id}/objects → {"names": [...], "svg_b64": "..."} Wenn Datei noch keine Objekte hat (alter Eintrag): file/fileDetails beim Drucker abfragen und Antwort abwarten ist Aufgabe des Frontends (Reload nach Upload). Hier nur Datenbankstand zurückgeben. """ fid = request.match_info.get("id", "") f = self._store.get_file(fid) if not f: return self._json_cors({"error": "file not found"}, status=404) try: names = json.loads(f.get("objects_skip_parts") or "[]") except Exception: names = [] return self._json_cors({ "result": { "names": names, "svg_b64": f.get("svg_image") or "", } }) async def handle_kx_skip(self, request): """Mid-Print Skip auslösen. POST /kx/skip body={"names": ["..", ".."]} """ try: body = await request.json() except Exception: return self._json_cors({"error": "invalid json"}, status=400) names = body.get("names") or [] if not isinstance(names, list) or not all(isinstance(n, str) for n in names): return self._json_cors({"error": "names must be list[str]"}, status=400) try: loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self.client.skip_objects(names)) except Exception as e: return self._json_cors({"error": str(e)}, status=502) return self._json_cors({"result": "ok", "names": names}) async def handle_kx_skip_query(self, request): """Druck-Objektliste vom Drucker neu abfragen. POST /kx/skip/query → triggert skip/query_obj, gibt zuletzt bekannten Stand zurück (skip/report kommt async, Frontend pollt /kx/skip/state). """ try: loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self.client.query_skip_objects()) except Exception as e: return self._json_cors({"error": str(e)}, status=502) return self._json_cors({"result": self._skip_state}) async def handle_kx_skip_state(self, request): """Aktueller Skip-State. Kombiniert: - Gesamt-Objektliste: aus dem GCode-Store, gematcht über den aktuell laufenden filename (file/report beim Druckstart hat die Liste gefüllt). skip/query_obj liefert nämlich NUR die bereits geskippten zurück, nicht die Gesamtliste. - Geskippt: aus self._skip_state (von skip/report aktualisiert). """ filename = self._state.get("filename", "") all_objects: list[str] = [] svg = "" if filename: try: f = self._store.get_file_by_name(filename) if f: all_objects = json.loads(f.get("objects_skip_parts") or "[]") svg = f.get("svg_image") or "" except Exception as e: log.warning(f"skip_state lookup failed: {e}") result = { "objects": all_objects, "skipped": list(self._skip_state.get("skipped", [])), "svg_b64": svg, "ts": self._skip_state.get("ts", 0), "filename": filename, } return self._json_cors({"result": result}) async def handle_kx_printers(self, request): # Aktive Drucker (mit IP) sammeln active = [(pid, br) for pid, br in self._all_bridges.items() if (br._args.printer_ip or "").strip()] # Host für bridge_url: Browser-Sicht beibehalten, aber niemals "localhost" exportieren – # sonst scheitern Fetches aus dem Browser, wenn die UI über die LAN-IP geöffnet ist. host = request.host.split(":")[0] if host in ("localhost", "127.0.0.1", "::1", "0.0.0.0"): host = "" out = [] for pid, br in active: port = getattr(br._args, "port", 7125) # Nur bei Multi-Printer eine konkrete bridge_url setzen (Cross-Instance-Fetch). # Single-Printer: leere bridge_url → JS nutzt relative Pfade (gleiche Origin wie UI). bridge_url = "" if len(active) > 1 and host: bridge_url = f"http://{host}:{port}" out.append({ "id": pid, "name": br._state.get("printer_name") or f"Drucker {pid}", "bridge_url": bridge_url, "printer_ip": br._args.printer_ip, "device_id": br._args.device_id or "", }) return self._json_cors({"result": out}) async def handle_kx_print(self, request): """Druckstart aus dem GCode-Store mit optionalen Filament-Assignments.""" try: body = await request.json() except Exception: return self._json_cors({"error": "invalid json"}, status=400) file_id = body.get("file_id") if not file_id: return self._json_cors({"error": "file_id required"}, status=400) gcode_file = self._store.get_file(file_id) if not gcode_file: return self._json_cors({"error": "file not found"}, status=404) # filament_assignments: [{slot_index, material, color_hex}, …] assignments = body.get("filament_assignments") # excluded_objects: ["name1","name2",...] – Pre-Print Skip (v0.9.10) excluded_objects = body.get("excluded_objects") or [] if not isinstance(excluded_objects, list): excluded_objects = [] if assignments: ams_box_mapping = [ { "paint_index": a.get("paint_index", i), "ams_index": a["slot_index"], "paint_color": a.get("paint_color", [255, 255, 255, 255]), "ams_color": a.get("ams_color", [255, 255, 255, 255]), "material_type": a.get("material", "PLA"), } for i, a in enumerate(assignments) ] else: # Kein Dialog → alle belegten Slots wie bei normalem Upload-Druck default_slot = getattr(self._args, "default_ams_slot", "auto") all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5] if default_slot != "auto": try: slot_idx = int(default_slot) loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded except ValueError: loaded = all_loaded else: loaded = all_loaded ams_box_mapping = [ { "paint_index": i, "ams_index": i, "paint_color": [255, 255, 255, 255], "ams_color": [255, 255, 255, 255], "material_type": s.get("type", "PLA"), } for i, s in loaded ] use_ams = len(ams_box_mapping) > 0 auto_leveling = getattr(self._args, "auto_leveling", 1) filename = gcode_file["filename"] file_path = gcode_file["path"] # Datei über internes Serve-Endpoint bereitstellen url = f"http://localhost:{self._args.port}/serve/{os.path.basename(file_path)}" payload = { "taskid": "-1", "url": url, "filename": filename, "md5": "", "filepath": None, "filetype": 1, "project_type": 1, "filesize": gcode_file.get("size_bytes", 0), "ams_settings": { "use_ams": use_ams, "ams_box_mapping": ams_box_mapping, }, "task_settings": { "auto_leveling": auto_leveling, "vibration_compensation": 0, "flow_calibration": 0, "dry_mode": 0, "ai_settings": {"status": 0, "count": 0, "type": 1}, "timelapse": {"status": 0, "count": 0, "type": 64}, "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0}, "model_objects_skip_parts": excluded_objects, }, } log.info(f"KX-Store Druckstart: {filename} ams={len(ams_box_mapping)} slots assignments={bool(assignments)} excluded={len(excluded_objects)}") loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: self.client.publish("print", "start", payload, timeout=15.0) ) if result is None: return self._json_cors({"error": "Keine Antwort vom Drucker"}, status=504) # Job in History starten self._current_job_id = self._store.start_job( gcode_file_id=gcode_file["id"], printer_id=getattr(self._args, "device_id", "unknown"), filament_assignments=assignments, ) return self._json_cors({"result": "ok", "filename": filename}) # ------------------------------------------------------------------------- # HTTP handlers # ------------------------------------------------------------------------- async def handle_server_info(self, request): return web.json_response({ "result": { "klippy_connected": True, "klippy_state": "ready", "components": ["file_manager", "job_state", "virtual_sdcard"], "failed_components":[], "registered_directories": ["gcodes"], "warnings": [], "websocket_count": len(self.ws_clients), "moonraker_version": MOONRAKER_VERSION, "api_version": [1, 3, 0], "api_version_string": "1.3.0", } }) async def handle_printer_info(self, request): s = self._state return web.json_response({ "result": { "state": "ready", "state_message": "Printer is ready", "hostname": "kobrax-bridge", "klipper_path": "/home/pi/klipper", "python_path": "/home/pi/klippy-env/bin/python", "log_file": "/tmp/klippy.log", "config_file": "/home/pi/printer.cfg", "software_version": KLIPPER_VERSION, "cpu_info": s["printer_name"], } }) async def handle_machine_system_info(self, request): return web.json_response({ "result": { "system_info": { "cpu_info": {"cpu_count": 4, "bits": "64bit", "processor": "armv7l", "cpu_desc": "Anycubic Kobra X Bridge", "serial_number": "", "hardware_desc": "", "model": "Kobra X Bridge", "total_memory": 524288, "memory_units": "kB"}, "sd_info": {}, "distribution": {"name": "Linux", "id": "linux", "version": "1.0", "version_parts": {}, "like": "", "codename": ""}, "available_services": [], "service_state": {}, "python": {"version": list(sys.version_info[:3]), "version_string": sys.version}, "network": {}, "canbus": {}, } } }) async def handle_objects_query(self, request): objects = self._build_printer_objects() # filter by requested objects if specified requested = dict(request.rel_url.query) if requested: filtered = {k: objects[k] for k in requested if k in objects} else: filtered = objects return web.json_response({"result": {"status": filtered, "eventtime": time.time()}}) async def handle_objects_list(self, request): return web.json_response({ "result": { "objects": list(self._build_printer_objects().keys()) } }) async def handle_objects_subscribe(self, request): return web.json_response({ "result": { "status": self._build_printer_objects(), "eventtime": time.time(), } }) async def handle_files_list(self, request): filename = self._state.get("filename", "") files = [] if filename: files.append({ "path": filename, "modified": time.time(), "size": 0, "permissions": "rw", }) return web.json_response({"result": files}) async def handle_file_upload(self, request): log.info(f"Upload-Request: {request.method} {request.path_qs} CT={request.headers.get('Content-Type','')[:60]}") ct = request.headers.get("Content-Type", "") if "multipart" not in ct: return web.json_response({"error": "expected multipart"}, status=400) auto_print = False reader = await request.multipart() file_data = None remote_filename = self._last_uploaded_file or "upload.gcode" async for part in reader: if part.name in ("file", "gcode", "upload_file"): remote_filename = part.filename or remote_filename file_data = await part.read() log.info(f"Multipart-Feld '{part.name}': {remote_filename} ({len(file_data)} bytes)") elif part.name == "path": val = (await part.read()).decode("utf-8", errors="replace").strip() if val: remote_filename = val elif part.name == "print": val = (await part.read()).decode("utf-8", errors="replace").strip().lower() auto_print = val == "true" else: log.debug(f"Unbekanntes Multipart-Feld: {part.name}") if not file_data: return web.json_response({"error": "no file received"}, status=400) file_md5 = hashlib.md5(file_data).hexdigest() file_size = len(file_data) # Slicer-Zeitschätzung + Thumbnail aus GCode auslesen est_time = _parse_gcode_estimated_time(file_data) self._state["slicer_time"] = est_time thumbnail_b64 = _extract_thumbnail(file_data) gcode_filaments = _extract_filament_info(file_data) # Datei persistent im GCode-Store ablegen self._store.save_file( file_id=file_md5, filename=remote_filename, data=file_data, est_time_sec=est_time, thumbnail_b64=thumbnail_b64, gcode_filaments=gcode_filaments or None, ) serve_path = os.path.join(self._serve_dir_path, os.path.basename(remote_filename)) del file_data # RAM freigeben self._last_uploaded_file = remote_filename log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} → Store + Drucker") # Datei per HTTP auf den Drucker hochladen (serve_path liegt bereits auf Disk) upload_url = self._state.get("upload_url") or None loop = asyncio.get_event_loop() try: result = await loop.run_in_executor( None, self.client.upload_gcode, serve_path, remote_filename, upload_url ) except Exception as e: log.error(f"Upload fehlgeschlagen: {e}") return web.json_response({"error": str(e)}, status=500) log.info(f"Upload erfolgreich: {result}") # Druck starten mit vollständigem Payload (inkl. serve-URL + md5 + size) serve_url = f"http://{request.host}/serve/{remote_filename}" # print=true im Multipart-Formular (Moonraker) oder Query-String → Druck starten # print=false oder fehlt → nur hochladen if not auto_print: auto_print = request.rel_url.query.get("print", "false").lower() == "true" # Thumbnail immer anfordern (Drucker antwortet async mit file/report) self._thumbnail_b64 = "" self.client.publish("file", "fileDetails", {"root": "local", "filename": remote_filename}, timeout=0) self._state["last_upload_url"] = serve_url self._state["last_upload_md5"] = file_md5 self._state["last_upload_size"] = file_size if auto_print: log.info(f"Upload+Print (print=true): {remote_filename}") self._state["file_ready"] = "" loop = asyncio.get_event_loop() loop.run_in_executor(None, lambda: self._start_print(remote_filename, serve_url, file_md5, file_size)) else: log.info(f"Nur hochgeladen (print=false): {remote_filename}") self._state["file_ready"] = remote_filename # OctoPrint-kompatibler Response (OrcaSlicer wertet refs aus) return web.json_response({ "done": True, "files": { "local": { "name": remote_filename, "origin": "local", "path": remote_filename, "refs": { "download": f"http://{request.host}/api/files/local/{remote_filename}", "resource": f"http://{request.host}/api/files/local/{remote_filename}", } } }, "result": { "item": {"path": remote_filename, "root": "gcodes"}, "action": "create_file", } }, status=201) def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0): self._state["file_ready"] = "" default_slot = getattr(self._args, "default_ams_slot", "auto") all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5] if default_slot != "auto": try: slot_idx = int(default_slot) loaded = [(i, s) for i, s in all_loaded if i == slot_idx] if not loaded: log.warning(f"Standard-Slot {slot_idx} ist leer – fallback auf Auto") loaded = all_loaded except ValueError: loaded = all_loaded else: loaded = all_loaded use_ams = len(loaded) > 0 ams_box_mapping = [ { "paint_index": i, "ams_index": i, "paint_color": [255, 255, 255, 255], "ams_color": [255, 255, 255, 255], "material_type": s.get("type", "PLA"), } for i, s in loaded ] log.info(f"AMS-Slots: {len(loaded)}/{len(self._ams_slots)} belegt → {[i for i,_ in loaded]}") auto_leveling = getattr(self._args, "auto_leveling", 1) payload = { "taskid": "-1", "url": url, "filename": filename, "md5": md5, "filepath": None, "filetype": 1, "project_type": 1, "filesize": filesize, "ams_settings": { "use_ams": use_ams, "ams_box_mapping": ams_box_mapping, }, "task_settings": { "auto_leveling": auto_leveling, "vibration_compensation": 0, "flow_calibration": 0, "dry_mode": 0, "ai_settings": {"status": 0, "count": 0, "type": 1}, "timelapse": {"status": 0, "count": 0, "type": 64}, "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0}, "model_objects_skip_parts": [], }, } log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots") result = self.client.publish("print", "start", payload, timeout=15.0) if result: log.info(f"Druckstart bestätigt: state={result.get('state')}") else: log.warning("Druckstart: keine Antwort vom Drucker") async def handle_print_start(self, request): try: body = await request.json() except Exception: body = {} filename = (request.rel_url.query.get("filename") or body.get("filename") or self._last_uploaded_file) if not filename: return web.json_response({"error": "no filename"}, status=400) log.info(f"Druck starten: {filename}") # Optionale Slot-Auswahl aus dem Filament-Dialog filament_assignments = body.get("filament_assignments") # Pre-Print Skip (v0.9.10) excluded_objects = body.get("excluded_objects") or [] if not isinstance(excluded_objects, list): excluded_objects = [] if filament_assignments is not None: ams_box_mapping = [ { "paint_index": a.get("paint_index", i), "ams_index": a["slot_index"], "paint_color": a.get("paint_color", [255, 255, 255, 255]), "ams_color": a.get("ams_color", [255, 255, 255, 255]), "material_type": a.get("material", "PLA"), } for i, a in enumerate(filament_assignments) ] else: # AMS-Mapping aus gecachtem State — leere Slots (status != 5) überspringen default_slot = getattr(self._args, "default_ams_slot", "auto") all_loaded = [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5] if default_slot != "auto": try: slot_idx = int(default_slot) loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded except ValueError: loaded = all_loaded else: loaded = all_loaded ams_box_mapping = [ { "paint_index": i, "ams_index": i, "paint_color": [255, 255, 255, 255], "ams_color": [255, 255, 255, 255], "material_type": s.get("type", "PLA"), } for i, s in loaded ] use_ams = len(ams_box_mapping) > 0 auto_leveling = getattr(self._args, "auto_leveling", 1) url = self._state.get("last_upload_url", "") filesize = self._state.get("last_upload_size", 0) md5 = self._state.get("last_upload_md5", "") payload = { "taskid": "-1", "url": url, "filename": filename, "md5": md5, "filepath": None, "filetype": 1, "project_type": 1, "filesize": filesize, "ams_settings": { "use_ams": use_ams, "ams_box_mapping": ams_box_mapping, }, "task_settings": { "auto_leveling": auto_leveling, "vibration_compensation": 0, "flow_calibration": 0, "dry_mode": 0, "ai_settings": {"status": 0, "count": 0, "type": 0}, "timelapse": {"status": 0, "count": 0, "type": 0}, "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0}, "model_objects_skip_parts": excluded_objects, }, } loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: self.client.publish("print", "start", payload, timeout=15.0) ) if result is None: return web.json_response({"error": "Keine Antwort vom Drucker"}, status=504) return web.json_response({"result": "ok"}) async def handle_print_pause(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.pause_print(taskid)) return web.json_response({"result": "ok"}) async def handle_print_resume(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.resume_print(taskid)) return web.json_response({"result": "ok"}) async def handle_print_cancel(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.stop_print(taskid)) return web.json_response({"result": "ok"}) async def handle_api_file_ready_clear(self, request): self._state["file_ready"] = "" self._thumbnail_b64 = "" self._push_status_update() return web.json_response({"result": "ok"}) async def handle_octoprint_version(self, request): return web.json_response({ "api": "0.1", "server": "1.9.0", "text": "OctoPrint (Kobra X Bridge)", }) async def handle_index(self, request): html = r"""