583 lines
21 KiB
Python
583 lines
21 KiB
Python
"""
|
||
kobrax_client.py – Anycubic Kobra X LAN-MQTT-Client
|
||
|
||
Protokoll vollständig rekonstruiert via Sniffer 2026-04-17 (953 Nachrichten).
|
||
|
||
Voraussetzungen:
|
||
- /tmp/anycubic_slicer.crt und .key (aus cloud_mqtt.dll @ 0x2ed5b0 / 0x2edce0)
|
||
- Drucker im LAN-Modus erreichbar auf Port 9883
|
||
|
||
Verwendung:
|
||
client = KobraXClient(env_loader.PRINTER_IP, mode_id=env_loader.MODE_ID,
|
||
device_id=env_loader.DEVICE_ID)
|
||
client.connect()
|
||
info = client.query_info()
|
||
print(info["data"]["temp"])
|
||
client.disconnect()
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import socket
|
||
import ssl
|
||
import sys
|
||
import threading
|
||
import time
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
import env_loader
|
||
|
||
_SCRIPT_DIR = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
|
||
CERT_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.crt")
|
||
KEY_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.key")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Low-level MQTT framing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _enc_str(s: str) -> bytes:
|
||
b = s.encode("utf-8")
|
||
return len(b).to_bytes(2, "big") + b
|
||
|
||
|
||
def _enc_len(n: int) -> bytes:
|
||
out = bytearray()
|
||
while True:
|
||
d = n % 128
|
||
n //= 128
|
||
if n > 0:
|
||
d |= 0x80
|
||
out.append(d)
|
||
if n == 0:
|
||
break
|
||
return bytes(out)
|
||
|
||
|
||
def _build_connect(client_id: str, username: str, password: str) -> bytes:
|
||
proto = b"\x00\x04MQTT\x04"
|
||
ka = b"\x00\x3c" # keepalive = 60s
|
||
flags = 0xC2 # username + password, clean session
|
||
payload = _enc_str(client_id) + _enc_str(username) + _enc_str(password)
|
||
body = proto + bytes([flags]) + ka + payload
|
||
return bytes([0x10]) + _enc_len(len(body)) + body
|
||
|
||
|
||
def _build_subscribe(topic: str, pid: int) -> bytes:
|
||
p = pid.to_bytes(2, "big") + _enc_str(topic) + b"\x00"
|
||
return bytes([0x82]) + _enc_len(len(p)) + p
|
||
|
||
|
||
def _build_publish(topic: str, payload: str) -> bytes:
|
||
body = _enc_str(topic) + payload.encode("utf-8")
|
||
return bytes([0x30]) + _enc_len(len(body)) + body
|
||
|
||
|
||
def _build_pingreq() -> bytes:
|
||
return bytes([0xC0, 0x00])
|
||
|
||
|
||
def _parse_publish(pkt: bytes):
|
||
if len(pkt) < 2:
|
||
return None, None
|
||
tlen = (pkt[0] << 8) | pkt[1]
|
||
if 2 + tlen > len(pkt):
|
||
return None, None
|
||
topic = pkt[2:2 + tlen].decode("utf-8", errors="replace")
|
||
payload = pkt[2 + tlen:]
|
||
return topic, payload
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# KobraXClient
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class KobraXClient:
|
||
def __init__(self, host: str, username: str, password: str,
|
||
mode_id: str, device_id: str,
|
||
port: int = 9883, client_id: str = "kobrax_py"):
|
||
self.host = host
|
||
self.port = port
|
||
self.username = username
|
||
self.password = password
|
||
self.mode_id = mode_id
|
||
self.device_id = device_id
|
||
self.client_id = client_id
|
||
|
||
self._sock = None
|
||
self._buf = b""
|
||
self._pid = 1
|
||
self._lock = threading.Lock()
|
||
self._running = False
|
||
|
||
# Pending requests by msgid (for response ACK)
|
||
self._pending_msgid: dict[str, dict] = {}
|
||
# Pending requests by msg_type/report topic suffix
|
||
self._pending_report: dict[str, dict] = {}
|
||
|
||
# Optional callbacks: topic_suffix → callable(payload_dict)
|
||
self.callbacks: dict[str, callable] = {}
|
||
|
||
# -- Topics --------------------------------------------------------------
|
||
|
||
def _pub_topic(self, msg_type: str) -> str:
|
||
return (f"anycubic/anycubicCloud/v1/slicer/printer/"
|
||
f"{self.mode_id}/{self.device_id}/{msg_type}")
|
||
|
||
def _web_topic(self, msg_type: str) -> str:
|
||
return (f"anycubic/anycubicCloud/v1/web/printer/"
|
||
f"{self.mode_id}/{self.device_id}/{msg_type}")
|
||
|
||
def _sub_topic(self) -> str:
|
||
return (f"anycubic/anycubicCloud/v1/printer/public/"
|
||
f"{self.mode_id}/{self.device_id}/#")
|
||
|
||
# -- Connection ----------------------------------------------------------
|
||
|
||
def _do_connect(self):
|
||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
|
||
ctx.load_cert_chain(CERT_FILE, KEY_FILE)
|
||
|
||
raw = socket.create_connection((self.host, self.port), timeout=5)
|
||
self._sock = ctx.wrap_socket(raw)
|
||
print(f"[kobrax] TLS: {self._sock.cipher()[0]}")
|
||
|
||
self._sock.sendall(_build_connect(self.client_id, self.username, self.password))
|
||
self._sock.settimeout(3)
|
||
r = self._sock.recv(64)
|
||
if len(r) < 4 or r[0] != 0x20 or r[3] != 0:
|
||
raise RuntimeError(f"CONNACK failed: {r.hex()}")
|
||
print(f"[kobrax] CONNACK rc=0")
|
||
|
||
self._sock.settimeout(0.2)
|
||
self._buf = b""
|
||
self._subscribe(self._sub_topic())
|
||
|
||
def connect(self):
|
||
self._do_connect()
|
||
self._running = True
|
||
t = threading.Thread(target=self._read_loop, daemon=True)
|
||
t.start()
|
||
time.sleep(0.3)
|
||
|
||
def disconnect(self):
|
||
self._running = False
|
||
try:
|
||
self._sock.close()
|
||
except Exception:
|
||
pass
|
||
|
||
def _reconnect(self):
|
||
print("[kobrax] Verbindung verloren – reconnect…")
|
||
try:
|
||
self._sock.close()
|
||
except Exception:
|
||
pass
|
||
for delay in [2, 4, 8, 15, 30]:
|
||
try:
|
||
self._do_connect()
|
||
print("[kobrax] Reconnect erfolgreich")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[kobrax] Reconnect fehlgeschlagen ({e}), warte {delay}s…")
|
||
time.sleep(delay)
|
||
return False
|
||
|
||
def _subscribe(self, topic: str):
|
||
with self._lock:
|
||
pid = self._pid
|
||
self._pid += 1
|
||
self._sock.sendall(_build_subscribe(topic, pid))
|
||
print(f"[kobrax] SUB {topic}")
|
||
|
||
# -- Read loop -----------------------------------------------------------
|
||
|
||
def _read_loop(self):
|
||
last_ping = time.time()
|
||
_empty_count = 0
|
||
while self._running:
|
||
if time.time() - last_ping > 30:
|
||
with self._lock:
|
||
try:
|
||
self._sock.sendall(_build_pingreq())
|
||
except Exception:
|
||
if self._running and not self._reconnect():
|
||
break
|
||
last_ping = time.time()
|
||
continue
|
||
last_ping = time.time()
|
||
try:
|
||
data = self._sock.recv(65536)
|
||
if not data:
|
||
# Windows SSL kann kurzzeitig b"" liefern ohne echten EOF
|
||
_empty_count += 1
|
||
if _empty_count >= 5:
|
||
raise ConnectionResetError("EOF")
|
||
continue
|
||
_empty_count = 0
|
||
self._buf += data
|
||
self._drain()
|
||
except ssl.SSLWantReadError:
|
||
continue
|
||
except socket.timeout:
|
||
continue
|
||
except Exception as e:
|
||
if self._running:
|
||
print(f"[kobrax] reader error: {e}")
|
||
if not self._reconnect():
|
||
break
|
||
last_ping = time.time()
|
||
else:
|
||
break
|
||
|
||
def _drain(self):
|
||
buf = self._buf
|
||
idx = 0
|
||
while idx < len(buf):
|
||
ptype = buf[idx] & 0xF0
|
||
i = idx + 1
|
||
mul = 1
|
||
rem = 0
|
||
while i < len(buf):
|
||
b = buf[i]
|
||
rem += (b & 0x7F) * mul
|
||
mul *= 128
|
||
i += 1
|
||
if not (b & 0x80):
|
||
break
|
||
if i + rem > len(buf):
|
||
break
|
||
pkt = buf[i:i + rem]
|
||
idx = i + rem
|
||
|
||
if ptype == 0x30:
|
||
topic, raw_payload = _parse_publish(pkt)
|
||
if topic is None:
|
||
continue
|
||
try:
|
||
payload = json.loads(raw_payload)
|
||
except Exception:
|
||
payload = {"_raw": raw_payload.decode("utf-8", errors="replace")}
|
||
self._dispatch(topic, payload)
|
||
|
||
self._buf = buf[idx:]
|
||
|
||
def _dispatch(self, topic: str, payload: dict):
|
||
# Resolve by report topic suffix (e.g. "info/report")
|
||
suffix = "/".join(topic.split("/")[-2:])
|
||
if suffix in self._pending_report:
|
||
entry = self._pending_report[suffix]
|
||
entry["result"] = payload
|
||
entry["event"].set()
|
||
|
||
# Resolve by msgid (for generic response ACK)
|
||
msgid = payload.get("msgid")
|
||
if msgid and msgid in self._pending_msgid:
|
||
entry = self._pending_msgid[msgid]
|
||
entry["result"] = payload
|
||
entry["event"].set()
|
||
|
||
# User callbacks by topic suffix (last two path components)
|
||
suffix = "/".join(topic.split("/")[-2:])
|
||
if suffix in self.callbacks:
|
||
try:
|
||
self.callbacks[suffix](payload)
|
||
except Exception as e:
|
||
print(f"[kobrax] callback error for {suffix}: {e}")
|
||
|
||
# Generic wildcard callback
|
||
if "*" in self.callbacks:
|
||
try:
|
||
self.callbacks["*"](topic, payload)
|
||
except Exception as e:
|
||
print(f"[kobrax] wildcard callback error: {e}")
|
||
|
||
# -- Publish + request/response ------------------------------------------
|
||
|
||
def publish(self, msg_type: str, action: str, data=None, timeout: float = 5.0) -> dict | None:
|
||
msgid = str(uuid.uuid4())
|
||
payload = json.dumps({
|
||
"type": msg_type,
|
||
"action": action,
|
||
"msgid": msgid,
|
||
"timestamp": int(time.time() * 1000),
|
||
"data": data,
|
||
}, separators=(",", ":"))
|
||
|
||
# Wait by msgid only — avoids collisions when multiple threads
|
||
# call publish() for the same msg_type concurrently.
|
||
# Also register by report topic as fallback for responses without msgid.
|
||
report_key = f"{msg_type}/report"
|
||
event = threading.Event()
|
||
entry = {"event": event, "result": None}
|
||
self._pending_msgid[msgid] = entry
|
||
# Only register report-key waiter if nobody else is waiting on it
|
||
report_registered = False
|
||
if report_key not in self._pending_report:
|
||
self._pending_report[report_key] = entry
|
||
report_registered = True
|
||
|
||
topic = self._pub_topic(msg_type)
|
||
try:
|
||
with self._lock:
|
||
self._sock.sendall(_build_publish(topic, payload))
|
||
except Exception as e:
|
||
print(f"[kobrax] send error: {e}, reconnecting…")
|
||
self._pending_msgid.pop(msgid, None)
|
||
if report_registered:
|
||
self._pending_report.pop(report_key, None)
|
||
if not self._reconnect():
|
||
return None
|
||
# retry once after reconnect
|
||
try:
|
||
with self._lock:
|
||
self._sock.sendall(_build_publish(topic, payload))
|
||
self._pending_msgid[msgid] = entry
|
||
if report_registered:
|
||
self._pending_report[report_key] = entry
|
||
except Exception:
|
||
return None
|
||
|
||
if timeout <= 0:
|
||
self._pending_msgid.pop(msgid, None)
|
||
if report_registered:
|
||
self._pending_report.pop(report_key, None)
|
||
return None
|
||
|
||
received = event.wait(timeout)
|
||
self._pending_msgid.pop(msgid, None)
|
||
if report_registered:
|
||
self._pending_report.pop(report_key, None)
|
||
if not received:
|
||
return None
|
||
return entry["result"]
|
||
|
||
def publish_web(self, msg_type: str, action: str, data=None) -> None:
|
||
"""Fire-and-forget publish on the web/printer topic (used for runtime updates during print)."""
|
||
msgid = str(uuid.uuid4())
|
||
payload = json.dumps({
|
||
"type": msg_type,
|
||
"action": action,
|
||
"msgid": msgid,
|
||
"timestamp": int(time.time() * 1000),
|
||
"data": data,
|
||
}, separators=(",", ":"))
|
||
topic = self._web_topic(msg_type)
|
||
try:
|
||
with self._lock:
|
||
self._sock.sendall(_build_publish(topic, payload))
|
||
except Exception as e:
|
||
print(f"[kobrax] web send error: {e}")
|
||
|
||
# -- High-level commands -------------------------------------------------
|
||
|
||
def query_info(self) -> dict | None:
|
||
return self.publish("info", "query")
|
||
|
||
def query_status(self) -> dict | None:
|
||
return self.publish("status", "query")
|
||
|
||
def query_multicolor_box(self) -> dict | None:
|
||
return self.publish("multiColorBox", "getInfo")
|
||
|
||
def set_temperature(self, nozzle: int, bed: int) -> dict | None:
|
||
return self.publish("tempature", "set",
|
||
{"target_nozzle_temp": nozzle, "target_hotbed_temp": bed})
|
||
|
||
def set_fan(self, pct: int) -> dict | None:
|
||
return self.publish("fan", "set", {"fan_speed_pct": pct})
|
||
|
||
def set_light(self, on: bool, brightness: int = 80) -> dict | None:
|
||
return self.publish("light", "control",
|
||
{"type": 2, "status": 1 if on else 0, "brightness": brightness})
|
||
|
||
def start_camera(self) -> dict | None:
|
||
return self.publish("video", "startCapture")
|
||
|
||
def stop_camera(self) -> dict | None:
|
||
return self.publish("video", "stopCapture")
|
||
|
||
def pause_print(self, taskid: str = "-1") -> dict | None:
|
||
return self.publish("print", "pause", {"taskid": taskid})
|
||
|
||
def resume_print(self, taskid: str = "-1") -> dict | None:
|
||
return self.publish("print", "resume", {"taskid": taskid})
|
||
|
||
def stop_print(self, taskid: str = "-1") -> dict | None:
|
||
return self.publish("print", "stop", {"taskid": taskid})
|
||
|
||
# -- G-Code Upload -------------------------------------------------------
|
||
|
||
def upload_gcode(self, filepath: str, remote_filename: str | None = None,
|
||
upload_url: str | None = None) -> dict:
|
||
"""Upload a G-Code or .3mf file via HTTP POST to port 18910.
|
||
|
||
Returns the parsed JSON response from the printer.
|
||
Raises RuntimeError on HTTP or connection errors.
|
||
|
||
Protocol captured via Wireshark 2026-04-18:
|
||
POST /gcode_upload?s={session_token}
|
||
Multipart fields: 'filename' (text) + 'gcode' (file bytes)
|
||
Required headers: X-File-Length, X-BBL-* (BambuLab heritage)
|
||
"""
|
||
if not upload_url:
|
||
info = self.query_info()
|
||
if not info:
|
||
raise RuntimeError("Could not get info/report for upload URL")
|
||
upload_url = info["data"]["urls"]["fileUploadurl"]
|
||
# parse token from URL query string
|
||
token = upload_url.split("?s=")[1] if "?s=" in upload_url else ""
|
||
|
||
with open(filepath, "rb") as f:
|
||
file_data = f.read()
|
||
|
||
if remote_filename is None:
|
||
remote_filename = os.path.basename(filepath)
|
||
|
||
boundary = "------------------------a3a050b927d92a4c"
|
||
sep = f"--{boundary}\r\n".encode()
|
||
end = f"--{boundary}--\r\n".encode()
|
||
|
||
part_filename = (
|
||
sep +
|
||
f'Content-Disposition: form-data; name="filename"\r\n\r\n'.encode() +
|
||
remote_filename.encode() + b"\r\n"
|
||
)
|
||
part_gcode = (
|
||
sep +
|
||
f'Content-Disposition: form-data; name="gcode"; filename="{remote_filename}"\r\n'
|
||
f'Content-Type: application/octet-stream\r\n\r\n'.encode() +
|
||
file_data + b"\r\n"
|
||
)
|
||
body = part_filename + part_gcode + end
|
||
|
||
headers = (
|
||
f"POST /gcode_upload?s={token} HTTP/1.1\r\n"
|
||
f"Host: {self.host}:18910\r\n"
|
||
f"User-Agent: AnycubicSlicerNext/1.3.9.4\r\n"
|
||
f"Accept: */*\r\n"
|
||
f"X-BBL-Client-Name: AnycubicSlicerNext\r\n"
|
||
f"X-BBL-Client-Type: slicer\r\n"
|
||
f"X-BBL-Client-Version: 01.03.09.04\r\n"
|
||
f"X-BBL-Device-ID: {str(uuid.uuid4())}\r\n"
|
||
f"X-BBL-Language: de-DE\r\n"
|
||
f"X-BBL-OS-Type: windows\r\n"
|
||
f"X-BBL-OS-Version: 10.0.26200\r\n"
|
||
f"X-File-Length: {len(file_data)}\r\n"
|
||
f"Content-Type: multipart/form-data; boundary={boundary}\r\n"
|
||
f"Content-Length: {len(body)}\r\n"
|
||
f"Connection: close\r\n\r\n"
|
||
).encode()
|
||
|
||
sock = socket.create_connection((self.host, 18910), timeout=30)
|
||
sock.sendall(headers + body)
|
||
sock.settimeout(10)
|
||
response = b""
|
||
try:
|
||
while True:
|
||
chunk = sock.recv(65536)
|
||
if not chunk:
|
||
break
|
||
response += chunk
|
||
except socket.timeout:
|
||
pass
|
||
sock.close()
|
||
|
||
# parse HTTP response body
|
||
if b"\r\n\r\n" in response:
|
||
body_start = response.index(b"\r\n\r\n") + 4
|
||
resp_body = response[body_start:]
|
||
else:
|
||
resp_body = response
|
||
try:
|
||
return json.loads(resp_body)
|
||
except Exception:
|
||
raise RuntimeError(f"Upload: unerwartete Antwort: {resp_body[:200]}")
|
||
|
||
def move_axis(self, axis: int, move_type: int = 2, distance: int = 0) -> dict | None:
|
||
return self.publish("axis", "move",
|
||
{"axis": axis, "move_type": move_type, "distance": distance})
|
||
|
||
def home_all(self) -> dict | None:
|
||
# axis=4 move_type=2 = Home all axes (~4-15s)
|
||
return self.publish("axis", "move", {"axis": 4, "move_type": 2, "distance": 0}, timeout=30.0)
|
||
|
||
def home_axis(self, axis: int) -> dict | None:
|
||
# axis: 1=Y, 2=X, 3=Z
|
||
return self.publish("axis", "move", {"axis": axis, "move_type": 2, "distance": 0}, timeout=30.0)
|
||
|
||
def jog(self, axis: int, direction: int, distance_mm: int = 1) -> dict | None:
|
||
# axis: 1=Y, 2=X, 3=Z direction: 0=neg, 1=pos
|
||
return self.move_axis(axis=axis, move_type=direction, distance=distance_mm)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI Demo
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="Anycubic Kobra X LAN-Client")
|
||
parser.add_argument("--ip", default=env_loader.PRINTER_IP)
|
||
parser.add_argument("--port", type=int, default=env_loader.MQTT_PORT)
|
||
parser.add_argument("--username", default=env_loader.USERNAME)
|
||
parser.add_argument("--password", default=env_loader.PASSWORD)
|
||
parser.add_argument("--mode-id", default=env_loader.MODE_ID)
|
||
parser.add_argument("--device-id", default=env_loader.DEVICE_ID)
|
||
parser.add_argument("--monitor", action="store_true",
|
||
help="Dauerhaft mithören und alle Reports ausgeben")
|
||
args = parser.parse_args()
|
||
|
||
client = KobraXClient(
|
||
host=args.ip, port=args.port,
|
||
username=args.username, password=args.password,
|
||
mode_id=args.mode_id, device_id=args.device_id,
|
||
)
|
||
|
||
if args.monitor:
|
||
def on_msg(topic, payload):
|
||
suffix = "/".join(topic.split("/")[-2:])
|
||
ts = datetime.now().strftime("%H:%M:%S")
|
||
state = payload.get("state", "")
|
||
data = payload.get("data") or {}
|
||
if "progress" in data:
|
||
print(f"[{ts}] {suffix:25} state={state:12} progress={data['progress']}% layer={data.get('curr_layer','?')}/{data.get('total_layers','?')}")
|
||
elif "curr_nozzle_temp" in data:
|
||
print(f"[{ts}] {suffix:25} nozzle={data['curr_nozzle_temp']}°C/{data.get('target_nozzle_temp',0)}°C bed={data['curr_hotbed_temp']}°C/{data.get('target_hotbed_temp',0)}°C")
|
||
else:
|
||
print(f"[{ts}] {suffix:25} state={state}")
|
||
|
||
client.callbacks["*"] = on_msg
|
||
client.connect()
|
||
print("[kobrax] Monitor-Modus aktiv (Ctrl-C zum Beenden)")
|
||
try:
|
||
while True:
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
client.disconnect()
|
||
else:
|
||
client.connect()
|
||
|
||
print("\n--- query_info ---")
|
||
info = client.query_info()
|
||
if info:
|
||
d = info.get("data", {})
|
||
print(f" Drucker: {d.get('printerName')} FW {d.get('version')}")
|
||
print(f" Status: {d.get('state')}")
|
||
t = d.get("temp", {})
|
||
print(f" Nozzle: {t.get('curr_nozzle_temp')}°C → {t.get('target_nozzle_temp')}°C")
|
||
print(f" Bett: {t.get('curr_hotbed_temp')}°C → {t.get('target_hotbed_temp')}°C")
|
||
urls = d.get("urls", {})
|
||
print(f" Upload: {urls.get('fileUploadurl')}")
|
||
print(f" Kamera: {urls.get('rtspUrl')}")
|
||
else:
|
||
print(" Keine Antwort")
|
||
|
||
client.disconnect()
|