""" kobrax_moonraker_bridge.py – Moonraker-kompatibler HTTP/WebSocket-Bridge für Anycubic Kobra X Emuliert die Moonraker/Klipper-API damit OrcaSlicer den Kobra X direkt ansteuern kann. Verwendung: python kobrax_moonraker_bridge.py --printer-ip 192.168.178.94 OrcaSlicer-Konfiguration: Drucker-Typ: Klipper | Host: 127.0.0.1 | Port: 7125 """ import argparse try: import config_loader as env_loader except ImportError: import env_loader import asyncio import hashlib import json import logging import os import pathlib import re import sys import tempfile import time import threading import io import zipfile # Bei PyInstaller-Binary liegt alles neben sys.executable, sonst neben __file__ _BASE = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, _BASE) from kobrax_client import KobraXClient try: import imageio_ffmpeg def _find_ffmpeg() -> str: return imageio_ffmpeg.get_ffmpeg_exe() except ImportError: def _find_ffmpeg() -> str: exe_name = "ffmpeg.exe" if sys.platform == "win32" else "ffmpeg" local = os.path.join(_BASE, exe_name) if os.path.isfile(local): return local return "ffmpeg" try: from aiohttp import web import aiohttp except ImportError: print("Fehler: aiohttp nicht installiert. Bitte: pip install aiohttp") sys.exit(1) logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)-5s %(name)s: %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("bridge") # Ring-Buffer für Browser-Log-Stream (letzte 200 Einträge) import collections as _collections _log_buffer: "_collections.deque[dict]" = _collections.deque(maxlen=500) _log_sse_queues: "list[asyncio.Queue]" = [] class _BrowserLogHandler(logging.Handler): """Sendet Log-Records in den Ring-Buffer und alle offenen SSE-Queues.""" _fmt = logging.Formatter(datefmt="%H:%M:%S") def emit(self, record: logging.LogRecord): entry = { "ts": self._fmt.formatTime(record, "%H:%M:%S"), "lvl": record.levelname, "name": record.name, "msg": record.getMessage(), } _log_buffer.append(entry) for q in list(_log_sse_queues): try: q.put_nowait(entry) except Exception: pass _browser_handler = _BrowserLogHandler() logging.getLogger().addHandler(_browser_handler) KOBRA_TO_KLIPPER_STATE = { "free": "standby", "busy": "printing", "printing": "printing", "preheating": "printing", "auto_leveling": "printing", "checking": "printing", "updated": "printing", "init": "printing", "pausing": "paused", "paused": "paused", "resuming": "printing", "resumed": "printing", "stopping": "printing", "stoped": "standby", "finished": "complete", "failed": "error", "canceled": "standby", } MOONRAKER_VERSION = "v0.9.3-1" KLIPPER_VERSION = "v0.12.0-1" def _parse_gcode_estimated_time(data: bytes) -> int: """Liest geschätzte Druckzeit aus GCode (OrcaSlicer + PrusaSlicer). Gibt Sekunden zurück, 0 wenn nicht gefunden. PrusaSlicer schreibt die Zeit ins Header (erste 16KB), OrcaSlicer schreibt sie ans Ende der Datei (letzte 16KB).""" import re # Anfang + Ende der Datei durchsuchen (OrcaSlicer schreibt Zeit am Ende) search_text = (data[:16384] + data[-65536:]).decode("utf-8", errors="ignore") # OrcaSlicer: ; total estimated time: 9m 20s # PrusaSlicer: ; estimated printing time (normal mode) = 1h 9m 20s m = (re.search(r";\s*total estimated time:\s*(.*)", search_text) or re.search(r";\s*estimated printing time \(normal mode\)\s*=\s*(.*)", search_text)) if not m: return 0 parts = re.findall(r"(\d+)\s*([hms])", m.group(1)) secs = 0 for val, unit in parts: if unit == "h": secs += int(val) * 3600 elif unit == "m": secs += int(val) * 60 elif unit == "s": secs += int(val) if secs: log.info(f"Slicer estimate: {secs}s ({m.group(1).strip()})") return secs _FILAMENT_COLOR_KEYS = ( "filament_colour", "filament_color", "filament_colours", "filament_colors", "extruder_colour", "extruder_color", ) _FILAMENT_MATERIAL_KEYS = ( "filament_type", "filament_types", "filament_settings_id", "filament_preset", ) _KNOWN_MATERIALS = ( "PLA-CF", "PETG-CF", "PA-CF", "PLA SILK", "PETG", "PLA", "ABS", "ASA", "TPU", "PA", "PC", "HIPS", "PVA", ) def _normalize_material(value) -> str: text = str(value or "").upper().replace("_", " ").replace("-", "-").strip() for material in _KNOWN_MATERIALS: if re.search(rf"(^|[^A-Z0-9]){re.escape(material)}([^A-Z0-9]|$)", text): return material return text.split()[0] if text else "PLA" def _color_to_rgb(value, default=None): if default is None: default = [255, 255, 255] if isinstance(value, list) and len(value) >= 3: try: return [max(0, min(255, int(value[0]))), max(0, min(255, int(value[1]))), max(0, min(255, int(value[2])))] except Exception: return default if isinstance(value, str): m = re.search(r"#?([0-9a-fA-F]{6})(?:[0-9a-fA-F]{2})?", value) if m: raw = m.group(1) return [int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16)] return default def _rgba(color) -> list[int]: rgb = _color_to_rgb(color) return [rgb[0], rgb[1], rgb[2], 255] def _rgb_distance(a, b) -> int: ar, ag, ab = _color_to_rgb(a) br, bg, bb = _color_to_rgb(b) return (ar - br) ** 2 + (ag - bg) ** 2 + (ab - bb) ** 2 def _parse_colors_from_text(text: str) -> list[list[int]]: colors = [] for match in re.finditer(r"#?([0-9a-fA-F]{6})(?:[0-9a-fA-F]{2})?", text or ""): raw = match.group(1) colors.append([int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16)]) return colors def _split_config_values(value: str) -> list[str]: value = (value or "").strip().strip("[]") parts = re.split(r"[;,]", value) return [p.strip().strip("\"' ") for p in parts if p.strip().strip("\"' ")] def _extract_config_value(line: str) -> str: attr = re.search(r"value=[\"']([^\"']+)[\"']", line) if attr: return attr.group(1) if "=" in line: return line.split("=", 1)[1].strip() if ":" in line: return line.split(":", 1)[1].strip() return line def _parse_filament_metadata_text(text: str) -> dict: colors: list[list[int]] = [] materials: list[str] = [] for line in (text or "").splitlines(): low = line.lower() if any(k in low for k in _FILAMENT_COLOR_KEYS): value = _extract_config_value(line) for color in _parse_colors_from_text(value): if color not in colors: colors.append(color) if any(k in low for k in _FILAMENT_MATERIAL_KEYS): value = _extract_config_value(line) for part in _split_config_values(value): mat = _normalize_material(part) if mat and mat not in materials: materials.append(mat) return {"colors": colors, "materials": materials} def _merge_filament_metadata(target: dict, source: dict): for key in ("colors", "materials"): for value in source.get(key, []): if value not in target[key]: target[key].append(value) def _parse_uploaded_filament_metadata(data: bytes, filename: str) -> dict: """Best-effort extraction of slicer filament colors/materials from G-code or 3MF.""" meta = {"colors": [], "materials": [], "source": ""} suffix = pathlib.Path(filename or "").suffix.lower() if suffix == ".3mf" or data[:4] == b"PK\x03\x04": try: with zipfile.ZipFile(io.BytesIO(data)) as zf: for name in zf.namelist(): lname = name.lower() if not lname.endswith((".config", ".json", ".model", ".xml", ".gcode")): continue if zf.getinfo(name).file_size > 2_000_000: continue text = zf.read(name).decode("utf-8", errors="ignore") before = len(meta["colors"]) + len(meta["materials"]) _merge_filament_metadata(meta, _parse_filament_metadata_text(text)) if len(meta["colors"]) + len(meta["materials"]) > before and not meta["source"]: meta["source"] = name except Exception as e: log.warning(f"3MF metadata could not be parsed: {e}") else: search = (data[:131072] + data[-262144:]).decode("utf-8", errors="ignore") _merge_filament_metadata(meta, _parse_filament_metadata_text(search)) if meta["colors"] or meta["materials"]: meta["source"] = "gcode" return meta class KobraXBridge: def __init__(self, client: KobraXClient, args=None): self.client = client self._args = args self.ws_clients: set[web.WebSocketResponse] = set() self._last_state: dict = {} self._state = { "nozzle_temp": 0.0, "nozzle_target": 0.0, "bed_temp": 0.0, "bed_target": 0.0, "print_state": "standby", "kobra_state": "free", "filename": "", "slicer_time": 0, "progress": 0.0, "print_duration": 0, "remain_time": 0, "curr_layer": 0, "total_layers": 0, "printer_name": "Anycubic Kobra X", "firmware_version": "unknown", "upload_url": "", "camera_url": "", "fan_speed": 0, "light_on": False, "light_brightness": 80, "taskid": "-1", "print_speed_mode": 2, "connection_error": "", "file_ready": "", } self._ams_slots: list[dict] = [] self._ams_loaded_slot: int = -1 self._last_uploaded_file: str = "" self._uploaded_filament_metadata: dict[str, dict] = {} self._serve_dir = tempfile.TemporaryDirectory(prefix="kobrax_serve_") self._serve_dir_path: str = self._serve_dir.name self._thumbnail_b64: str = "" # Register MQTT push callbacks client.callbacks["tempature/report"] = self._on_temp client.callbacks["print/report"] = self._on_print client.callbacks["info/report"] = self._on_info client.callbacks["file/report"] = self._on_file client.callbacks["multiColorBox/report"] = self._on_multicolor_box # ------------------------------------------------------------------------- # MQTT callbacks (called from reader thread) # ------------------------------------------------------------------------- def _on_temp(self, payload: dict): d = payload.get("data") or {} self._state["nozzle_temp"] = float(d.get("curr_nozzle_temp", 0)) self._state["nozzle_target"] = float(d.get("target_nozzle_temp", 0)) self._state["bed_temp"] = float(d.get("curr_hotbed_temp", 0)) self._state["bed_target"] = float(d.get("target_hotbed_temp", 0)) self._push_status_update() def _on_print(self, payload: dict): d = payload.get("data") or {} kobra_state = payload.get("state", "") self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "printing") if kobra_state: self._state["kobra_state"] = kobra_state if kobra_state in ("stoped", "canceled"): self._state["progress"] = 0.0 self._state["filename"] = "" self._state["file_ready"] = "" self._state["print_duration"] = 0 self._state["remain_time"] = 0 self._state["slicer_time"] = 0 self._thumbnail_b64 = "" self._state["filename"] = d.get("filename", self._state["filename"]) if "progress" in d: self._state["progress"] = float(d["progress"]) / 100.0 if "print_time" in d: self._state["print_duration"] = int(d["print_time"]) * 60 if "remain_time" in d: self._state["remain_time"] = int(d["remain_time"]) * 60 if "curr_layer" in d: self._state["curr_layer"] = d["curr_layer"] if "total_layers" in d: self._state["total_layers"] = d["total_layers"] if "taskid" in d: self._state["taskid"] = str(d["taskid"]) settings = d.get("settings") or {} if "print_speed_mode" in settings: self._state["print_speed_mode"] = int(settings["print_speed_mode"]) self._push_status_update() def _on_info(self, payload: dict): d = payload.get("data") or {} self._state["printer_name"] = d.get("printerName", self._state["printer_name"]) self._state["firmware_version"] = d.get("version", self._state["firmware_version"]) kobra_state = d.get("state", "") if kobra_state: self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "standby") self._state["kobra_state"] = kobra_state t = d.get("temp") or {} if t: self._state["nozzle_temp"] = float(t.get("curr_nozzle_temp", 0)) self._state["nozzle_target"] = float(t.get("target_nozzle_temp", 0)) self._state["bed_temp"] = float(t.get("curr_hotbed_temp", 0)) self._state["bed_target"] = float(t.get("target_hotbed_temp", 0)) urls = d.get("urls") or {} if urls.get("fileUploadurl"): self._state["upload_url"] = urls["fileUploadurl"] if urls.get("rtspUrl"): self._state["camera_url"] = urls["rtspUrl"] fan = d.get("fan_speed_pct") if fan is not None: self._state["fan_speed"] = int(fan) speed_mode = d.get("print_speed_mode") if speed_mode is not None: self._state["print_speed_mode"] = int(speed_mode) self._push_status_update() def _on_file(self, payload: dict): d = payload.get("data") or {} details = d.get("file_details") or {} thumb = details.get("thumbnail") or details.get("png_image") or "" if thumb: self._thumbnail_b64 = thumb log.info(f"Thumbnail received: {len(thumb)} base64 chars") self._push_status_update() def _on_multicolor_box(self, payload: dict): boxes = (payload.get("data") or {}).get("multi_color_box") or [] if not boxes: return box = boxes[0] slots = box.get("slots") or [] loaded = box.get("loaded_slot", -1) if loaded != -1: self._ams_loaded_slot = loaded # Tip-Forming: nach Einziehen (status=10) oder Ausziehen (status=11) # schickt der originale Slicer automatisch type=3 (Extruder-Rückzug) fs = box.get("feed_status") or {} current_status = fs.get("current_status") slot_index = fs.get("slot_index", 0) if current_status in (10, 11): import threading def _tip_form(): import time; time.sleep(2) self.client.publish( "multiColorBox", "feedFilament", {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": 3}}]}, timeout=0 ) log.info(f"Tip-Forming (type=3) nach status={current_status} slot={slot_index}") threading.Thread(target=_tip_form, daemon=True).start() if slots: self._ams_slots = slots log.info(f"AMS slots received: {len(slots)}, loaded_slot={self._ams_loaded_slot}") self._push_status_update() # OrcaSlicer filament preset IDs (MoonrakerPrinterAgent.cpp mapping) _TRAY_INFO_IDX = { "PLA": "OGFL99", "PLA-CF": "OGFL98", "PLA SILK": "OGFL96", "PETG": "OGFG99", "PETG-CF": "OGFG98", "ABS": "OGFB99", "ASA": "OGFB98", "TPU": "OGFT99", "PA": "OGFP99", "PA-CF": "OGFP98", "PC": "OGFC99", "HIPS": "OGFH99", "PVA": "OGFV99", } def _build_lane_data(self) -> dict: """Baut BBL-AMS-JSON für OrcaSlicer DevFilaSystemParser::ParseV1_0.""" slots = self._ams_slots total = len(slots) if total == 0: return {"ams": [], "ams_exist_bits": "0", "tray_exist_bits": "0"} ams_count = (total + 3) // 4 ams_exist_bits = 0 tray_exist_bits = 0 ams_array = [] for ams_id in range(ams_count): ams_exist_bits |= (1 << ams_id) tray_array = [] max_slot = min(3, total - ams_id * 4 - 1) for slot_id in range(max_slot + 1): slot_index = ams_id * 4 + slot_id slot = slots[slot_index] if slot_index < total else {} occupied = slot.get("status") == 5 if occupied: tray_exist_bits |= (1 << slot_index) color_raw = slot.get("color", [255, 255, 255]) if isinstance(color_raw, list) and len(color_raw) >= 3: color_hex = "{:02X}{:02X}{:02X}FF".format( int(color_raw[0]), int(color_raw[1]), int(color_raw[2]) ) elif isinstance(color_raw, str) and len(color_raw) >= 6: color_hex = color_raw[:6].upper() + "FF" else: color_hex = "FFFFFFFF" material = slot.get("type", "PLA").upper() tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99") tray_array.append({ "id": str(slot_id), "tag_uid": "0000000000000000", "tray_info_idx": tray_info_idx, "tray_type": material, "tray_color": color_hex, }) else: tray_array.append({ "id": str(slot_id), "tag_uid": "0000000000000000", "tray_info_idx": "", "tray_type": "", "tray_color": "00000000", "tray_slot_placeholder": "1", }) ams_array.append({"id": str(ams_id), "info": "0002", "tray": tray_array}) return { "ams": ams_array, "ams_exist_bits": format(ams_exist_bits, "X"), "tray_exist_bits": format(tray_exist_bits, "X"), } def _uploaded_metadata_for(self, filename: str) -> dict: if not filename: return {"colors": [], "materials": [], "source": ""} return (self._uploaded_filament_metadata.get(filename) or self._uploaded_filament_metadata.get(os.path.basename(filename)) or {"colors": [], "materials": [], "source": ""}) def _loaded_ams_slots(self) -> list[tuple[int, dict]]: return [(i, s) for i, s in enumerate(self._ams_slots) if s.get("status") == 5] def _filtered_loaded_ams_slots(self) -> list[tuple[int, dict]]: loaded = self._loaded_ams_slots() default_slot = getattr(self._args, "default_ams_slot", "auto") if default_slot == "auto": return loaded try: slot_idx = int(default_slot) except ValueError: return loaded selected = [(i, s) for i, s in loaded if i == slot_idx] if selected: return selected log.warning(f"Default slot {slot_idx} is empty - falling back to Auto") return loaded def _build_ams_assignments(self, filename: str) -> list[dict]: loaded = self._filtered_loaded_ams_slots() if not loaded: return [] metadata = self._uploaded_metadata_for(filename) colors = metadata.get("colors") or [] materials = metadata.get("materials") or [] target_count = max(len(colors), len(materials)) # Without slicer metadata, preserve the previous behavior: advertise all # occupied slots and let the printer/firmware use its normal fallback. if target_count == 0: return [{ "paint_index": i, "slot_index": i, "paint_color": _rgba(slot.get("color", [255, 255, 255])), "ams_color": _rgba(slot.get("color", [255, 255, 255])), "material_type": _normalize_material(slot.get("type", "PLA")), "reason": "loaded", } for i, slot in loaded] assignments = [] used_slots: set[int] = set() for paint_index in range(target_count): target_color = colors[paint_index] if paint_index < len(colors) else None target_material = (_normalize_material(materials[paint_index]) if paint_index < len(materials) else "") candidates = loaded if target_material: material_matches = [ (i, s) for i, s in candidates if _normalize_material(s.get("type", "PLA")) == target_material ] if material_matches: candidates = material_matches unused = [(i, s) for i, s in candidates if i not in used_slots] if unused: candidates = unused def _score(item): slot_index, slot = item score = 0 if target_color is not None: score += _rgb_distance(target_color, slot.get("color", [255, 255, 255])) if target_material and _normalize_material(slot.get("type", "PLA")) != target_material: score += 200000 return score, slot_index slot_index, slot = min(candidates, key=_score) used_slots.add(slot_index) assignments.append({ "paint_index": paint_index, "slot_index": slot_index, "paint_color": _rgba(target_color or slot.get("color", [255, 255, 255])), "ams_color": _rgba(slot.get("color", [255, 255, 255])), "material_type": _normalize_material(slot.get("type", target_material or "PLA")), "target_material": target_material, "reason": "metadata", }) summary = [ f"T{a['paint_index']}→S{a['slot_index']} {a['material_type']}" for a in assignments ] log.info(f"AMS metadata mapping for {filename}: {', '.join(summary)}") return assignments def _build_anycubic_ams_mapping(self, filename: str) -> list[dict]: return [{ "paint_index": a["paint_index"], "ams_index": a["slot_index"], "paint_color": a["paint_color"], "ams_color": a["ams_color"], "material_type": a["material_type"], } for a in self._build_ams_assignments(filename)] def _build_simple_ams_mapping(self, filename: str) -> list[dict]: return [{ "slot_index": a["slot_index"], "material_type": a["material_type"], "color": a["ams_color"][:3], "paint_index": a["paint_index"], "paint_color": a["paint_color"][:3], } for a in self._build_ams_assignments(filename)] # ------------------------------------------------------------------------- # WebSocket push # ------------------------------------------------------------------------- def _push_status_update(self): if not self.ws_clients: return msg = { "jsonrpc": "2.0", "method": "notify_status_update", "params": [self._build_printer_objects(), time.time()], } text = json.dumps(msg) dead = set() for ws in self.ws_clients: try: asyncio.run_coroutine_threadsafe(ws.send_str(text), ws._loop) except Exception: dead.add(ws) self.ws_clients -= dead def _build_printer_objects(self) -> dict: s = self._state return { "extruder": { "temperature": s["nozzle_temp"], "target": s["nozzle_target"], "power": 0.0, }, "heater_bed": { "temperature": s["bed_temp"], "target": s["bed_target"], "power": 0.0, }, "print_stats": { "state": s["print_state"], "filename": s["filename"], "print_duration": s["print_duration"], "total_duration": s["print_duration"], "remain_time": s["remain_time"], "info": { "current_layer": s["curr_layer"], "total_layer": s["total_layers"], }, }, "display_status": { "progress": s["progress"], "message": "", }, "virtual_sdcard": { "progress": s["progress"], "is_active": s["print_state"] == "printing", "file_path": s["filename"], }, "toolhead": { "position": [0, 0, 0, 0], "homed_axes": "xyz", "print_time": s["print_duration"], "estimated_print_time": s["print_duration"], }, } # ------------------------------------------------------------------------- # HTTP handlers # ------------------------------------------------------------------------- async def handle_server_info(self, request): return web.json_response({ "result": { "klippy_connected": True, "klippy_state": "ready", "components": ["file_manager", "job_state", "virtual_sdcard"], "failed_components":[], "registered_directories": ["gcodes"], "warnings": [], "websocket_count": len(self.ws_clients), "moonraker_version": MOONRAKER_VERSION, "api_version": [1, 3, 0], "api_version_string": "1.3.0", } }) async def handle_printer_info(self, request): s = self._state return web.json_response({ "result": { "state": "ready", "state_message": "Printer is ready", "hostname": "kobrax-bridge", "klipper_path": "/home/pi/klipper", "python_path": "/home/pi/klippy-env/bin/python", "log_file": "/tmp/klippy.log", "config_file": "/home/pi/printer.cfg", "software_version": KLIPPER_VERSION, "cpu_info": s["printer_name"], } }) async def handle_machine_system_info(self, request): return web.json_response({ "result": { "system_info": { "cpu_info": {"cpu_count": 4, "bits": "64bit", "processor": "armv7l", "cpu_desc": "Anycubic Kobra X Bridge", "serial_number": "", "hardware_desc": "", "model": "Kobra X Bridge", "total_memory": 524288, "memory_units": "kB"}, "sd_info": {}, "distribution": {"name": "Linux", "id": "linux", "version": "1.0", "version_parts": {}, "like": "", "codename": ""}, "available_services": [], "service_state": {}, "python": {"version": list(sys.version_info[:3]), "version_string": sys.version}, "network": {}, "canbus": {}, } } }) async def handle_objects_query(self, request): objects = self._build_printer_objects() # filter by requested objects if specified requested = dict(request.rel_url.query) if requested: filtered = {k: objects[k] for k in requested if k in objects} else: filtered = objects return web.json_response({"result": {"status": filtered, "eventtime": time.time()}}) async def handle_objects_list(self, request): return web.json_response({ "result": { "objects": list(self._build_printer_objects().keys()) } }) async def handle_objects_subscribe(self, request): return web.json_response({ "result": { "status": self._build_printer_objects(), "eventtime": time.time(), } }) async def handle_files_list(self, request): filename = self._state.get("filename", "") files = [] if filename: files.append({ "path": filename, "modified": time.time(), "size": 0, "permissions": "rw", }) return web.json_response({"result": files}) async def handle_file_upload(self, request): log.info(f"Upload-Request: {request.method} {request.path_qs} CT={request.headers.get('Content-Type','')[:60]}") ct = request.headers.get("Content-Type", "") if "multipart" not in ct: return web.json_response({"error": "expected multipart"}, status=400) auto_print = False reader = await request.multipart() file_data = None remote_filename = self._last_uploaded_file or "upload.gcode" async for part in reader: if part.name in ("file", "gcode", "upload_file"): remote_filename = part.filename or remote_filename file_data = await part.read() log.info(f"Multipart-Feld '{part.name}': {remote_filename} ({len(file_data)} bytes)") elif part.name == "path": val = (await part.read()).decode("utf-8", errors="replace").strip() if val: remote_filename = val elif part.name == "print": val = (await part.read()).decode("utf-8", errors="replace").strip().lower() auto_print = val == "true" else: log.debug(f"Unbekanntes Multipart-Feld: {part.name}") if not file_data: return web.json_response({"error": "no file received"}, status=400) file_md5 = hashlib.md5(file_data).hexdigest() file_size = len(file_data) # Slicer-Zeitschätzung aus GCode-Header auslesen self._state["slicer_time"] = _parse_gcode_estimated_time(file_data) filament_meta = _parse_uploaded_filament_metadata(file_data, remote_filename) self._uploaded_filament_metadata[remote_filename] = filament_meta self._uploaded_filament_metadata[os.path.basename(remote_filename)] = filament_meta if filament_meta.get("colors") or filament_meta.get("materials"): log.info( "Upload filament metadata: " f"colors={filament_meta.get('colors', [])} " f"materials={filament_meta.get('materials', [])} " f"source={filament_meta.get('source', '')}" ) # Datei auf Disk ablegen (temp-Verzeichnis) damit Drucker sie per HTTP abrufen kann safe_name = os.path.basename(remote_filename) # keine Pfad-Traversal serve_path = os.path.join(self._serve_dir_path, safe_name) with open(serve_path, "wb") as f: f.write(file_data) del file_data # RAM freigeben self._last_uploaded_file = remote_filename log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} -> printer") # Datei per HTTP auf den Drucker hochladen (serve_path liegt bereits auf Disk) upload_url = self._state.get("upload_url") or None loop = asyncio.get_event_loop() try: result = await loop.run_in_executor( None, self.client.upload_gcode, serve_path, remote_filename, upload_url ) except Exception as e: log.error(f"Upload failed: {e}") return web.json_response({"error": str(e)}, status=500) log.info(f"Upload succeeded: {result}") # Druck starten mit vollständigem Payload (inkl. serve-URL + md5 + size) serve_url = f"http://{request.host}/serve/{remote_filename}" # print=true im Multipart-Formular (Moonraker) oder Query-String → Druck starten # print=false oder fehlt → nur hochladen if not auto_print: auto_print = request.rel_url.query.get("print", "false").lower() == "true" # Thumbnail immer anfordern (Drucker antwortet async mit file/report) self._thumbnail_b64 = "" self.client.publish("file", "fileDetails", {"root": "local", "filename": remote_filename}, timeout=0) if auto_print: log.info(f"Upload+Print (print=true): {remote_filename}") self._state["file_ready"] = "" loop = asyncio.get_event_loop() loop.run_in_executor(None, lambda: self._start_print(remote_filename, serve_url, file_md5, file_size)) else: log.info(f"Uploaded only (print=false): {remote_filename}") self._state["file_ready"] = remote_filename # OctoPrint-kompatibler Response (OrcaSlicer wertet refs aus) return web.json_response({ "done": True, "files": { "local": { "name": remote_filename, "origin": "local", "path": remote_filename, "refs": { "download": f"http://{request.host}/api/files/local/{remote_filename}", "resource": f"http://{request.host}/api/files/local/{remote_filename}", } } }, "result": { "item": {"path": remote_filename, "root": "gcodes"}, "action": "create_file", } }, status=201) def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0): self._state["file_ready"] = "" ams_box_mapping = self._build_anycubic_ams_mapping(filename) use_ams = len(ams_box_mapping) > 0 log.info( f"AMS-Slots: {len(self._loaded_ams_slots())}/{len(self._ams_slots)} belegt " f"→ {[m['ams_index'] for m in ams_box_mapping]}" ) auto_leveling = getattr(self._args, "auto_leveling", 1) payload = { "taskid": "-1", "url": url, "filename": filename, "md5": md5, "filepath": None, "filetype": 1, "project_type": 1, "filesize": filesize, "ams_settings": { "use_ams": use_ams, "ams_box_mapping": ams_box_mapping, }, "task_settings": { "auto_leveling": auto_leveling, "vibration_compensation": 0, "flow_calibration": 0, "dry_mode": 0, "ai_settings": {"status": 0, "count": 0, "type": 1}, "timelapse": {"status": 0, "count": 0, "type": 64}, "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0}, "model_objects_skip_parts": [], }, } log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots") result = self.client.publish("print", "start", payload, timeout=15.0) if result: log.info(f"Print start confirmed: state={result.get('state')}") else: log.warning("Print start: no response from printer") async def handle_print_start(self, request): try: body = await request.json() except Exception: body = {} filename = (request.rel_url.query.get("filename") or body.get("filename") or self._last_uploaded_file) if not filename: return web.json_response({"error": "no filename"}, status=400) log.info(f"Starting print: {filename}") ams_box_mapping = self._build_simple_ams_mapping(filename) use_ams = len(ams_box_mapping) > 0 payload = { "filename": filename, "taskid": str(int(time.time())), "use_ams": use_ams, } if ams_box_mapping: payload["ams_box_mapping"] = ams_box_mapping loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: self.client.publish("print", "start", payload, timeout=15.0) ) if result is None: return web.json_response({"error": "No response from printer"}, status=504) return web.json_response({"result": "ok"}) async def handle_print_pause(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.pause_print(taskid)) return web.json_response({"result": "ok"}) async def handle_print_resume(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.resume_print(taskid)) return web.json_response({"result": "ok"}) async def handle_print_cancel(self, request): loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.stop_print(taskid)) return web.json_response({"result": "ok"}) async def handle_api_file_ready_clear(self, request): self._state["file_ready"] = "" self._thumbnail_b64 = "" self._push_status_update() return web.json_response({"result": "ok"}) async def handle_octoprint_version(self, request): return web.json_response({ "api": "0.1", "server": "1.9.0", "text": "OctoPrint (Kobra X Bridge)", }) async def handle_index(self, request): html = r""" KX-Bridge
Anycubic Kobra X
Standby
📷 Kamera
💡 Licht
📷 Kamera nicht gestartet
Fortschritt
0%
Temperaturen
Nozzle
°C
0°C
Bett
°C
0°C
Verlauf (letzte 60 Messungen)
XY-Achsen
Z-Achse
Schrittweite: 1 mm
🏎 Druckgeschwindigkeit
🌀 Lüfter
0
AMS / Filamentbox
Keine AMS-Daten empfangen
Slot auswählen
Slot 1
Ereignis-Log Download
Dir: Topic:
""" version = self._read_version() html = html.replace("'__VERSION__'", f"'{version}'") return web.Response(text=html, content_type="text/html", headers={"Cache-Control": "no-store, no-cache, must-revalidate"}) async def handle_api_light(self, request): try: body = await request.json() except Exception: body = {} on = bool(body.get("on", True)) brightness = int(body.get("brightness", self._state["light_brightness"])) loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self.client.publish( "light", "control", {"type": 3, "status": 1 if on else 0, "brightness": brightness}, timeout=0 )) self._state["light_on"] = on self._state["light_brightness"] = brightness return web.json_response({"result": "ok"}) async def handle_api_fan(self, request): try: body = await request.json() except Exception: body = {} speed = int(body.get("speed", 0)) loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self.client.publish( "fan", "setSpeed", {"fan_speed_pct": speed}, timeout=0 )) self._state["fan_speed"] = speed return web.json_response({"result": "ok"}) async def handle_api_connect(self, request): loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, self.client.connect) self._state["print_state"] = "standby" self._state["kobra_state"] = "free" log.info("Manuell verbunden") return web.json_response({"result": "connected"}) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def handle_api_disconnect(self, request): loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, self.client.disconnect) except Exception: pass self._state["print_state"] = "error" self._state["kobra_state"] = "offline" log.info("Disconnected manually") return web.json_response({"result": "disconnected"}) async def handle_api_speed(self, request): try: body = await request.json() except Exception: body = {} mode = int(body.get("mode", 2)) loop = asyncio.get_event_loop() taskid = self._state.get("taskid", "-1") await loop.run_in_executor(None, lambda: self.client.publish_web( "print", "update", {"taskid": taskid, "settings": {"print_speed_mode": mode}}, )) self._state["print_speed_mode"] = mode return web.json_response({"result": "ok"}) async def handle_api_ams_set_slot(self, request): try: body = await request.json() except Exception: body = {} index = int(body.get("index", 0)) mat = str(body.get("type", "PLA")).upper() color = body.get("color", [255, 255, 255]) if not (isinstance(color, list) and len(color) == 3): return web.json_response({"error": "color must be [r,g,b]"}, status=400) loop = asyncio.get_event_loop() def _send(): resp = self.client.publish( "multiColorBox", "setInfo", {"multi_color_box": [{"id": -1, "slots": [{"index": index, "type": mat, "color": color}]}]}, timeout=5 ) log.info(f"setInfo slot={index} type={mat} color={color} → {resp}") return resp resp = await loop.run_in_executor(None, _send) if resp and resp.get("code") == 200: # Update cached slot immediately for s in self._ams_slots: if s.get("index") == index: s["type"] = mat s["color"] = color break return web.json_response({"result": "ok"}) async def handle_api_ams_feed(self, request): try: body = await request.json() except Exception: body = {} slot_index = int(body.get("slot_index", 0)) feed_type = int(body.get("type", 1)) # Ausziehen (type=2): wenn kein Slot explizit gewählt, den zuletzt geladenen nehmen if feed_type == 2 and self._ams_loaded_slot >= 0: slot_index = self._ams_loaded_slot loop = asyncio.get_event_loop() def _send(): resp = self.client.publish( "multiColorBox", "feedFilament", {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": feed_type}}]}, timeout=5 ) log.info(f"feedFilament type={feed_type} slot={slot_index} loaded_slot={self._ams_loaded_slot} → {resp}") await loop.run_in_executor(None, _send) return web.json_response({"result": "ok"}) async def handle_api_axis(self, request): try: body = await request.json() except Exception: body = {} action = body.get("action", "move") loop = asyncio.get_event_loop() if action == "turnOff": await loop.run_in_executor(None, lambda: self.client.publish( "axis", "turnOff", None, timeout=0 )) else: axis = int(body.get("axis", 4)) move_type = int(body.get("move_type", 2)) distance = float(body.get("distance", 0)) await loop.run_in_executor(None, lambda: self.client.publish( "axis", "move", {"axis": axis, "move_type": move_type, "distance": distance}, timeout=0 )) return web.json_response({"result": "ok"}) async def handle_api_temperature(self, request): try: body = await request.json() except Exception: body = {} nozzle = body.get("nozzle") bed = body.get("bed") loop = asyncio.get_event_loop() printing = self._state.get("print_state") == "printing" if printing: # During print: runtime update via web/printer topic, one setting at a time taskid = self._state.get("taskid", "-1") if nozzle is not None: n = int(float(nozzle)) await loop.run_in_executor(None, lambda: self.client.publish_web( "print", "update", {"taskid": taskid, "settings": {"target_nozzle_temp": n}}, )) if bed is not None: b = int(float(bed)) await loop.run_in_executor(None, lambda: self.client.publish_web( "print", "update", {"taskid": taskid, "settings": {"target_hotbed_temp": b}}, )) else: # Idle: standard tempature/set with both values n = int(float(nozzle)) if nozzle is not None else int(self._state["nozzle_target"]) b = int(float(bed)) if bed is not None else int(self._state["bed_target"]) await loop.run_in_executor(None, lambda: self.client.publish( "tempature", "set", {"target_nozzle_temp": n, "target_hotbed_temp": b}, timeout=0 )) return web.json_response({"result": "ok"}) async def handle_api_camera(self, request): return web.json_response({"url": self._state["camera_url"]}) async def handle_api_camera_start(self, request): loop = asyncio.get_event_loop() # Wait for pushStarted confirmation before returning result = await loop.run_in_executor(None, lambda: self.client.publish( "video", "startCapture", None, timeout=8.0 )) state = (result or {}).get("state", "") log.info(f"Camera startCapture: state={state}") return web.json_response({"result": "ok", "state": state}) async def handle_api_camera_stop(self, request): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self.client.publish( "video", "stopCapture", None, timeout=0 )) return web.json_response({"result": "ok"}) async def handle_api_camera_snapshot(self, request): """Einzelner JPEG-Frame aus dem Kamera-Stream – für Obico und andere Snapshot-Clients.""" url = self._state.get("camera_url", "") if not url: return web.Response(status=503, text="No camera URL known") is_rtsp = url.lower().startswith("rtsp://") input_args = ["-fflags", "nobuffer", "-flags", "low_delay"] if is_rtsp: input_args += ["-probesize", "32", "-analyzeduration", "0", "-rtsp_transport", "tcp"] else: input_args += ["-probesize", "1000000", "-analyzeduration", "1000000"] try: proc = await asyncio.create_subprocess_exec( _find_ffmpeg(), "-loglevel", "quiet", *input_args, "-i", url, "-frames:v", "1", "-f", "mjpeg", "-q:v", "3", "pipe:1", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) jpeg, _ = await asyncio.wait_for(proc.communicate(), timeout=20) except asyncio.TimeoutError: return web.Response(status=504, text="Snapshot-Timeout") except Exception as e: return web.Response(status=503, text=str(e)) if not jpeg: return web.Response(status=503, text="No frame received") return web.Response(body=jpeg, content_type="image/jpeg", headers={"Cache-Control": "no-cache"}) async def handle_camera_stream(self, request): """MJPEG proxy: FLV → MJPEG via ffmpeg, served as multipart/x-mixed-replace.""" url = self._state.get("camera_url", "") if not url: return web.Response(status=503, text="No camera URL known") is_rtsp = url.lower().startswith("rtsp://") ffmpeg_input_args = [ "-fflags", "nobuffer", "-flags", "low_delay", ] if is_rtsp: ffmpeg_input_args += ["-probesize", "32", "-analyzeduration", "0", "-rtsp_transport", "tcp"] else: ffmpeg_input_args += ["-probesize", "1000000", "-analyzeduration", "1000000"] # ffmpeg erst starten BEVOR der StreamResponse geöffnet wird # (damit wir bei Fehler noch eine normale HTTP-Response senden können) try: proc = await asyncio.create_subprocess_exec( _find_ffmpeg(), "-loglevel", "quiet", *ffmpeg_input_args, "-i", url, "-vf", "fps=15,scale=640:-1", "-f", "image2pipe", "-vcodec", "mjpeg", "-q:v", "3", "-flush_packets", "1", "pipe:1", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) except (FileNotFoundError, OSError) as e: log.warning("Camera: ffmpeg not found - camera stream unavailable") return web.Response(status=503, text="ffmpeg not found") except Exception as e: log.warning(f"Camera: ffmpeg could not be started: {e}") return web.Response(status=503, text=str(e)) boundary = "kobraxframe" resp = web.StreamResponse(headers={ "Content-Type": f"multipart/x-mixed-replace;boundary={boundary}", "Cache-Control": "no-cache", "Connection": "keep-alive", }) await resp.prepare(request) buf = b"" try: while True: chunk = await proc.stdout.read(65536) if not chunk: break buf += chunk # Extract complete JPEG frames (SOI=FFD8, EOI=FFD9) while True: start = buf.find(b"\xff\xd8") if start == -1: buf = b"" break end = buf.find(b"\xff\xd9", start + 2) if end == -1: buf = buf[start:] break frame = buf[start:end + 2] buf = buf[end + 2:] header = ( f"--{boundary}\r\n" f"Content-Type: image/jpeg\r\n" f"Content-Length: {len(frame)}\r\n\r\n" ).encode() try: await resp.write(header + frame + b"\r\n") except Exception: return resp except Exception as e: log.warning(f"Camera stream interrupted: {e}") finally: try: proc.kill() except Exception: pass return resp async def handle_serve_file(self, request): """Liefert hochgeladene G-Code-Dateien vom Temp-Verzeichnis (für Drucker-Download).""" filename = os.path.basename(request.match_info.get("filename", "")) serve_path = os.path.join(self._serve_dir_path, filename) if not os.path.isfile(serve_path): return web.Response(status=404, text="not found") size = os.path.getsize(serve_path) log.info(f"Printer downloading file: {filename} ({size} bytes)") return web.FileResponse(serve_path, headers={ "Content-Disposition": f'attachment; filename="{filename}"' }) async def handle_api_state(self, request): s = self._state return web.json_response({ "printer_name": s["printer_name"], "firmware_version": s["firmware_version"], "print_state": s["print_state"], "kobra_state": s["kobra_state"], "nozzle_temp": s["nozzle_temp"], "nozzle_target": s["nozzle_target"], "bed_temp": s["bed_temp"], "bed_target": s["bed_target"], "progress": s["progress"], "print_duration": s["print_duration"], "remain_time": s["remain_time"], "curr_layer": s["curr_layer"], "total_layers": s["total_layers"], "filename": s["filename"], "slicer_time": s["slicer_time"], "camera_url": s["camera_url"], "fan_speed": s["fan_speed"], "print_speed_mode": s["print_speed_mode"], "light_on": s["light_on"], "light_brightness": s["light_brightness"], "ams_slots": self._ams_slots, "ams_loaded_slot": self._ams_loaded_slot, "thumbnail": self._thumbnail_b64, "connection_error": s["connection_error"], "file_ready": s["file_ready"], "uploaded_filaments": self._uploaded_metadata_for(s["file_ready"] or self._last_uploaded_file), "version": self._read_version(), }) async def handle_moonraker_database(self, request): """OrcaSlicer Filament-Sync: /server/database/item?namespace=lane_data&key=lanes (AFC-Format)""" namespace = request.rel_url.query.get("namespace", "") key = request.rel_url.query.get("key", "") if namespace == "lane_data": await asyncio.get_event_loop().run_in_executor(None, self._get_ams_slots_fresh) lanes = self._build_lane_data() log.info(f"AMS sync: {len(lanes)} lanes sent to OrcaSlicer") return web.json_response({ "result": { "namespace": "lane_data", "key": key or "lanes", "value": lanes, } }) if namespace in ("AFC", "afc-install", "happy_hare"): return web.json_response({ "result": {"namespace": namespace, "key": key, "value": None} }) return web.json_response( {"error": {"code": 404, "message": f"Namespace '{namespace}' not found"}}, status=404 ) async def handle_database_list(self, request): """OrcaSlicer prüft welche Namespaces vorhanden sind um MMU-Typ zu erkennen.""" return web.json_response({"result": {"namespaces": ["lane_data"]}}) def _get_ams_slots_fresh(self): """Frische Slot-Daten per getInfo holen, Fallback auf gecachte.""" resp = self.client.publish("multiColorBox", "getInfo", None, timeout=5) if resp and resp.get("data"): boxes = resp["data"].get("multi_color_box") or [] if boxes: slots = boxes[0].get("slots") or [] if slots: self._ams_slots = slots return self._ams_slots # ─── Settings ──────────────────────────────────────────────────────────── def _find_config_path(self) -> pathlib.Path: """Gibt den Pfad zur config.ini zurück.""" if hasattr(env_loader, "find_config_path"): return env_loader.find_config_path() # Fallback für alten env_loader script_dir = pathlib.Path(_BASE) for base in (script_dir, script_dir.parent): p = base / "config" / "config.ini" if p.is_file(): return p return script_dir / "config" / "config.ini" async def handle_api_settings_get(self, request): return web.json_response({ "printer_ip": self._args.printer_ip, "mqtt_port": self._args.mqtt_port, "username": self._args.username, "password": self._args.password, "mode_id": self._args.mode_id, "device_id": self._args.device_id, "default_ams_slot": getattr(self._args, "default_ams_slot", "auto"), "auto_leveling": getattr(self._args, "auto_leveling", 1), }) async def handle_api_settings_post(self, request): import configparser data = await request.json() config_path = self._find_config_path() config_path.parent.mkdir(parents=True, exist_ok=True) # Bestehende config.ini lesen (Kommentare gehen verloren, aber Werte bleiben) cfg = configparser.ConfigParser() if config_path.is_file(): cfg.read(config_path, encoding="utf-8") # Sections sicherstellen for section in ("connection", "print", "bridge"): if not cfg.has_section(section): cfg.add_section(section) printer_ip = str(data.get("printer_ip", self._args.printer_ip or "")).split(":")[0] cfg.set("connection", "printer_ip", printer_ip) cfg.set("connection", "mqtt_port", str(data.get("mqtt_port", self._args.mqtt_port or 9883))) cfg.set("connection", "username", str(data.get("username", self._args.username or ""))) cfg.set("connection", "password", str(data.get("password", self._args.password or ""))) cfg.set("connection", "mode_id", str(data.get("mode_id", self._args.mode_id or ""))) cfg.set("connection", "device_id", str(data.get("device_id", self._args.device_id or ""))) cfg.set("print", "default_ams_slot", str(data.get("default_ams_slot", getattr(self._args, "default_ams_slot", "auto")))) cfg.set("print", "auto_leveling", str(data.get("auto_leveling", getattr(self._args, "auto_leveling", 1)))) if not cfg.has_option("bridge", "poll_interval"): cfg.set("bridge", "poll_interval", "3") with open(config_path, "w", encoding="utf-8") as f: f.write("# KX-Bridge Konfigurationsdatei\n\n") cfg.write(f) log.info(f"Settings gespeichert in {config_path}") # Response senden, dann Neustart response = web.json_response({"status": "restarting"}) asyncio.get_event_loop().call_later(0.3, self._restart_bridge) return response def _restart_bridge(self): log.info("Restarting bridge ...") exe = sys.executable # PyInstaller frozen binary: sys.argv[0] == sys.executable → nicht doppelt übergeben if getattr(sys, "frozen", False): os.execv(exe, [exe]) else: os.execv(exe, [exe] + sys.argv) # ─── Update ────────────────────────────────────────────────────────────── STABLE_RELEASE_API = "https://gitea.it-drui.de/api/v1/repos/viewit/KX-Bridge-Release/releases?limit=1" DEV_RELEASE_API = "https://gitea.it-drui.de/api/v1/repos/viewit/KX-Bridge-Release/releases?limit=10&pre-release=true" GITEA_RAW_BASE = "https://gitea.it-drui.de/viewit/KX-Bridge-Release/raw/tag" def _read_version(self) -> str: for base in (pathlib.Path(_BASE), pathlib.Path(_BASE).parent): p = base / "VERSION" if p.is_file(): return p.read_text(encoding="utf-8").strip() return "unknown" def _write_version(self, version: str): for base in (pathlib.Path(_BASE), pathlib.Path(_BASE).parent): p = base / "VERSION" if p.is_file(): p.write_text(version + "\n", encoding="utf-8") return (pathlib.Path(_BASE) / "VERSION").write_text(version + "\n", encoding="utf-8") @staticmethod def _parse_version(v: str) -> "tuple[int, ...]": """'v0.9.1-beta1' → (0, 9, 1) – nur numerische Teile vor dem ersten '-'""" v = v.lstrip("v").split("-")[0] parts = re.split(r"[.\s]+", v) result = [] for p in parts: try: result.append(int(p)) except ValueError: break return tuple(result) or (0,) async def handle_api_log_stream(self, request): """SSE-Endpoint: sendet Log-Einträge live an den Browser.""" resp = web.StreamResponse(headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }) await resp.prepare(request) # Zuerst Ring-Buffer senden for entry in list(_log_buffer): data = json.dumps(entry, ensure_ascii=False) await resp.write(f"data: {data}\n\n".encode()) # Dann live streamen q: asyncio.Queue = asyncio.Queue() _log_sse_queues.append(q) try: while True: entry = await asyncio.wait_for(q.get(), timeout=25) data = json.dumps(entry, ensure_ascii=False) await resp.write(f"data: {data}\n\n".encode()) except asyncio.TimeoutError: await resp.write(b": keepalive\n\n") except (ConnectionResetError, Exception): pass finally: _log_sse_queues.remove(q) if q in _log_sse_queues else None return resp async def handle_api_log_download(self, request): """Gibt alle gepufferten Log-Einträge als Plaintext zum Download.""" lines = [f"[{e['ts']}] {e['lvl']:<5} {e['name']}: {e['msg']}" for e in _log_buffer] text = "\n".join(lines) return web.Response( body=text.encode("utf-8"), content_type="text/plain", headers={"Content-Disposition": 'attachment; filename="kx-bridge.log"'}, ) async def handle_api_update_check(self, request): current = self._read_version() is_dev = "-dev+" in current api_url = self.DEV_RELEASE_API if is_dev else self.STABLE_RELEASE_API try: async with aiohttp.ClientSession() as session: async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp: if resp.status != 200: return web.json_response({"error": f"Gitea HTTP {resp.status}"}, status=502) releases = await resp.json(content_type=None) if not releases: return web.json_response({"error": "No releases found"}, status=404) # Dev: neuestes Release mit "-dev+" im Tag suchen if is_dev: dev_releases = [r for r in releases if "-dev+" in r.get("tag_name", "")] if not dev_releases: return web.json_response({"error": "No dev releases found"}, status=404) data = dev_releases[0] else: data = releases[0] tag = data.get("tag_name", "") latest = tag.lstrip("v") if is_dev: update_available = tag != f"v{current}" else: update_available = self._parse_version(tag) > self._parse_version(current) download_url = f"{self.GITEA_RAW_BASE}/{tag}/kobrax_moonraker_bridge.py" return web.json_response({ "current": current, "latest": latest, "update_available": update_available, "tag": tag, "download_url": download_url, "changelog": data.get("body", ""), }) except Exception as e: return web.json_response({"error": str(e)}, status=502) async def handle_api_update_apply(self, request): data = await request.json() download_url = data.get("download_url", "") new_tag = data.get("tag", "") if not download_url: return web.json_response({"error": "download_url fehlt"}, status=400) script_path = pathlib.Path(sys.executable if getattr(sys, "frozen", False) else __file__).resolve() try: async with aiohttp.ClientSession() as session: async with session.get(download_url, timeout=aiohttp.ClientTimeout(total=30)) as resp: if resp.status != 200: return web.json_response({"error": f"Download HTTP {resp.status}"}, status=502) content = await resp.read() # Atomisch ersetzen tmp = script_path.with_suffix(".py.new") tmp.write_bytes(content) os.replace(tmp, script_path) if new_tag: self._write_version(new_tag.lstrip("v")) log.info(f"Update auf {new_tag} installiert, starte neu …") except Exception as e: return web.json_response({"error": str(e)}, status=502) response = web.json_response({"status": "updating"}) asyncio.get_event_loop().call_later(0.3, self._restart_bridge) return response async def handle_catchall(self, request): body = await request.read() log.warning(f"UNBEKANNT {request.method} {request.path_qs} body={body[:200]}") return web.json_response({"result": {}}, status=200) async def handle_favicon(self, request): # Minimales 1x1 ICO damit der Browser nicht 404 loggt ico = bytes([ 0,0,1,0,1,0,1,1,0,0,1,0,24,0,40,0,0,0,22,0,0,0,40,0,0,0, 1,0,0,0,2,0,0,0,1,0,24,0,0,0,0,0,4,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,255,102,0,0,0,0,0,0 ]) return web.Response(body=ico, content_type="image/x-icon") # ------------------------------------------------------------------------- # WebSocket handler # ------------------------------------------------------------------------- async def handle_websocket(self, request): ws = web.WebSocketResponse(heartbeat=30) await ws.prepare(request) ws._loop = asyncio.get_event_loop() self.ws_clients.add(ws) log.info(f"WS client verbunden ({len(self.ws_clients)} gesamt)") # Send klippy_ready notification await ws.send_str(json.dumps({ "jsonrpc": "2.0", "method": "notify_klippy_ready", "params": [], })) # Send initial status await ws.send_str(json.dumps({ "jsonrpc": "2.0", "method": "notify_status_update", "params": [self._build_printer_objects(), time.time()], })) async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: await self._handle_ws_rpc(ws, msg.data) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE): break self.ws_clients.discard(ws) log.info(f"WS client disconnected ({len(self.ws_clients)} remaining)") return ws async def _handle_ws_rpc(self, ws: web.WebSocketResponse, raw: str): try: req = json.loads(raw) except Exception: return rpc_id = req.get("id") method = req.get("method", "") log.info(f"WS RPC: {method} params={str(req.get('params',''))[:120]}") params = req.get("params") or {} if isinstance(params, list): params = params[0] if params else {} result = None error = None try: if method in ("printer.info", "printer_info"): result = { "state": "ready", "state_message": "Printer is ready", "hostname": "kobrax-bridge", "software_version": KLIPPER_VERSION, "cpu_info": self._state["printer_name"], "klipper_path": "/home/pi/klipper", "python_path": "/home/pi/klippy-env/bin/python", } elif method in ("server.info", "server_info"): result = { "klippy_connected": True, "klippy_state": "ready", "moonraker_version": MOONRAKER_VERSION, "components": [], "failed_components": [], "registered_directories": ["gcodes"], "warnings": [], } elif method in ("printer.objects.list",): result = {"objects": list(self._build_printer_objects().keys())} elif method in ("printer.objects.query", "printer.objects.get"): objects = params.get("objects", {}) all_objs = self._build_printer_objects() if objects: filtered = {k: all_objs.get(k, {}) for k in objects} else: filtered = all_objs result = {"status": filtered, "eventtime": time.time()} elif method == "printer.objects.subscribe": objects = params.get("objects", {}) all_objs = self._build_printer_objects() if objects: filtered = {k: all_objs.get(k, {}) for k in objects} else: filtered = all_objs result = {"status": filtered, "eventtime": time.time()} elif method == "printer.print.start": filename = params.get("filename", self._last_uploaded_file) loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: self._start_print(filename)) result = "ok" elif method == "printer.print.pause": loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.client.pause_print) result = "ok" elif method == "printer.print.resume": loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.client.resume_print) result = "ok" elif method == "printer.print.cancel": loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.client.stop_print) result = "ok" elif method == "machine.system_info": result = {"system_info": {"cpu_info": {"cpu_desc": "Kobra X Bridge"}}} elif method == "server.files.list": result = [] else: log.debug(f"Unbekannte RPC-Methode: {method}") result = {} except Exception as e: log.error(f"RPC error for {method}: {e}") error = {"code": -32603, "message": str(e)} if rpc_id is not None: response = {"jsonrpc": "2.0", "id": rpc_id} if error: response["error"] = error else: response["result"] = result await ws.send_str(json.dumps(response)) # ------------------------------------------------------------------------- # Poll loop (sync, runs in executor) # ------------------------------------------------------------------------- def _printer_reachable(self) -> bool: """TCP-Probe auf den MQTT-Port – kein ICMP nötig, kein root erforderlich.""" import socket as _socket try: with _socket.create_connection( (self._args.printer_ip, self._args.mqtt_port), timeout=2.0 ): return True except OSError: return False def _poll_loop(self, stop_event: threading.Event): _offline = self._state["kobra_state"] == "offline" _probe_interval = 10.0 # Sekunden zwischen TCP-Probes im Offline-Modus while not stop_event.is_set(): # ── Offline-Modus: warten bis Drucker wieder erreichbar ────────── if _offline: if self._printer_reachable(): log.info("Printer reachable - connecting MQTT ...") try: self.client.connect() _offline = False self._state["print_state"] = "standby" self._state["kobra_state"] = "free" self._state["connection_error"] = "" log.info("MQTT-Verbindung wiederhergestellt") except Exception as e: err = _mqtt_error_msg(e) self._state["connection_error"] = err log.warning(f"Connection attempt failed: {err}") stop_event.wait(_probe_interval) continue else: stop_event.wait(_probe_interval) continue # ── Online-Modus: normaler Poll ────────────────────────────────── try: info = self.client.query_info() if info: self._on_info(info) # Während Druck: print/report direkt abfragen if self._state["print_state"] in ("printing", "preheating", "auto_leveling", "checking", "init"): print_r = self.client.publish("print", "query", timeout=3.0) if print_r: self._on_print(print_r) box = self.client.query_multicolor_box() if box: boxes = (box.get("data") or {}).get("multi_color_box") or [] slots = boxes[0].get("slots") or [] if boxes else [] if slots: self._ams_slots = slots except Exception as e: log.warning(f"Poll error: {e}") # Prüfen ob Drucker wirklich weg ist if not self._printer_reachable(): log.info("Printer unreachable - switching to offline mode") self._state["print_state"] = "error" self._state["kobra_state"] = "offline" self._state["connection_error"] = f"Printer unreachable ({self._args.printer_ip})" try: self.client.disconnect() except Exception: pass _offline = True stop_event.wait(3.0) # --------------------------------------------------------------------------- # App factory + main # --------------------------------------------------------------------------- def _mqtt_error_msg(exc: Exception) -> str: msg = str(exc) if "20020005" in msg: return "Wrong MQTT credentials (username, password or device ID incorrect)" return msg def build_app(bridge: KobraXBridge) -> web.Application: app = web.Application() r = app.router # Moonraker API r.add_get("/server/info", bridge.handle_server_info) r.add_get("/printer/info", bridge.handle_printer_info) r.add_get("/machine/system_info", bridge.handle_machine_system_info) r.add_get("/printer/objects/list", bridge.handle_objects_list) r.add_get("/printer/objects/query", bridge.handle_objects_query) r.add_get("/printer/objects/subscribe", bridge.handle_objects_subscribe) r.add_post("/printer/objects/subscribe", bridge.handle_objects_subscribe) r.add_get("/server/files/list", bridge.handle_files_list) r.add_post("/server/files/upload", bridge.handle_file_upload) r.add_post("/printer/print/start", bridge.handle_print_start) r.add_post("/printer/print/pause", bridge.handle_print_pause) r.add_post("/printer/print/resume", bridge.handle_print_resume) r.add_post("/printer/print/cancel", bridge.handle_print_cancel) # OctoPrint compatibility (OrcaSlicer probes this + uploads here) r.add_get("/api/version", bridge.handle_octoprint_version) r.add_post("/api/files/local", bridge.handle_file_upload) r.add_post("/api/files/{path:.*}", bridge.handle_file_upload) # Moonraker database (OrcaSlicer AMS-Sync) r.add_get("/server/database/item", bridge.handle_moonraker_database) r.add_get("/server/database/list", bridge.handle_database_list) # New API endpoints r.add_post("/api/light", bridge.handle_api_light) r.add_post("/api/fan", bridge.handle_api_fan) r.add_post("/api/connect", bridge.handle_api_connect) r.add_post("/api/disconnect", bridge.handle_api_disconnect) r.add_post("/api/speed", bridge.handle_api_speed) r.add_post("/api/ams/feed", bridge.handle_api_ams_feed) r.add_post("/api/ams/set_slot", bridge.handle_api_ams_set_slot) r.add_post("/api/axis", bridge.handle_api_axis) r.add_post("/api/temperature", bridge.handle_api_temperature) r.add_get("/api/camera", bridge.handle_api_camera) r.add_get("/api/camera/stream", bridge.handle_camera_stream) r.add_get("/api/camera/snapshot", bridge.handle_api_camera_snapshot) r.add_post("/api/camera/start", bridge.handle_api_camera_start) r.add_post("/api/camera/stop", bridge.handle_api_camera_stop) r.add_get("/api/state", bridge.handle_api_state) r.add_get("/api/settings", bridge.handle_api_settings_get) r.add_post("/api/settings", bridge.handle_api_settings_post) r.add_get("/api/update/check", bridge.handle_api_update_check) r.add_post("/api/update/apply", bridge.handle_api_update_apply) r.add_post("/api/file_ready/clear", bridge.handle_api_file_ready_clear) r.add_get("/api/log/stream", bridge.handle_api_log_stream) r.add_get("/api/log/download", bridge.handle_api_log_download) r.add_get("/serve/{filename}", bridge.handle_serve_file) # Root + favicon (OrcaSlicer öffnet / in eingebettetem Browser) r.add_get("/", bridge.handle_index) r.add_get("/favicon.ico", bridge.handle_favicon) # WebSocket r.add_get("/websocket", bridge.handle_websocket) # Catch-all: alle unbekannten Requests loggen statt 404 r.add_route("*", "/{path:.*}", bridge.handle_catchall) return app async def run_bridge(args): client = KobraXClient( host=args.printer_ip, port=args.mqtt_port, username=args.username, password=args.password, mode_id=args.mode_id, device_id=args.device_id, client_id="kobrax_bridge", ) bridge = KobraXBridge(client, args=args) # Verbindungsversuch beim Start – bei Fehler im Offline-Modus weiterlaufen loop = asyncio.get_event_loop() log.info(f"Connecting to printer {args.printer_ip}:{args.mqtt_port} ...") try: await loop.run_in_executor(None, client.connect) log.info("MQTT verbunden") except Exception as e: err = _mqtt_error_msg(e) log.warning(f"Connection failed: {err} - starting in offline mode") bridge._state["print_state"] = "error" bridge._state["kobra_state"] = "offline" bridge._state["connection_error"] = err app = build_app(bridge) stop_event = threading.Event() poll_thread = threading.Thread( target=bridge._poll_loop, args=(stop_event,), daemon=True, name="poll" ) poll_thread.start() runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, args.host, args.port) await site.start() import socket as _socket try: with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s: _s.connect(("8.8.8.8", 80)) _local_ip = _s.getsockname()[0] except Exception: _local_ip = args.host log.info(f"Bridge running at http://{_local_ip}:{args.port}") log.info(f"OrcaSlicer → Klipper → Host: {_local_ip} Port: {args.port}") log.info("Ctrl-C zum Beenden") try: while True: await asyncio.sleep(3600) except (KeyboardInterrupt, asyncio.CancelledError): pass finally: stop_event.set() await runner.cleanup() client.disconnect() log.info("Bridge stopped") def main(): parser = argparse.ArgumentParser(description="Moonraker-Bridge für Anycubic Kobra X") parser.add_argument("--printer-ip", default=env_loader.PRINTER_IP, help="IP-Adresse des Druckers") parser.add_argument("--mqtt-port", type=int, default=env_loader.MQTT_PORT) parser.add_argument("--username", default=env_loader.USERNAME) parser.add_argument("--password", default=env_loader.PASSWORD) parser.add_argument("--mode-id", default=env_loader.MODE_ID) parser.add_argument("--device-id", default=env_loader.DEVICE_ID) parser.add_argument("--default-ams-slot",default=env_loader.DEFAULT_AMS_SLOT) parser.add_argument("--auto-leveling", type=int, default=env_loader.AUTO_LEVELING) parser.add_argument("--host", default="0.0.0.0", help="Bind-Adresse für den Bridge-Server") parser.add_argument("--port", type=int, default=7125, help="HTTP/WS-Port (Moonraker-Standard: 7125)") args = parser.parse_args() if args.printer_ip and ":" in args.printer_ip: args.printer_ip = args.printer_ip.split(":")[0] # Windows braucht ProactorEventLoop für asyncio.create_subprocess_exec if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) asyncio.run(run_bridge(args)) if __name__ == "__main__": main()