"""
kobrax_moonraker_bridge.py – Moonraker-kompatibler HTTP/WebSocket-Bridge für Anycubic Kobra X
Emuliert die Moonraker/Klipper-API damit OrcaSlicer den Kobra X direkt ansteuern kann.
Verwendung:
python kobrax_moonraker_bridge.py --printer-ip 192.168.178.94
OrcaSlicer-Konfiguration:
Drucker-Typ: Klipper | Host: 127.0.0.1 | Port: 7125
"""
import argparse
import env_loader
import asyncio
import hashlib
import json
import logging
import os
import pathlib
import re
import sys
import tempfile
import time
import threading
# A PyInstaller binary keeps everything next to sys.executable, a plain
# script next to this file.
if getattr(sys, "frozen", False):
    _BASE = os.path.dirname(sys.executable)
else:
    _BASE = os.path.dirname(os.path.abspath(__file__))
# Put that directory first on the import path before importing kobrax_client.
sys.path.insert(0, _BASE)
from kobrax_client import KobraXClient

try:
    from aiohttp import web
    import aiohttp
except ImportError:
    # aiohttp is mandatory: print an installation hint and abort.
    print("Fehler: aiohttp nicht installiert. Bitte: pip install aiohttp")
    sys.exit(1)

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("bridge")
# Maps the state names reported by the printer to the values this bridge
# publishes as ``print_stats.state`` through the emulated Moonraker API.
KOBRA_TO_KLIPPER_STATE = {
    "free": "standby",
    "busy": "printing",
    "printing": "printing",
    "preheating": "printing",
    "auto_leveling": "printing",
    "checking": "printing",
    "updated": "printing",
    "init": "printing",
    "pausing": "paused",
    "paused": "paused",
    "resuming": "printing",
    "resumed": "printing",
    "stopping": "printing",
    # NOTE(review): "stoped" is presumably the spelling the firmware sends — confirm.
    "stoped": "standby",
    "finished": "complete",
    "failed": "error",
    "canceled": "standby",
}
# Version strings reported by the server-info / printer-info endpoints.
MOONRAKER_VERSION = "v0.9.3-1"
KLIPPER_VERSION = "v0.12.0-1"
class KobraXBridge:
def __init__(self, client: KobraXClient, args=None):
    """Initialise the cached bridge state and register the MQTT callbacks.

    Args:
        client: Printer client; the report handlers of this bridge are
            stored in its ``callbacks`` mapping.
        args: Parsed command-line arguments, kept for the settings endpoint.
    """
    self.client = client
    self._args = args
    self.ws_clients: set[web.WebSocketResponse] = set()
    self._last_state: dict = {}
    # Last known printer state; updated by the report callbacks and by the
    # control endpoints.
    self._state = {
        # temperatures
        "nozzle_temp": 0.0,
        "nozzle_target": 0.0,
        "bed_temp": 0.0,
        "bed_target": 0.0,
        # job
        "print_state": "standby",
        "kobra_state": "free",
        "filename": "",
        "progress": 0.0,
        "print_duration": 0,
        "remain_time": 0,
        "curr_layer": 0,
        "total_layers": 0,
        # device
        "printer_name": "Anycubic Kobra X",
        "firmware_version": "unknown",
        "upload_url": "",
        "camera_url": "",
        "fan_speed": 0,
        "light_on": False,
        "light_brightness": 80,
        "taskid": "-1",
        "print_speed_mode": 2,
    }
    self._ams_slots: list[dict] = []
    self._ams_loaded_slot: int = -1
    self._last_uploaded_file: str = ""
    # Uploaded G-code is written into this temporary directory.
    self._serve_dir = tempfile.TemporaryDirectory(prefix="kobrax_serve_")
    self._serve_dir_path: str = self._serve_dir.name
    self._thumbnail_b64: str = ""  # base64 PNG taken from file/report
    # Register the MQTT push callbacks.
    report_handlers = (
        ("tempature/report", self._on_temp),
        ("print/report", self._on_print),
        ("info/report", self._on_info),
        ("file/report", self._on_file),
        ("multiColorBox/report", self._on_multicolor_box),
    )
    for topic, handler in report_handlers:
        client.callbacks[topic] = handler
# -------------------------------------------------------------------------
# MQTT callbacks (called from reader thread)
# -------------------------------------------------------------------------
def _on_temp(self, payload: dict):
d = payload.get("data") or {}
self._state["nozzle_temp"] = float(d.get("curr_nozzle_temp", 0))
self._state["nozzle_target"] = float(d.get("target_nozzle_temp", 0))
self._state["bed_temp"] = float(d.get("curr_hotbed_temp", 0))
self._state["bed_target"] = float(d.get("target_hotbed_temp", 0))
self._push_status_update()
def _on_print(self, payload: dict):
    """Handle a ``print/report`` push and update the cached job state.

    The Klipper state is only recomputed when the report actually carries a
    ``state``; a report without one no longer flips the job to "printing".
    The stop/cancel reset is applied last so that a filename or progress
    value contained in the same report cannot undo it.

    Args:
        payload: Decoded report with an optional top-level ``state`` and a
            ``data`` dict holding the job fields.
    """
    data = payload.get("data") or {}
    kobra_state = payload.get("state", "")
    if kobra_state:
        # Unknown (but present) printer states are treated as "printing".
        self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "printing")
        self._state["kobra_state"] = kobra_state
    self._state["filename"] = data.get("filename", self._state["filename"])
    if "progress" in data:
        # Scaled from the reported value to the 0..1 range used in the status objects.
        self._state["progress"] = float(data["progress"]) / 100.0
    if "print_time" in data:
        self._state["print_duration"] = int(data["print_time"]) * 60
    if "remain_time" in data:
        self._state["remain_time"] = int(data["remain_time"]) * 60
    if "curr_layer" in data:
        self._state["curr_layer"] = data["curr_layer"]
    if "total_layers" in data:
        self._state["total_layers"] = data["total_layers"]
    if "taskid" in data:
        self._state["taskid"] = str(data["taskid"])
    settings = data.get("settings") or {}
    if "print_speed_mode" in settings:
        self._state["print_speed_mode"] = int(settings["print_speed_mode"])
    if kobra_state in ("stoped", "canceled"):
        # The job is gone: clear progress and filename after all updates.
        self._state["progress"] = 0.0
        self._state["filename"] = ""
    self._push_status_update()
def _on_info(self, payload: dict):
    """Handle an ``info/report`` push and refresh the cached device info.

    Reads printer name, firmware version, state, temperatures, URLs, fan
    speed and speed mode from ``payload["data"]``. Temperature fields that
    are missing or ``None`` keep their cached value instead of being reset
    to 0.

    Args:
        payload: Decoded report; all values are read from ``payload["data"]``.
    """
    data = payload.get("data") or {}
    self._state["printer_name"] = data.get("printerName", self._state["printer_name"])
    self._state["firmware_version"] = data.get("version", self._state["firmware_version"])
    kobra_state = data.get("state", "")
    if kobra_state:
        # Unknown states reported here fall back to "standby".
        self._state["print_state"] = KOBRA_TO_KLIPPER_STATE.get(kobra_state, "standby")
        self._state["kobra_state"] = kobra_state
    temps = data.get("temp") or {}
    field_map = (
        ("nozzle_temp", "curr_nozzle_temp"),
        ("nozzle_target", "target_nozzle_temp"),
        ("bed_temp", "curr_hotbed_temp"),
        ("bed_target", "target_hotbed_temp"),
    )
    for state_key, report_key in field_map:
        raw = temps.get(report_key)
        if raw is None:
            continue  # keep the last known value
        try:
            self._state[state_key] = float(raw)
        except (TypeError, ValueError):
            log.debug(f"Ungültiger Temperaturwert {report_key}={raw!r}")
    urls = data.get("urls") or {}
    if urls.get("fileUploadurl"):
        self._state["upload_url"] = urls["fileUploadurl"]
    if urls.get("rtspUrl"):
        self._state["camera_url"] = urls["rtspUrl"]
    fan = data.get("fan_speed_pct")
    if fan is not None:
        self._state["fan_speed"] = int(fan)
    speed_mode = data.get("print_speed_mode")
    if speed_mode is not None:
        self._state["print_speed_mode"] = int(speed_mode)
    self._push_status_update()
