Files
KX-Bridge-Release/kobrax_client.py
2026-05-01 11:24:08 +02:00

631 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
kobrax_client.py Anycubic Kobra X LAN-MQTT-Client
Protokoll vollständig rekonstruiert via Sniffer 2026-04-17 (953 Nachrichten).
Voraussetzungen:
- /tmp/anycubic_slicer.crt und .key (aus cloud_mqtt.dll @ 0x2ed5b0 / 0x2edce0)
- Drucker im LAN-Modus erreichbar auf Port 9883
Verwendung:
client = KobraXClient(env_loader.PRINTER_IP, mode_id=env_loader.MODE_ID,
device_id=env_loader.DEVICE_ID)
client.connect()
info = client.query_info()
print(info["data"]["temp"])
client.disconnect()
"""
import hashlib
import json
import logging
import os
import socket
import ssl
import sys
import threading
import time
import uuid
from datetime import datetime
import env_loader
log = logging.getLogger("kobrax.mqtt")
_SCRIPT_DIR = os.path.dirname(sys.executable) if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
CERT_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.crt")
KEY_FILE = os.path.join(_SCRIPT_DIR, "anycubic_slicer.key")
# ---------------------------------------------------------------------------
# Low-level MQTT framing
# ---------------------------------------------------------------------------
def _enc_str(s: str) -> bytes:
b = s.encode("utf-8")
return len(b).to_bytes(2, "big") + b
def _enc_len(n: int) -> bytes:
out = bytearray()
while True:
d = n % 128
n //= 128
if n > 0:
d |= 0x80
out.append(d)
if n == 0:
break
return bytes(out)
def _build_connect(client_id: str, username: str, password: str) -> bytes:
proto = b"\x00\x04MQTT\x04"
ka = b"\x00\x3c" # keepalive = 60s
flags = 0xC2 # username + password, clean session
payload = _enc_str(client_id) + _enc_str(username) + _enc_str(password)
body = proto + bytes([flags]) + ka + payload
return bytes([0x10]) + _enc_len(len(body)) + body
def _build_subscribe(topic: str, pid: int) -> bytes:
p = pid.to_bytes(2, "big") + _enc_str(topic) + b"\x00"
return bytes([0x82]) + _enc_len(len(p)) + p
def _build_publish(topic: str, payload: str) -> bytes:
body = _enc_str(topic) + payload.encode("utf-8")
return bytes([0x30]) + _enc_len(len(body)) + body
def _build_pingreq() -> bytes:
return bytes([0xC0, 0x00])
def _parse_publish(pkt: bytes):
if len(pkt) < 2:
return None, None
tlen = (pkt[0] << 8) | pkt[1]
if 2 + tlen > len(pkt):
return None, None
topic = pkt[2:2 + tlen].decode("utf-8", errors="replace")
payload = pkt[2 + tlen:]
return topic, payload
# ---------------------------------------------------------------------------
# KobraXClient
# ---------------------------------------------------------------------------
class KobraXClient:
def __init__(self, host: str, username: str, password: str,
mode_id: str, device_id: str,
port: int = 9883, client_id: str = "kobrax_py"):
self.host = host
self.port = port
self.username = username
self.password = password
self.mode_id = mode_id
self.device_id = device_id
self.client_id = client_id
self._sock = None
self._buf = b""
self._pid = 1
self._lock = threading.Lock()
self._running = False
# Pending requests by msgid (for response ACK)
self._pending_msgid: dict[str, dict] = {}
# Pending requests by msg_type/report topic suffix
self._pending_report: dict[str, dict] = {}
# Optional callbacks: topic_suffix → callable(payload_dict)
self.callbacks: dict[str, callable] = {}
# Dedup: last hash per topic suffix to suppress repeated identical messages
self._last_rx_hash: dict[str, str] = {}
# Fields that change every tick and should be stripped before dedup-hashing
_VOLATILE = {"timestamp", "msgid", "progress", "curr_layer",
"curr_nozzle_temp", "curr_hotbed_temp",
"target_nozzle_temp", "target_hotbed_temp"}
# -- Topics --------------------------------------------------------------
def _pub_topic(self, msg_type: str) -> str:
return (f"anycubic/anycubicCloud/v1/slicer/printer/"
f"{self.mode_id}/{self.device_id}/{msg_type}")
def _web_topic(self, msg_type: str) -> str:
return (f"anycubic/anycubicCloud/v1/web/printer/"
f"{self.mode_id}/{self.device_id}/{msg_type}")
def _sub_topic(self) -> str:
return (f"anycubic/anycubicCloud/v1/printer/public/"
f"{self.mode_id}/{self.device_id}/#")
# -- Connection ----------------------------------------------------------
def _do_connect(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
ctx.load_cert_chain(CERT_FILE, KEY_FILE)
raw = socket.create_connection((self.host, self.port), timeout=5)
self._sock = ctx.wrap_socket(raw)
log.info("TLS connected cipher=%s", self._sock.cipher()[0])
self._sock.sendall(_build_connect(self.client_id, self.username, self.password))
self._sock.settimeout(3)
r = self._sock.recv(64)
if len(r) < 4 or r[0] != 0x20 or r[3] != 0:
raise RuntimeError(f"CONNACK failed: {r.hex()}")
log.info("CONNACK rc=0")
self._sock.settimeout(0.2)
self._buf = b""
self._subscribe(self._sub_topic())
log.debug("MQTT connected to %s:%s", self.host, self.port)
def connect(self):
self._do_connect()
self._running = True
t = threading.Thread(target=self._read_loop, daemon=True)
t.start()
time.sleep(0.3)
def disconnect(self):
self._running = False
try:
self._sock.close()
except Exception:
pass
def _reconnect(self):
log.warning("Verbindung verloren reconnect…")
try:
self._sock.close()
except Exception:
pass
for delay in [2, 4, 8, 15, 30]:
try:
self._do_connect()
log.info("Reconnect erfolgreich")
return True
except Exception as e:
log.warning("Reconnect fehlgeschlagen (%s), warte %ss…", e, delay)
time.sleep(delay)
return False
def _subscribe(self, topic: str):
with self._lock:
pid = self._pid
self._pid += 1
self._sock.sendall(_build_subscribe(topic, pid))
log.info("SUB %s", topic)
# -- Read loop -----------------------------------------------------------
def _read_loop(self):
last_ping = time.time()
_empty_count = 0
while self._running:
if time.time() - last_ping > 30:
with self._lock:
try:
self._sock.sendall(_build_pingreq())
except Exception:
if self._running and not self._reconnect():
break
last_ping = time.time()
continue
last_ping = time.time()
try:
data = self._sock.recv(65536)
if not data:
# Windows SSL kann kurzzeitig b"" liefern ohne echten EOF
_empty_count += 1
if _empty_count >= 5:
raise ConnectionResetError("EOF")
continue
_empty_count = 0
self._buf += data
self._drain()
except ssl.SSLWantReadError:
continue
except socket.timeout:
continue
except Exception as e:
if self._running:
log.warning("reader error: %s", e)
if not self._reconnect():
break
last_ping = time.time()
else:
break
def _drain(self):
buf = self._buf
idx = 0
while idx < len(buf):
ptype = buf[idx] & 0xF0
i = idx + 1
mul = 1
rem = 0
while i < len(buf):
b = buf[i]
rem += (b & 0x7F) * mul
mul *= 128
i += 1
if not (b & 0x80):
break
if i + rem > len(buf):
break
pkt = buf[i:i + rem]
idx = i + rem
if ptype == 0x30:
topic, raw_payload = _parse_publish(pkt)
if topic is None:
continue
try:
payload = json.loads(raw_payload)
except Exception:
payload = {"_raw": raw_payload.decode("utf-8", errors="replace")}
self._dispatch(topic, payload)
self._buf = buf[idx:]
def _dedup_hash(self, suffix: str, payload: dict) -> str:
"""Hash payload ignoring volatile per-tick fields for dedup check."""
stable = {k: v for k, v in payload.items()
if k not in {"timestamp", "msgid", "progress", "curr_layer",
"curr_nozzle_temp", "curr_hotbed_temp",
"target_nozzle_temp", "target_hotbed_temp"}}
return hashlib.md5(json.dumps(stable, sort_keys=True).encode(), usedforsecurity=False).hexdigest()
def _dispatch(self, topic: str, payload: dict):
suffix = "/".join(topic.split("/")[-2:])
# Structured RX log with dedup suppression
h = self._dedup_hash(suffix, payload)
is_dup = self._last_rx_hash.get(suffix) == h
self._last_rx_hash[suffix] = h
if is_dup:
log.debug("RX [dup] %-25s state=%-12s", suffix, payload.get("state", ""))
else:
data = payload.get("data") or {}
state = payload.get("state", "")
if "progress" in data:
log.info("RX %-25s state=%-12s progress=%s%% layer=%s/%s",
suffix, state, data["progress"],
data.get("curr_layer", "?"), data.get("total_layers", "?"))
elif "curr_nozzle_temp" in data:
log.info("RX %-25s nozzle=%s°C/%s°C bed=%s°C/%s°C",
suffix,
data["curr_nozzle_temp"], data.get("target_nozzle_temp", 0),
data.get("curr_hotbed_temp", "?"), data.get("target_hotbed_temp", 0))
else:
log.info("RX %-25s state=%-12s data=%s",
suffix, state, json.dumps(payload.get("data"), ensure_ascii=False))
# Resolve by report topic suffix (e.g. "info/report")
if suffix in self._pending_report:
entry = self._pending_report[suffix]
entry["result"] = payload
entry["event"].set()
# Resolve by msgid (for generic response ACK)
msgid = payload.get("msgid")
if msgid and msgid in self._pending_msgid:
entry = self._pending_msgid[msgid]
entry["result"] = payload
entry["event"].set()
# User callbacks by topic suffix (last two path components)
if suffix in self.callbacks:
try:
self.callbacks[suffix](payload)
except Exception as e:
log.error("callback error for %s: %s", suffix, e)
# Generic wildcard callback
if "*" in self.callbacks:
try:
self.callbacks["*"](topic, payload)
except Exception as e:
log.error("wildcard callback error: %s", e)
# -- Publish + request/response ------------------------------------------
def publish(self, msg_type: str, action: str, data=None, timeout: float = 5.0) -> dict | None:
msgid = str(uuid.uuid4())
payload = json.dumps({
"type": msg_type,
"action": action,
"msgid": msgid,
"timestamp": int(time.time() * 1000),
"data": data,
}, separators=(",", ":"))
# Wait by msgid only — avoids collisions when multiple threads
# call publish() for the same msg_type concurrently.
# Also register by report topic as fallback for responses without msgid.
report_key = f"{msg_type}/report"
event = threading.Event()
entry = {"event": event, "result": None}
self._pending_msgid[msgid] = entry
# Only register report-key waiter if nobody else is waiting on it
report_registered = False
if report_key not in self._pending_report:
self._pending_report[report_key] = entry
report_registered = True
topic = self._pub_topic(msg_type)
log.info("TX %-25s action=%-12s data=%s",
f"{msg_type}/request", action,
json.dumps(data, ensure_ascii=False) if data else "null")
try:
with self._lock:
self._sock.sendall(_build_publish(topic, payload))
except Exception as e:
log.error("send error: %s, reconnecting…", e)
self._pending_msgid.pop(msgid, None)
if report_registered:
self._pending_report.pop(report_key, None)
if not self._reconnect():
return None
# retry once after reconnect
try:
with self._lock:
self._sock.sendall(_build_publish(topic, payload))
self._pending_msgid[msgid] = entry
if report_registered:
self._pending_report[report_key] = entry
except Exception:
return None
if timeout <= 0:
self._pending_msgid.pop(msgid, None)
if report_registered:
self._pending_report.pop(report_key, None)
return None
received = event.wait(timeout)
self._pending_msgid.pop(msgid, None)
if report_registered:
self._pending_report.pop(report_key, None)
if not received:
return None
return entry["result"]
def publish_web(self, msg_type: str, action: str, data=None) -> None:
"""Fire-and-forget publish on the web/printer topic (used for runtime updates during print)."""
msgid = str(uuid.uuid4())
payload = json.dumps({
"type": msg_type,
"action": action,
"msgid": msgid,
"timestamp": int(time.time() * 1000),
"data": data,
}, separators=(",", ":"))
topic = self._web_topic(msg_type)
log.info("TX(web) %-23s action=%-12s data=%s",
f"{msg_type}/request", action,
json.dumps(data, ensure_ascii=False) if data else "null")
try:
with self._lock:
self._sock.sendall(_build_publish(topic, payload))
except Exception as e:
log.error("web send error: %s", e)
# -- High-level commands -------------------------------------------------
def query_info(self) -> dict | None:
return self.publish("info", "query")
def query_status(self) -> dict | None:
return self.publish("status", "query")
def query_multicolor_box(self) -> dict | None:
return self.publish("multiColorBox", "getInfo")
def set_temperature(self, nozzle: int, bed: int) -> dict | None:
return self.publish("tempature", "set",
{"target_nozzle_temp": nozzle, "target_hotbed_temp": bed})
def set_fan(self, pct: int) -> dict | None:
return self.publish("fan", "set", {"fan_speed_pct": pct})
def set_light(self, on: bool, brightness: int = 80) -> dict | None:
return self.publish("light", "control",
{"type": 2, "status": 1 if on else 0, "brightness": brightness})
def start_camera(self) -> dict | None:
return self.publish("video", "startCapture")
def stop_camera(self) -> dict | None:
return self.publish("video", "stopCapture")
def pause_print(self, taskid: str = "-1") -> dict | None:
return self.publish("print", "pause", {"taskid": taskid})
def resume_print(self, taskid: str = "-1") -> dict | None:
return self.publish("print", "resume", {"taskid": taskid})
def stop_print(self, taskid: str = "-1") -> dict | None:
return self.publish("print", "stop", {"taskid": taskid})
# -- G-Code Upload -------------------------------------------------------
def upload_gcode(self, filepath: str, remote_filename: str | None = None,
upload_url: str | None = None) -> dict:
"""Upload a G-Code or .3mf file via HTTP POST to port 18910.
Returns the parsed JSON response from the printer.
Raises RuntimeError on HTTP or connection errors.
Protocol captured via Wireshark 2026-04-18:
POST /gcode_upload?s={session_token}
Multipart fields: 'filename' (text) + 'gcode' (file bytes)
Required headers: X-File-Length, X-BBL-* (BambuLab heritage)
"""
if not upload_url:
info = self.query_info()
if not info:
raise RuntimeError("Could not get info/report for upload URL")
upload_url = info["data"]["urls"]["fileUploadurl"]
# parse token from URL query string
token = upload_url.split("?s=")[1] if "?s=" in upload_url else ""
with open(filepath, "rb") as f:
file_data = f.read()
if remote_filename is None:
remote_filename = os.path.basename(filepath)
boundary = "------------------------a3a050b927d92a4c"
sep = f"--{boundary}\r\n".encode()
end = f"--{boundary}--\r\n".encode()
part_filename = (
sep +
f'Content-Disposition: form-data; name="filename"\r\n\r\n'.encode() +
remote_filename.encode() + b"\r\n"
)
part_gcode = (
sep +
f'Content-Disposition: form-data; name="gcode"; filename="{remote_filename}"\r\n'
f'Content-Type: application/octet-stream\r\n\r\n'.encode() +
file_data + b"\r\n"
)
body = part_filename + part_gcode + end
headers = (
f"POST /gcode_upload?s={token} HTTP/1.1\r\n"
f"Host: {self.host}:18910\r\n"
f"User-Agent: AnycubicSlicerNext/1.3.9.4\r\n"
f"Accept: */*\r\n"
f"X-BBL-Client-Name: AnycubicSlicerNext\r\n"
f"X-BBL-Client-Type: slicer\r\n"
f"X-BBL-Client-Version: 01.03.09.04\r\n"
f"X-BBL-Device-ID: {str(uuid.uuid4())}\r\n"
f"X-BBL-Language: de-DE\r\n"
f"X-BBL-OS-Type: windows\r\n"
f"X-BBL-OS-Version: 10.0.26200\r\n"
f"X-File-Length: {len(file_data)}\r\n"
f"Content-Type: multipart/form-data; boundary={boundary}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n"
).encode()
sock = socket.create_connection((self.host, 18910), timeout=30)
sock.sendall(headers + body)
sock.settimeout(10)
response = b""
try:
while True:
chunk = sock.recv(65536)
if not chunk:
break
response += chunk
except socket.timeout:
pass
sock.close()
# parse HTTP response body
if b"\r\n\r\n" in response:
body_start = response.index(b"\r\n\r\n") + 4
resp_body = response[body_start:]
else:
resp_body = response
try:
return json.loads(resp_body)
except Exception:
raise RuntimeError(f"Upload: unerwartete Antwort: {resp_body[:200]}")
def move_axis(self, axis: int, move_type: int = 2, distance: int = 0) -> dict | None:
return self.publish("axis", "move",
{"axis": axis, "move_type": move_type, "distance": distance})
def home_all(self) -> dict | None:
# axis=4 move_type=2 = Home all axes (~4-15s)
return self.publish("axis", "move", {"axis": 4, "move_type": 2, "distance": 0}, timeout=30.0)
def home_axis(self, axis: int) -> dict | None:
# axis: 1=Y, 2=X, 3=Z
return self.publish("axis", "move", {"axis": axis, "move_type": 2, "distance": 0}, timeout=30.0)
def jog(self, axis: int, direction: int, distance_mm: int = 1) -> dict | None:
# axis: 1=Y, 2=X, 3=Z direction: 0=neg, 1=pos
return self.move_axis(axis=axis, move_type=direction, distance=distance_mm)
# ---------------------------------------------------------------------------
# CLI Demo
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Anycubic Kobra X LAN-Client")
parser.add_argument("--ip", default=env_loader.PRINTER_IP)
parser.add_argument("--port", type=int, default=env_loader.MQTT_PORT)
parser.add_argument("--username", default=env_loader.USERNAME)
parser.add_argument("--password", default=env_loader.PASSWORD)
parser.add_argument("--mode-id", default=env_loader.MODE_ID)
parser.add_argument("--device-id", default=env_loader.DEVICE_ID)
parser.add_argument("--monitor", action="store_true",
help="Dauerhaft mithören und alle Reports ausgeben")
args = parser.parse_args()
client = KobraXClient(
host=args.ip, port=args.port,
username=args.username, password=args.password,
mode_id=args.mode_id, device_id=args.device_id,
)
if args.monitor:
def on_msg(topic, payload):
suffix = "/".join(topic.split("/")[-2:])
ts = datetime.now().strftime("%H:%M:%S")
state = payload.get("state", "")
data = payload.get("data") or {}
if "progress" in data:
print(f"[{ts}] {suffix:25} state={state:12} progress={data['progress']}% layer={data.get('curr_layer','?')}/{data.get('total_layers','?')}")
elif "curr_nozzle_temp" in data:
print(f"[{ts}] {suffix:25} nozzle={data['curr_nozzle_temp']}°C/{data.get('target_nozzle_temp',0)}°C bed={data['curr_hotbed_temp']}°C/{data.get('target_hotbed_temp',0)}°C")
else:
print(f"[{ts}] {suffix:25} state={state}")
client.callbacks["*"] = on_msg
client.connect()
print("[kobrax] Monitor-Modus aktiv (Ctrl-C zum Beenden)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
client.disconnect()
else:
client.connect()
print("\n--- query_info ---")
info = client.query_info()
if info:
d = info.get("data", {})
print(f" Drucker: {d.get('printerName')} FW {d.get('version')}")
print(f" Status: {d.get('state')}")
t = d.get("temp", {})
print(f" Nozzle: {t.get('curr_nozzle_temp')}°C → {t.get('target_nozzle_temp')}°C")
print(f" Bett: {t.get('curr_hotbed_temp')}°C → {t.get('target_hotbed_temp')}°C")
urls = d.get("urls", {})
print(f" Upload: {urls.get('fileUploadurl')}")
print(f" Kamera: {urls.get('rtspUrl')}")
else:
print(" Keine Antwort")
client.disconnect()