""" kobrax_client.py – Anycubic Kobra X LAN-MQTT-Client Protokoll vollständig rekonstruiert via Sniffer 2026-04-17 (953 Nachrichten). Voraussetzungen: - /tmp/anycubic_slicer.crt und .key (aus cloud_mqtt.dll @ 0x2ed5b0 / 0x2edce0) - Drucker im LAN-Modus erreichbar auf Port 9883 Verwendung: client = KobraXClient(env_loader.PRINTER_IP, mode_id=env_loader.MODE_ID, device_id=env_loader.DEVICE_ID) client.connect() info = client.query_info() print(info["data"]["temp"]) client.disconnect() ──────────────────────────────────────────────────────────────────────────── Copyright (C) 2026 viewit (KX-Bridge contributors) Licensed under GPLv3 — see LICENSE in the project root. Protocol reverse-engineered for interoperability (§69e UrhG / EU Software Directive Art. 6). Not affiliated with Anycubic. See NOTICE.md. """ import hashlib import json import logging import os import socket import ssl import sys import threading import time import uuid from datetime import datetime import env_loader log = logging.getLogger("kobrax.mqtt") _SCRIPT_DIR = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__)) CERT_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.crt") KEY_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.key") # --------------------------------------------------------------------------- # Low-level MQTT framing # --------------------------------------------------------------------------- def _enc_str(s: str) -> bytes: b = s.encode("utf-8") return len(b).to_bytes(2, "big") + b def _enc_len(n: int) -> bytes: out = bytearray() while True: d = n % 128 n //= 128 if n > 0: d |= 0x80 out.append(d) if n == 0: break return bytes(out) def _build_connect(client_id: str, username: str, password: str) -> bytes: proto = b"\x00\x04MQTT\x04" ka = b"\x00\x3c" # keepalive = 60s flags = 0xC2 # username + password, clean session payload = _enc_str(client_id) + _enc_str(username) + _enc_str(password) body = proto + bytes([flags]) + ka + payload return bytes([0x10]) + _enc_len(len(body)) + body def _build_subscribe(topic: str, pid: int) -> bytes: p = pid.to_bytes(2, "big") + _enc_str(topic) + b"\x00" return bytes([0x82]) + _enc_len(len(p)) + p def _build_publish(topic: str, payload: str) -> bytes: body = _enc_str(topic) + payload.encode("utf-8") return bytes([0x30]) + _enc_len(len(body)) + body def _build_pingreq() -> bytes: return bytes([0xC0, 0x00]) def _parse_publish(pkt: bytes): if len(pkt) < 2: return None, None tlen = (pkt[0] << 8) | pkt[1] if 2 + tlen > len(pkt): return None, None topic = pkt[2:2 + tlen].decode("utf-8", errors="replace") payload = pkt[2 + tlen:] return topic, payload # --------------------------------------------------------------------------- # KobraXClient # --------------------------------------------------------------------------- class KobraXClient: def __init__(self, host: str, username: str, password: str, mode_id: str, device_id: str, port: int = 9883, client_id: str = "kobrax_py"): self.host = host self.port = port self.username = username self.password = password self.mode_id = mode_id self.device_id = device_id self.client_id = client_id self._sock = None self._buf = b"" self._pid = 1 self._lock = threading.Lock() self._running = False # Pending requests by msgid (for response ACK) self._pending_msgid: dict[str, dict] = {} # Pending requests by msg_type/report topic suffix self._pending_report: dict[str, dict] = {} # Optional callbacks: topic_suffix → callable(payload_dict) self.callbacks: dict[str, callable] = {} # Dedup: last hash per topic suffix to suppress repeated identical messages self._last_rx_hash: dict[str, str] = {} # Fields that change every tick and should be stripped before dedup-hashing _VOLATILE = {"timestamp", "msgid", "progress", "curr_layer", "curr_nozzle_temp", "curr_hotbed_temp", "target_nozzle_temp", "target_hotbed_temp"} # -- Topics -------------------------------------------------------------- def _pub_topic(self, msg_type: str) -> str: return (f"anycubic/anycubicCloud/v1/slicer/printer/" f"{self.mode_id}/{self.device_id}/{msg_type}") def _web_topic(self, msg_type: str) -> str: return (f"anycubic/anycubicCloud/v1/web/printer/" f"{self.mode_id}/{self.device_id}/{msg_type}") def _sub_topic(self) -> str: return (f"anycubic/anycubicCloud/v1/printer/public/" f"{self.mode_id}/{self.device_id}/#") # -- Connection ---------------------------------------------------------- def _do_connect(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE ctx.set_ciphers("DEFAULT:@SECLEVEL=0") ctx.load_cert_chain(CERT_FILE, KEY_FILE) raw = socket.create_connection((self.host, self.port), timeout=5) self._sock = ctx.wrap_socket(raw) log.info("TLS connected cipher=%s", self._sock.cipher()[0]) self._sock.sendall(_build_connect(self.client_id, self.username, self.password)) self._sock.settimeout(3) r = self._sock.recv(64) if len(r) < 4 or r[0] != 0x20 or r[3] != 0: raise RuntimeError(f"CONNACK failed: {r.hex()}") log.info("CONNACK rc=0") self._sock.settimeout(0.2) self._buf = b"" self._subscribe(self._sub_topic()) log.debug("MQTT connected to %s:%s", self.host, self.port) def connect(self): self._do_connect() self._running = True t = threading.Thread(target=self._read_loop, daemon=True) t.start() time.sleep(0.3) def disconnect(self): self._running = False try: self._sock.close() except Exception: pass def _reconnect(self): log.warning("Verbindung verloren – reconnect…") try: self._sock.close() except Exception: pass for delay in [2, 4, 8, 15, 30]: try: self._do_connect() log.info("Reconnect erfolgreich") return True except Exception as e: log.warning("Reconnect fehlgeschlagen (%s), warte %ss…", e, delay) time.sleep(delay) return False def _subscribe(self, topic: str): with self._lock: pid = self._pid self._pid += 1 self._sock.sendall(_build_subscribe(topic, pid)) log.info("SUB %s", topic) # -- Read loop ----------------------------------------------------------- def _read_loop(self): last_ping = time.time() _empty_count = 0 while self._running: if time.time() - last_ping > 30: with self._lock: try: self._sock.sendall(_build_pingreq()) except Exception: if self._running and not self._reconnect(): break last_ping = time.time() continue last_ping = time.time() try: data = self._sock.recv(65536) if not data: # Windows SSL kann kurzzeitig b"" liefern ohne echten EOF _empty_count += 1 if _empty_count >= 5: raise ConnectionResetError("EOF") continue _empty_count = 0 self._buf += data self._drain() except ssl.SSLWantReadError: continue except socket.timeout: continue except Exception as e: if self._running: log.warning("reader error: %s", e) if not self._reconnect(): break last_ping = time.time() else: break def _drain(self): buf = self._buf idx = 0 while idx < len(buf): ptype = buf[idx] & 0xF0 i = idx + 1 mul = 1 rem = 0 while i < len(buf): b = buf[i] rem += (b & 0x7F) * mul mul *= 128 i += 1 if not (b & 0x80): break if i + rem > len(buf): break pkt = buf[i:i + rem] idx = i + rem if ptype == 0x30: topic, raw_payload = _parse_publish(pkt) if topic is None: continue try: payload = json.loads(raw_payload) except Exception: payload = {"_raw": raw_payload.decode("utf-8", errors="replace")} self._dispatch(topic, payload) self._buf = buf[idx:] def _dedup_hash(self, suffix: str, payload: dict) -> str: """Hash payload ignoring volatile per-tick fields for dedup check.""" stable = {k: v for k, v in payload.items() if k not in {"timestamp", "msgid", "progress", "curr_layer", "curr_nozzle_temp", "curr_hotbed_temp", "target_nozzle_temp", "target_hotbed_temp"}} return hashlib.md5(json.dumps(stable, sort_keys=True).encode(), usedforsecurity=False).hexdigest() def _dispatch(self, topic: str, payload: dict): suffix = "/".join(topic.split("/")[-2:]) # Structured RX log with dedup suppression h = self._dedup_hash(suffix, payload) is_dup = self._last_rx_hash.get(suffix) == h self._last_rx_hash[suffix] = h if is_dup: log.debug("RX [dup] %-25s state=%-12s", suffix, payload.get("state", "")) else: data = payload.get("data") or {} state = payload.get("state", "") if "progress" in data: log.info("RX %-25s state=%-12s progress=%s%% layer=%s/%s", suffix, state, data["progress"], data.get("curr_layer", "?"), data.get("total_layers", "?")) elif "curr_nozzle_temp" in data: log.info("RX %-25s nozzle=%s°C/%s°C bed=%s°C/%s°C", suffix, data["curr_nozzle_temp"], data.get("target_nozzle_temp", 0), data.get("curr_hotbed_temp", "?"), data.get("target_hotbed_temp", 0)) else: log.info("RX %-25s state=%-12s data=%s", suffix, state, json.dumps(payload.get("data"), ensure_ascii=False)) # Resolve by report topic suffix (e.g. "info/report") if suffix in self._pending_report: entry = self._pending_report[suffix] entry["result"] = payload entry["event"].set() # Resolve by msgid (for generic response ACK) msgid = payload.get("msgid") if msgid and msgid in self._pending_msgid: entry = self._pending_msgid[msgid] entry["result"] = payload entry["event"].set() # User callbacks by topic suffix (last two path components) if suffix in self.callbacks: try: self.callbacks[suffix](payload) except Exception as e: log.error("callback error for %s: %s", suffix, e) # Generic wildcard callback if "*" in self.callbacks: try: self.callbacks["*"](topic, payload) except Exception as e: log.error("wildcard callback error: %s", e) # -- Publish + request/response ------------------------------------------ def publish(self, msg_type: str, action: str, data=None, timeout: float = 5.0) -> dict | None: msgid = str(uuid.uuid4()) payload = json.dumps({ "type": msg_type, "action": action, "msgid": msgid, "timestamp": int(time.time() * 1000), "data": data, }, separators=(",", ":")) # Wait by msgid only — avoids collisions when multiple threads # call publish() for the same msg_type concurrently. # Also register by report topic as fallback for responses without msgid. report_key = f"{msg_type}/report" event = threading.Event() entry = {"event": event, "result": None} self._pending_msgid[msgid] = entry # Only register report-key waiter if nobody else is waiting on it report_registered = False if report_key not in self._pending_report: self._pending_report[report_key] = entry report_registered = True topic = self._pub_topic(msg_type) log.info("TX %-25s action=%-12s data=%s", f"{msg_type}/request", action, json.dumps(data, ensure_ascii=False) if data else "null") try: with self._lock: self._sock.sendall(_build_publish(topic, payload)) except Exception as e: log.error("send error: %s, reconnecting…", e) self._pending_msgid.pop(msgid, None) if report_registered: self._pending_report.pop(report_key, None) if not self._reconnect(): return None # retry once after reconnect try: with self._lock: self._sock.sendall(_build_publish(topic, payload)) self._pending_msgid[msgid] = entry if report_registered: self._pending_report[report_key] = entry except Exception: return None if timeout <= 0: self._pending_msgid.pop(msgid, None) if report_registered: self._pending_report.pop(report_key, None) return None received = event.wait(timeout) self._pending_msgid.pop(msgid, None) if report_registered: self._pending_report.pop(report_key, None) if not received: return None return entry["result"] def publish_web(self, msg_type: str, action: str, data=None) -> None: """Fire-and-forget publish on the web/printer topic (used for runtime updates during print).""" msgid = str(uuid.uuid4()) payload = json.dumps({ "type": msg_type, "action": action, "msgid": msgid, "timestamp": int(time.time() * 1000), "data": data, }, separators=(",", ":")) topic = self._web_topic(msg_type) log.info("TX(web) %-23s action=%-12s data=%s", f"{msg_type}/request", action, json.dumps(data, ensure_ascii=False) if data else "null") try: with self._lock: self._sock.sendall(_build_publish(topic, payload)) except Exception as e: log.error("web send error: %s", e) # -- High-level commands ------------------------------------------------- def query_info(self) -> dict | None: return self.publish("info", "query") def query_status(self) -> dict | None: return self.publish("status", "query") def query_multicolor_box(self) -> dict | None: return self.publish("multiColorBox", "getInfo") def set_temperature(self, nozzle: int, bed: int) -> dict | None: return self.publish("tempature", "set", {"target_nozzle_temp": nozzle, "target_hotbed_temp": bed}) def set_fan(self, pct: int) -> dict | None: return self.publish("fan", "set", {"fan_speed_pct": pct}) def set_light(self, on: bool, brightness: int = 80) -> dict | None: return self.publish("light", "control", {"type": 2, "status": 1 if on else 0, "brightness": brightness}) def start_camera(self) -> dict | None: return self.publish("video", "startCapture") def stop_camera(self) -> dict | None: return self.publish("video", "stopCapture") def pause_print(self, taskid: str = "-1") -> dict | None: return self.publish("print", "pause", {"taskid": taskid}) def resume_print(self, taskid: str = "-1") -> dict | None: return self.publish("print", "resume", {"taskid": taskid}) def stop_print(self, taskid: str = "-1") -> dict | None: return self.publish("print", "stop", {"taskid": taskid}) # -- Part-Skip ("Exclude Object") --------------------------------------- def query_skip_objects(self) -> dict | None: """Fragt den Drucker nach der aktuellen Objekt-/Skip-Liste.""" return self.publish("skip", "query_obj") def skip_objects(self, names: list[str]) -> dict | None: """Überspringt die genannten Objekte – auch mid-print möglich. Namen entsprechen den EXCLUDE_OBJECT_DEFINE NAME=… Einträgen im GCode-Header bzw. file_details.objects_skip_parts. """ return self.publish("skip", "start", {"objects_skip_parts": list(names)}) # -- G-Code Upload ------------------------------------------------------- def upload_gcode(self, filepath: str, remote_filename: str | None = None, upload_url: str | None = None) -> dict: """Upload a G-Code or .3mf file via HTTP POST to port 18910. Returns the parsed JSON response from the printer. Raises RuntimeError on HTTP or connection errors. Protocol captured via Wireshark 2026-04-18: POST /gcode_upload?s={session_token} Multipart fields: 'filename' (text) + 'gcode' (file bytes) Required headers: X-File-Length, X-BBL-* (BambuLab heritage) """ if not upload_url: info = self.query_info() if not info: raise RuntimeError("Could not get info/report for upload URL") upload_url = info["data"]["urls"]["fileUploadurl"] # parse token from URL query string token = upload_url.split("?s=")[1] if "?s=" in upload_url else "" with open(filepath, "rb") as f: file_data = f.read() if remote_filename is None: remote_filename = os.path.basename(filepath) boundary = "------------------------a3a050b927d92a4c" sep = f"--{boundary}\r\n".encode() end = f"--{boundary}--\r\n".encode() part_filename = ( sep + f'Content-Disposition: form-data; name="filename"\r\n\r\n'.encode() + remote_filename.encode() + b"\r\n" ) part_gcode = ( sep + f'Content-Disposition: form-data; name="gcode"; filename="{remote_filename}"\r\n' f'Content-Type: application/octet-stream\r\n\r\n'.encode() + file_data + b"\r\n" ) body = part_filename + part_gcode + end headers = ( f"POST /gcode_upload?s={token} HTTP/1.1\r\n" f"Host: {self.host}:18910\r\n" f"User-Agent: AnycubicSlicerNext/1.3.9.4\r\n" f"Accept: */*\r\n" f"X-BBL-Client-Name: AnycubicSlicerNext\r\n" f"X-BBL-Client-Type: slicer\r\n" f"X-BBL-Client-Version: 01.03.09.04\r\n" f"X-BBL-Device-ID: {str(uuid.uuid4())}\r\n" f"X-BBL-Language: de-DE\r\n" f"X-BBL-OS-Type: windows\r\n" f"X-BBL-OS-Version: 10.0.26200\r\n" f"X-File-Length: {len(file_data)}\r\n" f"Content-Type: multipart/form-data; boundary={boundary}\r\n" f"Content-Length: {len(body)}\r\n" f"Connection: close\r\n\r\n" ).encode() sock = socket.create_connection((self.host, 18910), timeout=30) sock.sendall(headers + body) sock.settimeout(120) # große GCode-Dateien brauchen Zeit bis der Drucker antwortet response = b"" try: while True: chunk = sock.recv(65536) if not chunk: break response += chunk except socket.timeout: pass sock.close() # parse HTTP response body if b"\r\n\r\n" in response: body_start = response.index(b"\r\n\r\n") + 4 resp_body = response[body_start:] else: resp_body = response try: return json.loads(resp_body) except Exception: raise RuntimeError(f"Upload: unerwartete Antwort: {resp_body[:200]}") def move_axis(self, axis: int, move_type: int = 2, distance: int = 0) -> dict | None: return self.publish("axis", "move", {"axis": axis, "move_type": move_type, "distance": distance}) def home_all(self) -> dict | None: # axis=4 move_type=2 = Home all axes (~4-15s) return self.publish("axis", "move", {"axis": 4, "move_type": 2, "distance": 0}, timeout=30.0) def home_axis(self, axis: int) -> dict | None: # axis: 1=Y, 2=X, 3=Z return self.publish("axis", "move", {"axis": axis, "move_type": 2, "distance": 0}, timeout=30.0) def jog(self, axis: int, direction: int, distance_mm: int = 1) -> dict | None: # axis: 1=Y, 2=X, 3=Z direction: 0=neg, 1=pos return self.move_axis(axis=axis, move_type=direction, distance=distance_mm) # --------------------------------------------------------------------------- # CLI Demo # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Anycubic Kobra X LAN-Client") parser.add_argument("--ip", default=env_loader.PRINTER_IP) parser.add_argument("--port", type=int, default=env_loader.MQTT_PORT) parser.add_argument("--username", default=env_loader.USERNAME) parser.add_argument("--password", default=env_loader.PASSWORD) parser.add_argument("--mode-id", default=env_loader.MODE_ID) parser.add_argument("--device-id", default=env_loader.DEVICE_ID) parser.add_argument("--monitor", action="store_true", help="Dauerhaft mithören und alle Reports ausgeben") args = parser.parse_args() client = KobraXClient( host=args.ip, port=args.port, username=args.username, password=args.password, mode_id=args.mode_id, device_id=args.device_id, ) if args.monitor: def on_msg(topic, payload): suffix = "/".join(topic.split("/")[-2:]) ts = datetime.now().strftime("%H:%M:%S") state = payload.get("state", "") data = payload.get("data") or {} if "progress" in data: print(f"[{ts}] {suffix:25} state={state:12} progress={data['progress']}% layer={data.get('curr_layer','?')}/{data.get('total_layers','?')}") elif "curr_nozzle_temp" in data: print(f"[{ts}] {suffix:25} nozzle={data['curr_nozzle_temp']}°C/{data.get('target_nozzle_temp',0)}°C bed={data['curr_hotbed_temp']}°C/{data.get('target_hotbed_temp',0)}°C") else: print(f"[{ts}] {suffix:25} state={state}") client.callbacks["*"] = on_msg client.connect() print("[kobrax] Monitor-Modus aktiv (Ctrl-C zum Beenden)") try: while True: time.sleep(1) except KeyboardInterrupt: pass client.disconnect() else: client.connect() print("\n--- query_info ---") info = client.query_info() if info: d = info.get("data", {}) print(f" Drucker: {d.get('printerName')} FW {d.get('version')}") print(f" Status: {d.get('state')}") t = d.get("temp", {}) print(f" Nozzle: {t.get('curr_nozzle_temp')}°C → {t.get('target_nozzle_temp')}°C") print(f" Bett: {t.get('curr_hotbed_temp')}°C → {t.get('target_hotbed_temp')}°C") urls = d.get("urls", {}) print(f" Upload: {urls.get('fileUploadurl')}") print(f" Kamera: {urls.get('rtspUrl')}") else: print(" Keine Antwort") client.disconnect()