def _on_file(self, payload: dict):
    """Handle a ``file/report`` push and cache the contained thumbnail.

    The thumbnail is taken from ``file_details.thumbnail`` or, failing
    that, ``file_details.png_image``.
    """
    details = (payload.get("data") or {}).get("file_details") or {}
    thumbnail = details.get("thumbnail") or details.get("png_image") or ""
    if not thumbnail:
        return
    self._thumbnail_b64 = thumbnail
    log.info(f"Vorschaubild empfangen: {len(thumbnail)} Zeichen base64")
    # NOTE(review): the push is assumed to belong to the thumbnail branch — confirm.
    self._push_status_update()
def _on_multicolor_box(self, payload: dict):
    """Handle a ``multiColorBox/report`` push (only the first box is read).

    Caches the slot list and the loaded slot. When the feed status is 10
    or 11, a follow-up ``feedFilament`` command with ``type=3`` is sent
    from a daemon thread after a two-second delay.
    """
    boxes = (payload.get("data") or {}).get("multi_color_box") or []
    if not boxes:
        return
    first_box = boxes[0]
    slots = first_box.get("slots") or []
    loaded_slot = first_box.get("loaded_slot", -1)
    if loaded_slot != -1:
        self._ams_loaded_slot = loaded_slot
    # Tip forming: after loading (status=10) or unloading (status=11) the
    # original slicer automatically sends type=3 (extruder retract).
    feed_status = first_box.get("feed_status") or {}
    current_status = feed_status.get("current_status")
    slot_index = feed_status.get("slot_index", 0)
    if current_status in (10, 11):

        def _tip_form():
            time.sleep(2)
            self.client.publish(
                "multiColorBox", "feedFilament",
                {"multi_color_box": [{"id": -1, "feed_status": {"slot_index": slot_index, "type": 3}}]},
                timeout=0
            )
            log.info(f"Tip-Forming (type=3) nach status={current_status} slot={slot_index}")

        threading.Thread(target=_tip_form, daemon=True).start()
    if slots:
        self._ams_slots = slots
        log.info(f"AMS-Slots empfangen: {len(slots)}, loaded_slot={self._ams_loaded_slot}")
        # NOTE(review): the push is assumed to belong to the slots branch — confirm.
        self._push_status_update()
# -------------------------------------------------------------------------
# WebSocket push
# -------------------------------------------------------------------------
def _push_status_update(self):
if not self.ws_clients:
return
msg = {
"jsonrpc": "2.0",
"method": "notify_status_update",
"params": [self._build_printer_objects(), time.time()],
}
text = json.dumps(msg)
dead = set()
for ws in self.ws_clients:
try:
asyncio.run_coroutine_threadsafe(ws.send_str(text), ws._loop)
except Exception:
dead.add(ws)
self.ws_clients -= dead
def _build_printer_objects(self) -> dict:
s = self._state
return {
"extruder": {
"temperature": s["nozzle_temp"],
"target": s["nozzle_target"],
"power": 0.0,
},
"heater_bed": {
"temperature": s["bed_temp"],
"target": s["bed_target"],
"power": 0.0,
},
"print_stats": {
"state": s["print_state"],
"filename": s["filename"],
"print_duration": s["print_duration"],
"total_duration": s["print_duration"],
"remain_time": s["remain_time"],
"info": {
"current_layer": s["curr_layer"],
"total_layer": s["total_layers"],
},
},
"display_status": {
"progress": s["progress"],
"message": "",
},
"virtual_sdcard": {
"progress": s["progress"],
"is_active": s["print_state"] == "printing",
"file_path": s["filename"],
},
"toolhead": {
"position": [0, 0, 0, 0],
"homed_axes": "xyz",
"print_time": s["print_duration"],
"estimated_print_time": s["print_duration"],
},
}
# -------------------------------------------------------------------------
# HTTP handlers
# -------------------------------------------------------------------------
async def handle_server_info(self, request):
    """Return a static server-info document plus the live WebSocket count."""
    info = {
        "klippy_connected": True,
        "klippy_state": "ready",
        "components": ["file_manager", "job_state", "virtual_sdcard"],
        "failed_components": [],
        "registered_directories": ["gcodes"],
        "warnings": [],
        "websocket_count": len(self.ws_clients),
        "moonraker_version": MOONRAKER_VERSION,
        "api_version": [1, 3, 0],
        "api_version_string": "1.3.0",
    }
    return web.json_response({"result": info})
async def handle_printer_info(self, request):
    """Return a printer-info document that always reports a ready printer.

    All paths are fixed placeholder values; only ``cpu_info`` carries live
    data (the cached printer name).
    """
    info = {
        "state": "ready",
        "state_message": "Printer is ready",
        "hostname": "kobrax-bridge",
        "klipper_path": "/home/pi/klipper",
        "python_path": "/home/pi/klippy-env/bin/python",
        "log_file": "/tmp/klippy.log",
        "config_file": "/home/pi/printer.cfg",
        "software_version": KLIPPER_VERSION,
        "cpu_info": self._state["printer_name"],
    }
    return web.json_response({"result": info})
async def handle_machine_system_info(self, request):
    """Return a mostly static system-info document.

    Only the ``python`` entry is computed, from the running interpreter.
    """
    cpu_info = {
        "cpu_count": 4,
        "bits": "64bit",
        "processor": "armv7l",
        "cpu_desc": "Anycubic Kobra X Bridge",
        "serial_number": "",
        "hardware_desc": "",
        "model": "Kobra X Bridge",
        "total_memory": 524288,
        "memory_units": "kB",
    }
    distribution = {
        "name": "Linux",
        "id": "linux",
        "version": "1.0",
        "version_parts": {},
        "like": "",
        "codename": "",
    }
    python_info = {
        "version": list(sys.version_info[:3]),
        "version_string": sys.version,
    }
    system_info = {
        "cpu_info": cpu_info,
        "sd_info": {},
        "distribution": distribution,
        "available_services": [],
        "service_state": {},
        "python": python_info,
        "network": {},
        "canbus": {},
    }
    return web.json_response({"result": {"system_info": system_info}})
async def handle_objects_query(self, request):
    """Return the printer objects, optionally limited to the queried names.

    Each query-string key is treated as an object name; unknown names are
    ignored. Without a query string all objects are returned.
    """
    all_objects = self._build_printer_objects()
    wanted = dict(request.rel_url.query)
    if not wanted:
        status = all_objects
    else:
        status = {}
        for name in wanted:
            if name in all_objects:
                status[name] = all_objects[name]
    return web.json_response({"result": {"status": status, "eventtime": time.time()}})
async def handle_objects_list(self, request):
    """Return the names of all printer objects this bridge exposes."""
    names = list(self._build_printer_objects().keys())
    return web.json_response({"result": {"objects": names}})
async def handle_objects_subscribe(self, request):
    """Answer a subscription request with the complete current status."""
    snapshot = {
        "status": self._build_printer_objects(),
        "eventtime": time.time(),
    }
    return web.json_response({"result": snapshot})
async def handle_files_list(self, request):
    """Return a file list holding at most the currently cached filename.

    The entry uses the current time as modification time and size 0.
    """
    current = self._state.get("filename", "")
    if current:
        entry = {
            "path": current,
            "modified": time.time(),
            "size": 0,
            "permissions": "rw",
        }
        listing = [entry]
    else:
        listing = []
    return web.json_response({"result": listing})
async def handle_file_upload(self, request):
    """Receive a multipart G-code upload, forward it and start the print.

    The file is stored under its basename in the temporary serve
    directory, uploaded to the printer via ``client.upload_gcode`` and the
    print is then started in the background.

    The serve URL is built from the sanitized, URL-quoted basename so that
    it always matches the stored file, also for names that contain spaces
    or a directory part.

    Returns:
        201 with an OctoPrint/Moonraker-style body on success, 400 for a
        non-multipart request, a missing/empty file or an unusable
        filename, 500 when the upload to the printer fails.
    """
    from urllib.parse import quote  # only this handler builds URLs from filenames

    log.info(f"Upload-Request: {request.method} {request.path_qs} CT={request.headers.get('Content-Type','')[:60]}")
    content_type = request.headers.get("Content-Type", "")
    if "multipart" not in content_type:
        return web.json_response({"error": "expected multipart"}, status=400)
    reader = await request.multipart()
    file_data = None
    remote_filename = self._last_uploaded_file or "upload.gcode"
    async for part in reader:
        if part.name in ("file", "gcode", "upload_file"):
            remote_filename = part.filename or remote_filename
            file_data = await part.read()
            log.info(f"Multipart-Feld '{part.name}': {remote_filename} ({len(file_data)} bytes)")
        elif part.name == "path":
            val = (await part.read()).decode("utf-8", errors="replace").strip()
            if val:
                remote_filename = val
        else:
            log.debug(f"Unbekanntes Multipart-Feld: {part.name}")
    if not file_data:
        return web.json_response({"error": "no file received"}, status=400)
    # Basename only: prevents path traversal out of the serve directory.
    safe_name = os.path.basename(remote_filename)
    if not safe_name:
        # e.g. a name ending in "/" — there is nothing to store the data under.
        return web.json_response({"error": "invalid filename"}, status=400)
    file_md5 = hashlib.md5(file_data).hexdigest()
    file_size = len(file_data)
    serve_path = os.path.join(self._serve_dir_path, safe_name)
    with open(serve_path, "wb") as f:
        f.write(file_data)
    del file_data  # release the in-memory copy
    self._last_uploaded_file = remote_filename
    log.info(f"Upload: {remote_filename} ({file_size} bytes) md5={file_md5} → Drucker")
    # Upload the stored file to the printer over HTTP.
    upload_url = self._state.get("upload_url") or None
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(
            None, self.client.upload_gcode, serve_path, remote_filename, upload_url
        )
    except Exception as e:
        log.error(f"Upload fehlgeschlagen: {e}")
        return web.json_response({"error": str(e)}, status=500)
    log.info(f"Upload erfolgreich: {result}")
    # Start the print with the full payload (serve URL + md5 + size).
    # NOTE(review): the host is taken from the request; confirm the printer can reach it.
    serve_url = f"http://{request.host}/serve/{quote(safe_name)}"
    log.info(f"Starte Druck automatisch: {remote_filename}")

    def _report_start_failure(future):
        # The start runs fire-and-forget; surface its exception in the log.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            log.error(f"Druckstart fehlgeschlagen: {error}")

    start_future = loop.run_in_executor(
        None, self._start_print, remote_filename, serve_url, file_md5, file_size
    )
    start_future.add_done_callback(_report_start_failure)
    # OctoPrint-compatible response (OrcaSlicer evaluates refs).
    file_ref = f"http://{request.host}/api/files/local/{quote(remote_filename)}"
    return web.json_response({
        "done": True,
        "files": {
            "local": {
                "name": remote_filename,
                "origin": "local",
                "path": remote_filename,
                "refs": {
                    "download": file_ref,
                    "resource": file_ref,
                }
            }
        },
        "result": {
            "item": {"path": remote_filename, "root": "gcodes"},
            "action": "create_file",
        }
    }, status=201)
def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0):
    """Request the file thumbnail and send ``print/start`` to the printer.

    Blocks for up to 15 seconds waiting for the start confirmation and
    logs the outcome; nothing is returned.

    Args:
        filename: Name of the file to print.
        url: URL placed in the payload's ``url`` field.
        md5: Hex digest of the file content.
        filesize: File size in bytes.
    """
    slots = self._ams_slots
    # One mapping entry per cached slot, each with fixed white colours.
    box_mapping = []
    for index, slot in enumerate(slots):
        box_mapping.append({
            "paint_index": index,
            "ams_index": index,
            "paint_color": [255, 255, 255, 255],
            "ams_color": [255, 255, 255, 255],
            "material_type": slot.get("material_type", "PLA"),
        })
    task_settings = {
        "auto_leveling": 1,
        "vibration_compensation": 0,
        "flow_calibration": 0,
        "dry_mode": 0,
        "ai_settings": {"status": 0, "count": 0, "type": 1},
        "timelapse": {"status": 0, "count": 0, "type": 64},
        "drying_settings": {"status": 0, "target_temp": 0, "duration": 0, "remain_time": 0},
        "model_objects_skip_parts": [],
    }
    payload = {
        "taskid": "-1",
        "url": url,
        "filename": filename,
        "md5": md5,
        "filepath": None,
        "filetype": 1,
        "project_type": 1,
        "filesize": filesize,
        "ams_settings": {
            "use_ams": len(slots) > 0,
            "ams_box_mapping": box_mapping,
        },
        "task_settings": task_settings,
    }
    # Request the thumbnail up front; the answer arrives via file/report.
    self._thumbnail_b64 = ""
    self.client.publish(
        "file", "fileDetails", {"root": "local", "filename": filename}, timeout=0
    )
    log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots")
    reply = self.client.publish("print", "start", payload, timeout=15.0)
    if not reply:
        log.warning("Druckstart: keine Antwort vom Drucker")
        return
    log.info(f"Druckstart bestätigt: state={reply.get('state')}")
async def handle_print_start(self, request):
    """Start printing the named file, or the last uploaded one.

    Returns 400 when no filename is known and 504 when the printer does
    not answer within 15 seconds.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    filename = body.get("filename") or self._last_uploaded_file
    if not filename:
        return web.json_response({"error": "no filename"}, status=400)
    log.info(f"Druck starten: {filename}")
    # AMS mapping from the cached slot state.
    # NOTE(review): this payload differs in shape from the one _start_print
    # sends — confirm which form the firmware expects.
    ams_box_mapping = [
        {
            "slot_index": index,
            "material_type": slot.get("material_type", "PLA"),
            "color": slot.get("color", "FFFFFF"),
        }
        for index, slot in enumerate(self._ams_slots)
    ]
    payload = {
        "filename": filename,
        "taskid": str(int(time.time())),
        "use_ams": len(self._ams_slots) > 0,
    }
    if ams_box_mapping:
        payload["ams_box_mapping"] = ams_box_mapping

    def _publish_start():
        return self.client.publish("print", "start", payload, timeout=15.0)

    result = await asyncio.get_event_loop().run_in_executor(None, _publish_start)
    if result is None:
        return web.json_response({"error": "Keine Antwort vom Drucker"}, status=504)
    return web.json_response({"result": "ok"})
async def handle_print_pause(self, request):
    """Pause the job identified by the cached task id."""
    current_task = self._state.get("taskid", "-1")

    def _pause():
        return self.client.pause_print(current_task)

    await asyncio.get_event_loop().run_in_executor(None, _pause)
    return web.json_response({"result": "ok"})
async def handle_print_resume(self, request):
    """Resume the job identified by the cached task id."""
    current_task = self._state.get("taskid", "-1")

    def _resume():
        return self.client.resume_print(current_task)

    await asyncio.get_event_loop().run_in_executor(None, _resume)
    return web.json_response({"result": "ok"})
async def handle_print_cancel(self, request):
    """Stop the job identified by the cached task id."""
    current_task = self._state.get("taskid", "-1")

    def _stop():
        return self.client.stop_print(current_task)

    await asyncio.get_event_loop().run_in_executor(None, _stop)
    return web.json_response({"result": "ok"})
async def handle_octoprint_version(self, request):
    """Return a fixed OctoPrint-style version document."""
    version_info = {
        "api": "0.1",
        "server": "1.9.0",
        "text": "OctoPrint (Kobra X Bridge)",
    }
    return web.json_response(version_info)
async def handle_index(self, request):
    """Serve the built-in status page with caching disabled.

    The ``'__VERSION__'`` placeholder is replaced by the quoted content of
    the VERSION file.
    """
    page = r"""
KX-Bridge
⬡ KX-Bridge
Anycubic Kobra X
Standby
Version
–
📷 Kamera nicht gestartet
◉ Fortschritt
0%
–
–
–
–
⊙ Temperaturen
Verlauf (letzte 60 Messungen)
✛ XY-Achsen
↕ Z-Achse
Schrittweite: 1 mm
🏎 Druckgeschwindigkeit
🌀 Lüfter
0
◫ AMS / Filamentbox
Keine AMS-Daten empfangen
Slot auswählen
Slot 1
"""
    page = page.replace("'__VERSION__'", f"'{self._read_version()}'")
    no_cache = {"Cache-Control": "no-store, no-cache, must-revalidate"}
    return web.Response(text=page, content_type="text/html", headers=no_cache)
async def handle_api_light(self, request):
    """Switch the light and cache the new on/off state and brightness.

    Body fields: ``on`` (default true) and ``brightness`` (default: the
    cached brightness). An unparsable body is treated as empty.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    on = bool(body.get("on", True))
    brightness = int(body.get("brightness", self._state["light_brightness"]))
    command = {"type": 3, "status": 1 if on else 0, "brightness": brightness}

    def _send():
        return self.client.publish("light", "control", command, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    self._state["light_on"] = on
    self._state["light_brightness"] = brightness
    return web.json_response({"result": "ok"})
async def handle_api_fan(self, request):
    """Set the fan speed from the body's ``speed`` field (default 0)."""
    try:
        body = await request.json()
    except Exception:
        body = {}
    speed = int(body.get("speed", 0))

    def _send():
        return self.client.publish("fan", "setSpeed", {"fan_speed_pct": speed}, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    self._state["fan_speed"] = speed
    return web.json_response({"result": "ok"})
async def handle_api_connect(self, request):
    """Connect the client and reset the cached state to standby/free.

    Returns 500 with the error text when connecting fails.
    """
    event_loop = asyncio.get_event_loop()
    try:
        await event_loop.run_in_executor(None, self.client.connect)
        self._state.update(print_state="standby", kobra_state="free")
        log.info("Manuell verbunden")
        return web.json_response({"result": "connected"})
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
async def handle_api_disconnect(self, request):
    """Disconnect the client and mark the cached state as error/offline.

    Errors raised while disconnecting are ignored.
    """
    event_loop = asyncio.get_event_loop()
    try:
        await event_loop.run_in_executor(None, self.client.disconnect)
    except Exception:
        pass  # best effort: the state is marked offline either way
    self._state.update(print_state="error", kobra_state="offline")
    log.info("Manuell getrennt")
    return web.json_response({"result": "disconnected"})
async def handle_api_speed(self, request):
    """Change the print speed mode (body field ``mode``, default 2)."""
    try:
        body = await request.json()
    except Exception:
        body = {}
    mode = int(body.get("mode", 2))
    update = {
        "taskid": self._state.get("taskid", "-1"),
        "settings": {"print_speed_mode": mode},
    }

    def _send():
        return self.client.publish_web("print", "update", update)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    self._state["print_speed_mode"] = mode
    return web.json_response({"result": "ok"})
async def handle_api_ams_feed(self, request):
    """Send a ``feedFilament`` command for one slot.

    Body fields: ``slot_index`` (default 0) and ``type`` (default 1).
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    slot_index = int(body.get("slot_index", 0))
    feed_type = int(body.get("type", 1))
    # Unload (type=2): whenever a loaded slot is known it replaces the
    # requested slot index.
    if feed_type == 2 and self._ams_loaded_slot >= 0:
        slot_index = self._ams_loaded_slot
    command = {
        "multi_color_box": [
            {"id": -1, "feed_status": {"slot_index": slot_index, "type": feed_type}}
        ]
    }

    def _send():
        resp = self.client.publish("multiColorBox", "feedFilament", command, timeout=5)
        log.info(f"feedFilament type={feed_type} slot={slot_index} loaded_slot={self._ams_loaded_slot} → {resp}")

    await asyncio.get_event_loop().run_in_executor(None, _send)
    return web.json_response({"result": "ok"})
async def handle_api_axis(self, request):
    """Send an ``axis/move`` command.

    Body fields: ``axis`` (default 4), ``move_type`` (default 2) and
    ``distance`` (default 0).
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    command = {
        "axis": int(body.get("axis", 4)),
        "move_type": int(body.get("move_type", 2)),
        "distance": float(body.get("distance", 0)),
    }

    def _send():
        return self.client.publish("axis", "move", command, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _send)
    return web.json_response({"result": "ok"})
async def handle_api_temperature(self, request):
    """Set nozzle and/or bed target temperature.

    While printing, each given value is sent as its own ``print/update``.
    Otherwise one ``tempature/set`` carries both targets, taking the
    cached target for a value that was not given.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    nozzle = body.get("nozzle")
    bed = body.get("bed")
    loop = asyncio.get_event_loop()
    if self._state.get("print_state") != "printing":
        # Idle: standard tempature/set with both values.
        n = int(float(nozzle)) if nozzle is not None else int(self._state["nozzle_target"])
        b = int(float(bed)) if bed is not None else int(self._state["bed_target"])

        def _set_idle():
            return self.client.publish(
                "tempature", "set",
                {"target_nozzle_temp": n, "target_hotbed_temp": b},
                timeout=0
            )

        await loop.run_in_executor(None, _set_idle)
        return web.json_response({"result": "ok"})

    # During a print: runtime update, one setting at a time.
    taskid = self._state.get("taskid", "-1")

    def _update(setting, value):
        return self.client.publish_web(
            "print", "update",
            {"taskid": taskid, "settings": {setting: value}},
        )

    if nozzle is not None:
        await loop.run_in_executor(None, _update, "target_nozzle_temp", int(float(nozzle)))
    if bed is not None:
        await loop.run_in_executor(None, _update, "target_hotbed_temp", int(float(bed)))
    return web.json_response({"result": "ok"})
async def handle_api_camera(self, request):
    """Return the cached camera URL."""
    camera_url = self._state["camera_url"]
    return web.json_response({"url": camera_url})
async def handle_api_camera_start(self, request):
    """Ask the printer to start capturing and report the returned state.

    Waits up to 8 seconds for the reply; a missing reply yields an empty
    state string.
    """

    def _start_capture():
        return self.client.publish("video", "startCapture", None, timeout=8.0)

    # Wait for the confirmation before returning.
    reply = await asyncio.get_event_loop().run_in_executor(None, _start_capture)
    state = (reply or {}).get("state", "")
    log.info(f"Kamera startCapture: state={state}")
    return web.json_response({"result": "ok", "state": state})
async def handle_api_camera_stop(self, request):
    """Ask the printer to stop capturing, without waiting for a reply."""

    def _stop_capture():
        return self.client.publish("video", "stopCapture", None, timeout=0)

    await asyncio.get_event_loop().run_in_executor(None, _stop_capture)
    return web.json_response({"result": "ok"})
async def handle_camera_stream(self, request):
    """MJPEG proxy: camera stream → MJPEG via ffmpeg, as multipart/x-mixed-replace.

    ffmpeg is started before the response headers are sent, so a missing
    or unstartable ffmpeg yields a clean 503 instead of an error in the
    middle of a prepared stream. The process is killed and reaped when the
    stream ends.

    Returns:
        503 when no camera URL is cached or ffmpeg cannot be started,
        otherwise the streaming response.
    """
    url = self._state.get("camera_url", "")
    if not url:
        return web.Response(status=503, text="Keine Kamera-URL bekannt")
    ffmpeg_input_args = [
        "-fflags", "nobuffer",
        "-flags", "low_delay",
    ]
    if url.lower().startswith("rtsp://"):
        ffmpeg_input_args += ["-probesize", "32", "-analyzeduration", "0", "-rtsp_transport", "tcp"]
    else:
        # HTTP-FLV/HLS: needs a larger probe buffer for container detection.
        ffmpeg_input_args += ["-probesize", "1000000", "-analyzeduration", "1000000"]
    try:
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg", "-loglevel", "quiet",
            *ffmpeg_input_args,
            "-i", url,
            "-vf", "fps=15,scale=640:-1",
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", "3",
            "-flush_packets", "1",
            "pipe:1",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except OSError as e:
        # Includes FileNotFoundError when ffmpeg is not installed.
        log.error(f"ffmpeg konnte nicht gestartet werden: {e}")
        return web.Response(status=503, text="ffmpeg nicht verfügbar")
    boundary = "kobraxframe"
    resp = web.StreamResponse(headers={
        "Content-Type": f"multipart/x-mixed-replace;boundary={boundary}",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    })
    buf = b""
    try:
        await resp.prepare(request)
        try:
            while True:
                chunk = await proc.stdout.read(65536)
                if not chunk:
                    break
                buf += chunk
                # Extract complete JPEG frames (SOI=FFD8, EOI=FFD9).
                while True:
                    start = buf.find(b"\xff\xd8")
                    if start == -1:
                        # Keep a trailing 0xFF: it may be the first half of
                        # an SOI marker split across two reads.
                        buf = buf[-1:] if buf.endswith(b"\xff") else b""
                        break
                    end = buf.find(b"\xff\xd9", start + 2)
                    if end == -1:
                        buf = buf[start:]
                        break
                    frame = buf[start:end + 2]
                    buf = buf[end + 2:]
                    header = (
                        f"--{boundary}\r\n"
                        f"Content-Type: image/jpeg\r\n"
                        f"Content-Length: {len(frame)}\r\n\r\n"
                    ).encode()
                    try:
                        await resp.write(header + frame + b"\r\n")
                    except Exception:
                        # A failed write ends the stream.
                        return resp
        except Exception as e:
            log.warning(f"Kamera-Stream unterbrochen: {e}")
    finally:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # ffmpeg already exited
        # Reap the child so no zombie process is left behind.
        await proc.wait()
    return resp
async def handle_serve_file(self, request):
    """Serve an uploaded G-code file from the temporary serve directory.

    Only the basename of the requested name is used; 404 is returned when
    no such file exists.
    """
    requested = request.match_info.get("filename", "")
    filename = os.path.basename(requested)
    serve_path = os.path.join(self._serve_dir_path, filename)
    if os.path.isfile(serve_path):
        size = os.path.getsize(serve_path)
        log.info(f"Drucker lädt Datei ab: {filename} ({size} bytes)")
        disposition = f'attachment; filename="{filename}"'
        return web.FileResponse(serve_path, headers={"Content-Disposition": disposition})
    return web.Response(status=404, text="not found")
async def handle_api_state(self, request):
    """Return the cached state plus AMS data and thumbnail as one JSON object."""
    exported_keys = (
        "printer_name", "firmware_version", "print_state", "kobra_state",
        "nozzle_temp", "nozzle_target", "bed_temp", "bed_target",
        "progress", "print_duration", "remain_time",
        "curr_layer", "total_layers", "filename", "camera_url",
        "fan_speed", "print_speed_mode", "light_on", "light_brightness",
    )
    snapshot = {key: self._state[key] for key in exported_keys}
    snapshot["ams_slots"] = self._ams_slots
    snapshot["ams_loaded_slot"] = self._ams_loaded_slot
    snapshot["thumbnail"] = self._thumbnail_b64
    return web.json_response(snapshot)
async def handle_moonraker_database(self, request):
    """Answer ``/server/database/item`` queries.

    OrcaSlicer's "Synchronize filament list from AMS" reads
    ``?namespace=lane_data``; every other namespace gets an empty value.
    For ``lane_data`` the slots are refreshed from the printer and turned
    into one ``lane<i>`` entry each, with default temperatures chosen by
    material.

    A slot whose ``type`` is missing or ``None`` is reported with an empty
    material, and a colour that cannot be formatted falls back to grey
    instead of failing the whole request.
    """
    namespace = request.rel_url.query.get("namespace", "")
    if namespace != "lane_data":
        return web.json_response({"result": {"namespace": namespace, "value": {}}})
    loop = asyncio.get_running_loop()
    slots = await loop.run_in_executor(None, self._get_ams_slots_fresh)
    # Built once; unknown materials use the fallback values.
    default_temps = {
        "PLA": {"nozzle": 220, "bed": 60},
        "PETG": {"nozzle": 240, "bed": 70},
        "ABS": {"nozzle": 250, "bed": 100},
        "TPU": {"nozzle": 230, "bed": 40},
    }
    fallback_temps = {"nozzle": 220, "bed": 60}
    grey = "808080FF"
    lane_data = {}
    for i, slot in enumerate(slots):
        rgb = slot.get("color", [128, 128, 128])
        color_hex = grey
        if isinstance(rgb, list) and len(rgb) >= 3:
            # A fourth component is used as alpha, otherwise fully opaque.
            alpha = rgb[3] if len(rgb) == 4 else 255
            try:
                color_hex = "".join(f"{int(c):02X}" for c in (rgb[0], rgb[1], rgb[2], alpha))
            except (TypeError, ValueError):
                log.debug(f"AMS-Slot {i}: ungültige Farbe {rgb!r}")
        material = str(slot.get("type") or "")
        temps = default_temps.get(material.upper(), fallback_temps)
        lane_data[f"lane{i}"] = {
            "vendor_name": "Anycubic",
            "name": material,
            "color": color_hex,
            "material": material,
            "bed_temp": temps["bed"],
            "nozzle_temp": temps["nozzle"],
            "scan_time": None,
            "td": None,
            "lane": str(i),
            "spool_id": None,
            "filament_id": None,
        }
    log.info(f"AMS-Sync: {len(lane_data)} Slots an OrcaSlicer")
    return web.json_response({"result": {"namespace": "lane_data", "value": lane_data}})
def _get_ams_slots_fresh(self):
"""Frische Slot-Daten per getInfo holen, Fallback auf gecachte."""
resp = self.client.publish("multiColorBox", "getInfo", None, timeout=5)
if resp and resp.get("data"):
boxes = resp["data"].get("multi_color_box") or []
if boxes:
slots = boxes[0].get("slots") or []
if slots:
self._ams_slots = slots
return self._ams_slots
# ─── Settings ────────────────────────────────────────────────────────────
def _find_env_path(self) -> pathlib.Path:
    """Return the path of the .env file.

    The base directory is checked first, then its parent. When neither
    holds a .env file, the parent location is returned.
    """
    script_dir = pathlib.Path(_BASE)
    candidates = [directory / ".env" for directory in (script_dir, script_dir.parent)]
    found = next((path for path in candidates if path.is_file()), None)
    if found is not None:
        return found
    return script_dir.parent / ".env"
async def handle_api_settings_get(self, request):
    """Return the connection settings taken from the parsed arguments.

    NOTE(review): the response includes the password in clear text.
    """
    args = self._args
    settings = {
        "printer_ip": args.printer_ip,
        "mqtt_port": args.mqtt_port,
        "username": args.username,
        "password": args.password,
        "mode_id": args.mode_id,
        "device_id": args.device_id,
    }
    return web.json_response(settings)
async def handle_api_settings_post(self, request):
    """Write the posted connection settings to the .env file and restart.

    Existing comments and unrelated keys are kept; known keys are replaced
    in place and missing ones appended. A field that is absent or ``null``
    keeps the value already stored in the file.

    Carriage returns and line feeds are removed from every value so that a
    value can never add extra lines to the .env file.

    Returns:
        400 when the body is not a JSON object, otherwise
        ``{"status": "restarting"}``; the restart is scheduled 0.3 s later
        so the response can still be delivered.
    """
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Ungültiges JSON"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "JSON-Objekt erwartet"}, status=400)
    env_path = self._find_env_path()
    # Read the current .env to preserve comments and extra keys.
    lines: "list[str]" = []
    if env_path.is_file():
        lines = env_path.read_text(encoding="utf-8").splitlines()
    existing: "dict[str, str]" = {}
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith("#") and "=" in stripped:
            k, _, v = stripped.partition("=")
            existing[k.strip()] = v.strip()

    def _value(field: str, env_key: str, default: str = "") -> str:
        raw = data.get(field)
        if raw is None:
            raw = existing.get(env_key, default)
        return str(raw).replace("\r", "").replace("\n", "")

    mapping = {
        "PRINTER_IP": _value("printer_ip", "PRINTER_IP"),
        "MQTT_PORT": _value("mqtt_port", "MQTT_PORT", "9883"),
        "MQTT_USERNAME": _value("username", "MQTT_USERNAME"),
        "MQTT_PASSWORD": _value("password", "MQTT_PASSWORD"),
        "MODE_ID": _value("mode_id", "MODE_ID"),
        "DEVICE_ID": _value("device_id", "DEVICE_ID"),
    }
    # Replace existing assignments, then append keys not seen in the file.
    written: "set[str]" = set()
    new_lines: "list[str]" = []
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith("#") and "=" in stripped:
            k = stripped.partition("=")[0].strip()
            if k in mapping:
                new_lines.append(f"{k}={mapping[k]}")
                written.add(k)
                continue
        new_lines.append(line)
    for k, v in mapping.items():
        if k not in written:
            new_lines.append(f"{k}={v}")
    env_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
    log.info(f"Settings gespeichert in {env_path}")
    # Send the response first, then restart.
    response = web.json_response({"status": "restarting"})
    asyncio.get_running_loop().call_later(0.3, self._restart_bridge)
    return response
def _restart_bridge(self):
    """Replace the current process with a fresh instance of the bridge.

    In a frozen (PyInstaller) build ``sys.argv[0]`` already names the
    executable, so it is dropped to avoid passing the executable path to
    itself as a first argument. The temporary serve directory is removed
    first because ``os.execv`` replaces the process without running any
    cleanup.
    """
    log.info("Bridge wird neu gestartet …")
    serve_dir = getattr(self, "_serve_dir", None)
    if serve_dir is not None:
        try:
            serve_dir.cleanup()
        except OSError as e:
            log.warning(f"Temp-Verzeichnis konnte nicht entfernt werden: {e}")
    if getattr(sys, "frozen", False):
        argv = [sys.executable] + sys.argv[1:]
    else:
        argv = [sys.executable] + sys.argv
    os.execv(sys.executable, argv)
# ─── Update ──────────────────────────────────────────────────────────────
# Release listing queried by the update check (limited to one entry,
# pre-releases included per the query string).
GITEA_RELEASE_API = "https://gitea.it-drui.de/api/v1/repos/viewit/KX-Bridge-Release/releases?limit=1&pre-release=true"
# Base URL for raw files of a tag; the update check appends "/<tag>/<file>".
GITEA_RAW_BASE = "https://gitea.it-drui.de/viewit/KX-Bridge-Release/raw/tag"
def _read_version(self) -> str:
    """Return the stripped content of the VERSION file, or "unknown".

    The file is looked up in the base directory first, then in its parent.
    """
    base_dir = pathlib.Path(_BASE)
    candidates = (base_dir / "VERSION", base_dir.parent / "VERSION")
    for candidate in candidates:
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8").strip()
    return "unknown"
def _write_version(self, version: str):
    """Write *version* (plus a newline) to the VERSION file.

    An existing file in the base directory or its parent is overwritten;
    otherwise a new file is created in the base directory.
    """
    base_dir = pathlib.Path(_BASE)
    target = base_dir / "VERSION"  # used when no VERSION file exists yet
    for directory in (base_dir, base_dir.parent):
        candidate = directory / "VERSION"
        if candidate.is_file():
            target = candidate
            break
    target.write_text(version + "\n", encoding="utf-8")
@staticmethod
def _parse_version(v: str) -> "tuple[int, ...]":
"""'v0.9.1-beta1' → (0, 9, 1) – nur numerische Teile vor dem ersten '-'"""
v = v.lstrip("v").split("-")[0]
parts = re.split(r"[.\s]+", v)
result = []
for p in parts:
try:
result.append(int(p))
except ValueError:
break
return tuple(result) or (0,)
async def handle_api_update_check(self, request):
    """Compare the local version with the newest release on the Gitea server.

    Returns 502 when the release API does not answer with 200 or any
    error occurs, 404 when no release exists, otherwise the current and
    latest version, the tag and the download URL of the bridge script.
    """
    current = self._read_version()
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(self.GITEA_RELEASE_API, timeout=timeout) as resp:
                if resp.status != 200:
                    return web.json_response({"error": f"Gitea HTTP {resp.status}"}, status=502)
                releases = await resp.json(content_type=None)
        if not releases:
            return web.json_response({"error": "Keine Releases gefunden"}, status=404)
        tag = releases[0].get("tag_name", "")
        is_newer = self._parse_version(tag) > self._parse_version(current)
        result = {
            "current": current,
            "latest": tag.lstrip("v"),
            "update_available": is_newer,
            "tag": tag,
            "download_url": f"{self.GITEA_RAW_BASE}/{tag}/kobrax_moonraker_bridge.py",
        }
        return web.json_response(result)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=502)
async def handle_api_update_apply(self, request):
    """Download a new bridge script, replace the running one and restart.

    The download URL comes from the request body and its content is
    executed after the restart, so only URLs below ``GITEA_RAW_BASE`` are
    accepted. Self-update is refused for a frozen build, where the target
    would be the executable itself and overwriting it with a .py file
    would destroy it.

    Returns:
        400 for an invalid body, a missing or disallowed ``download_url``
        or a frozen build; 502 when the download or the replacement
        fails; otherwise ``{"status": "updating"}`` with the restart
        scheduled 0.3 s later.
    """
    try:
        data = await request.json()
    except Exception:
        return web.json_response({"error": "Ungültiges JSON"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "JSON-Objekt erwartet"}, status=400)
    download_url = data.get("download_url", "")
    new_tag = data.get("tag", "")
    if not download_url:
        return web.json_response({"error": "download_url fehlt"}, status=400)
    if not isinstance(download_url, str) or not download_url.startswith(self.GITEA_RAW_BASE + "/"):
        return web.json_response({"error": "download_url nicht erlaubt"}, status=400)
    if getattr(sys, "frozen", False):
        return web.json_response(
            {"error": "Self-Update wird für die Binary-Version nicht unterstützt"}, status=400
        )
    script_path = pathlib.Path(__file__).resolve()
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(download_url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                if resp.status != 200:
                    return web.json_response({"error": f"Download HTTP {resp.status}"}, status=502)
                content = await resp.read()
        if not content:
            # Never replace the script with an empty file.
            return web.json_response({"error": "Download leer"}, status=502)
        # Write next to the script, then swap the file in one step.
        tmp = script_path.with_suffix(".py.new")
        tmp.write_bytes(content)
        os.replace(tmp, script_path)
        if new_tag:
            self._write_version(str(new_tag).lstrip("v"))
        log.info(f"Update auf {new_tag} installiert, starte neu …")
    except Exception as e:
        return web.json_response({"error": str(e)}, status=502)
    response = web.json_response({"status": "updating"})
    asyncio.get_running_loop().call_later(0.3, self._restart_bridge)
    return response
async def handle_catchall(self, request):
    """Log a request that reached this fallback handler and answer 200.

    The method, path with query string and the first 200 body bytes are
    logged as a warning; the reply is always an empty ``result`` object.
    """
    body = await request.read()
    log.warning(f"UNBEKANNT {request.method} {request.path_qs} body={body[:200]}")
    empty_reply = {"result": {}}
    return web.json_response(empty_reply, status=200)
async def handle_favicon(self, request):
    """Serve a minimal built-in 1x1 icon so browsers do not log a 404."""
    # The 70 icon bytes, grouped along the ICO container layout and kept
    # in file order.
    icon_dir = bytes([0, 0, 1, 0, 1, 0])
    dir_entry = bytes([1, 1, 0, 0, 1, 0, 24, 0, 40, 0, 0, 0, 22, 0, 0, 0])
    bitmap_header = bytes([
        40, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 24, 0,
        0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0,
    ])
    pixel_data = bytes([255, 102, 0, 0, 0, 0, 0, 0])
    ico = icon_dir + dir_entry + bitmap_header + pixel_data
    return web.Response(body=ico, content_type="image/x-icon")
# -------------------------------------------------------------------------
# WebSocket handler
# -------------------------------------------------------------------------
async def handle_websocket(self, request):
    """Serve one WebSocket client for the lifetime of its connection.

    The socket is added to ``self.ws_clients``, greeted with a
    ``notify_klippy_ready`` notification and an initial
    ``notify_status_update``, and every incoming text frame is passed to
    ``self._handle_ws_rpc``. The socket is removed from ``self.ws_clients``
    again when the connection ends - also when sending or message handling
    raises, so no dead client stays registered.

    Returns:
        The ``web.WebSocketResponse`` for this connection.
    """
    ws = web.WebSocketResponse(heartbeat=30)
    await ws.prepare(request)
    # Remember the serving loop on the socket object.
    # NOTE(review): presumably read by code outside this block that pushes
    # updates from another thread - confirm.
    ws._loop = asyncio.get_running_loop()
    self.ws_clients.add(ws)
    log.info(f"WS client verbunden ({len(self.ws_clients)} gesamt)")
    try:
        # Send klippy_ready notification
        await ws.send_str(json.dumps({
            "jsonrpc": "2.0",
            "method": "notify_klippy_ready",
            "params": [],
        }))
        # Send initial status
        await ws.send_str(json.dumps({
            "jsonrpc": "2.0",
            "method": "notify_status_update",
            "params": [self._build_printer_objects(), time.time()],
        }))
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                await self._handle_ws_rpc(ws, msg.data)
            elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE):
                break
    finally:
        # Always unregister, even if a send or an RPC handler raised.
        self.ws_clients.discard(ws)
        log.info(f"WS client getrennt ({len(self.ws_clients)} verbleibend)")
    return ws
async def _handle_ws_rpc(self, ws: web.WebSocketResponse, raw: str):
    """Process one JSON-RPC request received over the WebSocket.

    Messages that are not valid JSON, or whose top-level value is not a
    JSON object, are ignored. ``params`` may be an object or a list; for a
    list only its first element is used. Unknown methods are answered with
    an empty result. A reply is sent only when the request carries an
    ``id``; exceptions raised while handling a method are reported as
    JSON-RPC error ``-32603``.

    Args:
        ws: Socket the request arrived on; the reply is sent to it.
        raw: Text frame containing the JSON-RPC request.
    """
    try:
        req = json.loads(raw)
    except Exception:
        return
    if not isinstance(req, dict):
        # Valid JSON but not an object (list, string, number ...): the
        # .get() calls below would raise and take the connection down.
        log.debug(f"WS RPC ignoriert (kein JSON-Objekt): {str(req)[:120]}")
        return
    rpc_id = req.get("id")
    method = req.get("method", "")
    log.info(f"WS RPC: {method} params={str(req.get('params',''))[:120]}")
    params = req.get("params") or {}
    if isinstance(params, list):
        params = params[0] if params else {}
    result = None
    error = None
    try:
        if method in ("printer.info", "printer_info"):
            result = {
                "state": "ready",
                "state_message": "Printer is ready",
                "hostname": "kobrax-bridge",
                "software_version": KLIPPER_VERSION,
                "cpu_info": self._state["printer_name"],
                "klipper_path": "/home/pi/klipper",
                "python_path": "/home/pi/klippy-env/bin/python",
            }
        elif method in ("server.info", "server_info"):
            result = {
                "klippy_connected": True,
                "klippy_state": "ready",
                "moonraker_version": MOONRAKER_VERSION,
                "components": [],
                "failed_components": [],
                "registered_directories": ["gcodes"],
                "warnings": [],
            }
        elif method == "printer.objects.list":
            result = {"objects": list(self._build_printer_objects().keys())}
        elif method in ("printer.objects.query", "printer.objects.get",
                        "printer.objects.subscribe"):
            # Query and subscribe return the same snapshot: the requested
            # objects, or every object when none were named.
            objects = params.get("objects", {})
            all_objs = self._build_printer_objects()
            if objects:
                filtered = {k: all_objs.get(k, {}) for k in objects}
            else:
                filtered = all_objs
            result = {"status": filtered, "eventtime": time.time()}
        elif method == "printer.print.start":
            filename = params.get("filename", self._last_uploaded_file)
            # The client call blocks, so it runs in the default executor.
            resp = await asyncio.get_running_loop().run_in_executor(
                None, lambda: self.client.publish("print", "start",
                                                  {"filename": filename, "use_ams": False}, timeout=15.0)
            )
            result = "ok" if resp else "timeout"
        elif method == "printer.print.pause":
            await asyncio.get_running_loop().run_in_executor(None, self.client.pause_print)
            result = "ok"
        elif method == "printer.print.resume":
            await asyncio.get_running_loop().run_in_executor(None, self.client.resume_print)
            result = "ok"
        elif method == "printer.print.cancel":
            await asyncio.get_running_loop().run_in_executor(None, self.client.stop_print)
            result = "ok"
        elif method == "machine.system_info":
            result = {"system_info": {"cpu_info": {"cpu_desc": "Kobra X Bridge"}}}
        elif method == "server.files.list":
            result = []
        else:
            log.debug(f"Unbekannte RPC-Methode: {method}")
            result = {}
    except Exception as e:
        log.error(f"RPC-Fehler für {method}: {e}")
        error = {"code": -32603, "message": str(e)}
    # Requests without an id are notifications and get no reply.
    if rpc_id is not None:
        response = {"jsonrpc": "2.0", "id": rpc_id}
        if error is not None:
            response["error"] = error
        else:
            response["result"] = result
        await ws.send_str(json.dumps(response))
# -------------------------------------------------------------------------
# Poll loop (synchronous; runs in a dedicated daemon thread started by run_bridge)
# -------------------------------------------------------------------------
def _printer_reachable(self) -> bool:
    """Probe the printer's MQTT port with a plain TCP connect.

    Uses a TCP connection instead of ICMP, so no root privileges are
    needed. The connection is closed again immediately.

    Returns:
        True when the connect succeeds within 2 seconds, False on OSError.
    """
    import socket as _socket
    try:
        endpoint = (self._args.printer_ip, self._args.mqtt_port)
        with _socket.create_connection(endpoint, timeout=2.0):
            reachable = True
    except OSError:
        reachable = False
    return reachable
def _poll_loop(self, stop_event: threading.Event):
    """Poll the printer until *stop_event* is set.

    Two modes alternate:

    * Offline: every 10 s the printer is probed via ``_printer_reachable``;
      once it answers, the MQTT client is reconnected and the state is
      reset to standby/free.
    * Online: every 3 s the printer info is queried, the print report is
      queried in addition while a print is active, and the multicolor box
      slots are refreshed. If a poll raises and the printer no longer
      answers the probe, the state becomes error/offline, the client is
      disconnected and the loop switches to offline mode.

    Args:
        stop_event: Set by the owner to end the loop; also used for the
            waits so that shutdown is not delayed by a sleep.
    """
    probe_pause = 10.0  # seconds between TCP probes while offline
    active_states = ("printing", "preheating", "auto_leveling", "checking", "init")
    offline = self._state["kobra_state"] == "offline"
    while not stop_event.is_set():
        # -- Offline mode: wait until the printer is reachable again --------
        if offline:
            if not self._printer_reachable():
                stop_event.wait(probe_pause)
                continue
            log.info("Drucker erreichbar – stelle MQTT-Verbindung her …")
            try:
                self.client.connect()
                offline = False
                self._state["print_state"] = "standby"
                self._state["kobra_state"] = "free"
                log.info("MQTT-Verbindung wiederhergestellt")
            except Exception as e:
                log.warning(f"Verbindungsaufbau fehlgeschlagen: {e}")
                stop_event.wait(probe_pause)
                continue
        # -- Online mode: regular poll ---------------------------------------
        try:
            info = self.client.query_info()
            if info:
                self._on_info(info)
            # While printing: query the print report directly.
            if self._state["print_state"] in active_states:
                report = self.client.publish("print", "query", timeout=3.0)
                if report:
                    self._on_print(report)
            box = self.client.query_multicolor_box()
            if box:
                box_list = (box.get("data") or {}).get("multi_color_box") or []
                if box_list:
                    slots = box_list[0].get("slots") or []
                else:
                    slots = []
                # Keep the previous slot list when the reply carries none.
                if slots:
                    self._ams_slots = slots
        except Exception as e:
            log.warning(f"Poll-Fehler: {e}")
            # Check whether the printer is really gone.
            if not self._printer_reachable():
                log.info("Drucker nicht erreichbar – wechsle in Offline-Modus")
                self._state["print_state"] = "error"
                self._state["kobra_state"] = "offline"
                try:
                    self.client.disconnect()
                except Exception:
                    pass
                offline = True
        stop_event.wait(3.0)
# ---------------------------------------------------------------------------
# App factory + main
# ---------------------------------------------------------------------------
def build_app(bridge: KobraXBridge) -> web.Application:
    """Create the aiohttp application and register every bridge route.

    Routes are registered in the order listed; the wildcard catch-all is
    added last so it only receives requests no other route matched.

    Args:
        bridge: Bridge instance whose ``handle_*`` methods serve the routes.

    Returns:
        The configured ``web.Application``.
    """
    app = web.Application()
    router = app.router
    get, post = router.add_get, router.add_post

    route_table = (
        # Moonraker API
        (get, "/server/info", bridge.handle_server_info),
        (get, "/printer/info", bridge.handle_printer_info),
        (get, "/machine/system_info", bridge.handle_machine_system_info),
        (get, "/printer/objects/list", bridge.handle_objects_list),
        (get, "/printer/objects/query", bridge.handle_objects_query),
        (get, "/printer/objects/subscribe", bridge.handle_objects_subscribe),
        (post, "/printer/objects/subscribe", bridge.handle_objects_subscribe),
        (get, "/server/files/list", bridge.handle_files_list),
        (post, "/server/files/upload", bridge.handle_file_upload),
        (post, "/printer/print/start", bridge.handle_print_start),
        (post, "/printer/print/pause", bridge.handle_print_pause),
        (post, "/printer/print/resume", bridge.handle_print_resume),
        (post, "/printer/print/cancel", bridge.handle_print_cancel),
        # OctoPrint compatibility (OrcaSlicer probes this + uploads here)
        (get, "/api/version", bridge.handle_octoprint_version),
        (post, "/api/files/local", bridge.handle_file_upload),
        (post, "/api/files/{path:.*}", bridge.handle_file_upload),
        # Moonraker database (OrcaSlicer AMS sync)
        (get, "/server/database/item", bridge.handle_moonraker_database),
        # Bridge-specific API endpoints
        (post, "/api/light", bridge.handle_api_light),
        (post, "/api/fan", bridge.handle_api_fan),
        (post, "/api/connect", bridge.handle_api_connect),
        (post, "/api/disconnect", bridge.handle_api_disconnect),
        (post, "/api/speed", bridge.handle_api_speed),
        (post, "/api/ams/feed", bridge.handle_api_ams_feed),
        (post, "/api/axis", bridge.handle_api_axis),
        (post, "/api/temperature", bridge.handle_api_temperature),
        (get, "/api/camera", bridge.handle_api_camera),
        (get, "/api/camera/stream", bridge.handle_camera_stream),
        (post, "/api/camera/start", bridge.handle_api_camera_start),
        (post, "/api/camera/stop", bridge.handle_api_camera_stop),
        (get, "/api/state", bridge.handle_api_state),
        (get, "/api/settings", bridge.handle_api_settings_get),
        (post, "/api/settings", bridge.handle_api_settings_post),
        (get, "/api/update/check", bridge.handle_api_update_check),
        (post, "/api/update/apply", bridge.handle_api_update_apply),
        (get, "/serve/{filename}", bridge.handle_serve_file),
        # Root + favicon (OrcaSlicer opens / in an embedded browser)
        (get, "/", bridge.handle_index),
        (get, "/favicon.ico", bridge.handle_favicon),
        # WebSocket
        (get, "/websocket", bridge.handle_websocket),
    )
    for register, path, handler in route_table:
        register(path, handler)

    # Catch-all: log every unknown request instead of answering 404.
    router.add_route("*", "/{path:.*}", bridge.handle_catchall)
    return app
async def run_bridge(args):
    """Run the bridge until it is interrupted or cancelled.

    Creates the MQTT client and the bridge, attempts an initial printer
    connection (on failure the bridge starts in offline mode), starts the
    poll thread and the HTTP/WebSocket server, then idles.

    Startup and the idle loop share one ``try``/``finally`` so that a
    failure while starting the server (for example a port that is already
    in use) still stops the poll thread, cleans up the runner and
    disconnects the client before the error propagates.

    Args:
        args: Parsed command-line options (``printer_ip``, ``mqtt_port``,
            ``username``, ``password``, ``mode_id``, ``device_id``,
            ``host``, ``port``).
    """
    client = KobraXClient(
        host=args.printer_ip,
        port=args.mqtt_port,
        username=args.username,
        password=args.password,
        mode_id=args.mode_id,
        device_id=args.device_id,
        client_id="kobrax_bridge",
    )
    bridge = KobraXBridge(client, args=args)

    # Connection attempt at startup - keep running in offline mode on failure.
    loop = asyncio.get_running_loop()
    log.info(f"Verbinde mit Drucker {args.printer_ip}:{args.mqtt_port} …")
    try:
        await loop.run_in_executor(None, client.connect)
        log.info("MQTT verbunden")
    except Exception as e:
        log.warning(f"Drucker nicht erreichbar ({e}) – starte im Offline-Modus")
        bridge._state["print_state"] = "error"
        bridge._state["kobra_state"] = "offline"

    app = build_app(bridge)
    stop_event = threading.Event()
    poll_thread = threading.Thread(
        target=bridge._poll_loop, args=(stop_event,), daemon=True, name="poll"
    )
    runner = web.AppRunner(app)
    runner_ready = False
    try:
        poll_thread.start()
        await runner.setup()
        runner_ready = True
        site = web.TCPSite(runner, args.host, args.port)
        await site.start()
        log.info(f"Bridge läuft auf http://{args.host}:{args.port}")
        log.info(f"OrcaSlicer → Klipper → Host: {args.host}  Port: {args.port}")
        log.info("Ctrl-C zum Beenden")
        while True:
            await asyncio.sleep(3600)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        stop_event.set()
        # Only clean up a runner whose setup() completed.
        if runner_ready:
            await runner.cleanup()
        # Best-effort: a failing disconnect must not abort the shutdown.
        try:
            client.disconnect()
        except Exception as e:
            log.warning(f"MQTT-Trennung fehlgeschlagen: {e}")
        log.info("Bridge beendet")
def main():
    """Parse the command-line options and run the bridge.

    The printer connection options take their defaults from ``env_loader``;
    the server binds to 0.0.0.0:7125 unless ``--host`` / ``--port`` say
    otherwise.
    """
    parser = argparse.ArgumentParser(description="Moonraker-Bridge für Anycubic Kobra X")
    option_specs = (
        ("--printer-ip", {"default": env_loader.PRINTER_IP,
                          "help": "IP-Adresse des Druckers"}),
        ("--mqtt-port", {"type": int, "default": env_loader.MQTT_PORT}),
        ("--username", {"default": env_loader.USERNAME}),
        ("--password", {"default": env_loader.PASSWORD}),
        ("--mode-id", {"default": env_loader.MODE_ID}),
        ("--device-id", {"default": env_loader.DEVICE_ID}),
        ("--host", {"default": "0.0.0.0",
                    "help": "Bind-Adresse für den Bridge-Server"}),
        ("--port", {"type": int, "default": 7125,
                    "help": "HTTP/WS-Port (Moonraker-Standard: 7125)"}),
    )
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)
    parsed = parser.parse_args()
    asyncio.run(run_bridge(parsed))
if __name__ == "__main__":
main()