From cb05bc3699d214f9e767a7ec700939cbc0cfcbb8 Mon Sep 17 00:00:00 2001 From: Gangoke Date: Tue, 19 May 2026 02:18:20 -1000 Subject: [PATCH] Hungry Hare adjustments, dont sync empty slots with placeholder --- kobrax_moonraker_bridge.py | 435 +++++++++++++++++-------------------- 1 file changed, 197 insertions(+), 238 deletions(-) diff --git a/kobrax_moonraker_bridge.py b/kobrax_moonraker_bridge.py index 7a2f5a6..2147440 100644 --- a/kobrax_moonraker_bridge.py +++ b/kobrax_moonraker_bridge.py @@ -863,8 +863,8 @@ class KobraXBridge: box_id = int(slot.get("box_id", -1)) if self._filament_mode == "ace_hub": - # In hub mode, print channels come from ACE units only. - return box_id >= 0 + # In hub mode, toolhead channels (0..2) and ACE channels are both printable. + return box_id == -1 or box_id >= 0 if self._filament_mode == "ace_direct": return box_id >= 0 return box_id == -1 @@ -874,10 +874,99 @@ class KobraXBridge: loaded = [ (int(s.get("global_index", i)), s) for i, s in enumerate(self._ams_slots) - if s.get("status") == 5 + if s.get("status") == 5 and self._slot_usable_for_print(int(s.get("global_index", i))) ] return loaded + def _select_loaded_slots_for_print(self, warn_on_empty_default: bool = False) -> list[tuple[int, dict]]: + """Return loaded slots, honoring default_ams_slot when configured.""" + default_slot = getattr(self._args, "default_ams_slot", "auto") + all_loaded = self._loaded_slots_for_print() + if default_slot == "auto": + return all_loaded + + try: + slot_idx = int(default_slot) + except ValueError: + return all_loaded + + selected = [(i, s) for i, s in all_loaded if i == slot_idx] + if selected: + return selected + + if warn_on_empty_default: + log.warning(f"Standard-Slot {slot_idx} ist leer – fallback auf Auto") + return all_loaded + + @staticmethod + def _slot_color_rgba(slot: dict) -> list[int]: + color = slot.get("color", [255, 255, 255]) + if isinstance(color, list) and len(color) >= 3: + return [int(color[0]), int(color[1]), int(color[2]), 255] + return [255, 255, 255, 255] + + def _build_auto_ams_box_mapping( + self, + warn_on_empty_default: bool = False, + loaded_slots: list[tuple[int, dict]] | None = None, + ) -> list[dict]: + """Build print mapping from currently loaded slots (no explicit dialog assignments).""" + loaded = loaded_slots + if loaded is None: + loaded = self._select_loaded_slots_for_print(warn_on_empty_default=warn_on_empty_default) + return [ + { + "paint_index": pidx, + "ams_index": self._slot_to_print_ams_index(gidx), + "paint_color": [255, 255, 255, 255], + "ams_color": self._slot_color_rgba(s), + "material_type": s.get("type", "PLA"), + } + for pidx, (gidx, s) in enumerate(loaded) + ] + + def _build_assigned_ams_box_mapping(self, assignments: list) -> tuple[list[dict], int, int]: + """Build print mapping from UI filament assignments. + + Returns (mapping, unused_count, invalid_count). + """ + slot_by_global_index = { + int(s.get("global_index", i)): s + for i, s in enumerate(self._ams_slots) + } + ams_box_mapping: list[dict] = [] + unused_count = 0 + invalid_count = 0 + + for i, a in enumerate(assignments): + try: + if a.get("is_used") is False: + unused_count += 1 + continue + global_slot = int(a["slot_index"]) + except (ValueError, TypeError, KeyError): + invalid_count += 1 + continue + + if global_slot < 0: + unused_count += 1 + continue + if not self._slot_usable_for_print(global_slot): + invalid_count += 1 + continue + + slot = slot_by_global_index.get(global_slot, {}) + ams_box_mapping.append({ + # Preserve slicer paint indices (can be sparse when paint 0 is unused). + "paint_index": a.get("paint_index", i), + "ams_index": self._slot_to_print_ams_index(global_slot), + "paint_color": a.get("paint_color", [255, 255, 255, 255]), + "ams_color": self._slot_color_rgba(slot), + "material_type": slot.get("type", a.get("material", "PLA")), + }) + + return ams_box_mapping, unused_count, invalid_count + def _box_local_to_global(self, box_id: int, local_slot: int, boxes: list) -> int: """Convert (box_id, local slot) to global slot index for current topology.""" if box_id == -1: @@ -1077,63 +1166,57 @@ class KobraXBridge: } def _build_lane_data(self) -> dict: - """Baut BBL-AMS-JSON für OrcaSlicer DevFilaSystemParser::ParseV1_0.""" - slots = self._ams_slots - total = len(slots) - if total == 0: - return {"ams": [], "ams_exist_bits": "0", "tray_exist_bits": "0"} + """Build lane_data for filament sync from loaded, printable slots only. - ams_count = (total + 3) // 4 - ams_exist_bits = 0 - tray_exist_bits = 0 - ams_array = [] + Slots are compacted in sync order (installed slots first) so Orca/Happy + Hare does not infer empty gaps between tray ids. + """ + loaded_slots = self._loaded_slots_for_print() + if not loaded_slots: + return {"ams": [], "ams_exist_bits": "0", "tray_exist_bits": "0"} - for ams_id in range(ams_count): - ams_exist_bits |= (1 << ams_id) - tray_array = [] - max_slot = min(3, total - ams_id * 4 - 1) - for slot_id in range(max_slot + 1): - slot_index = ams_id * 4 + slot_id - slot = slots[slot_index] if slot_index < total else {} - occupied = slot.get("status") == 5 + ams_buckets: dict[int, list[tuple[int, dict]]] = {} + tray_exist_bits = 0 - if occupied: - tray_exist_bits |= (1 << slot_index) - color_raw = slot.get("color", [255, 255, 255]) - if isinstance(color_raw, list) and len(color_raw) >= 3: - color_hex = "{:02X}{:02X}{:02X}FF".format( - int(color_raw[0]), int(color_raw[1]), int(color_raw[2]) - ) - elif isinstance(color_raw, str) and len(color_raw) >= 6: - color_hex = color_raw[:6].upper() + "FF" - else: - color_hex = "FFFFFFFF" - material = slot.get("type", "PLA").upper() - tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99") - tray_array.append({ - "id": str(slot_id), - "tag_uid": "0000000000000000", - "tray_info_idx": tray_info_idx, - "tray_type": material, - "tray_color": color_hex, - }) - else: - tray_array.append({ - "id": str(slot_id), - "tag_uid": "0000000000000000", - "tray_info_idx": "", - "tray_type": "", - "tray_color": "00000000", - "tray_slot_placeholder": "1", - }) + for sync_index, (_global_index, slot) in enumerate(sorted(loaded_slots, key=lambda item: item[0])): + ams_id = sync_index // 4 + slot_id = sync_index % 4 + tray_exist_bits |= (1 << sync_index) + ams_buckets.setdefault(ams_id, []).append((slot_id, slot)) - ams_array.append({"id": str(ams_id), "info": "0002", "tray": tray_array}) + ams_exist_bits = 0 + ams_array = [] + for ams_id in sorted(ams_buckets.keys()): + ams_exist_bits |= (1 << ams_id) + tray_array = [] + for slot_id, slot in sorted(ams_buckets[ams_id], key=lambda item: item[0]): + color_raw = slot.get("color", [255, 255, 255]) + if isinstance(color_raw, list) and len(color_raw) >= 3: + color_hex = "{:02X}{:02X}{:02X}FF".format( + int(color_raw[0]), int(color_raw[1]), int(color_raw[2]) + ) + elif isinstance(color_raw, str) and len(color_raw) >= 6: + color_hex = color_raw[:6].upper() + "FF" + else: + color_hex = "FFFFFFFF" - return { - "ams": ams_array, - "ams_exist_bits": format(ams_exist_bits, "X"), - "tray_exist_bits": format(tray_exist_bits, "X"), - } + material = slot.get("type", "PLA").upper() + tray_info_idx = self._TRAY_INFO_IDX.get(material, "OGFL99") + tray_array.append({ + "id": str(slot_id), + "tag_uid": "0000000000000000", + "tray_info_idx": tray_info_idx, + "tray_type": material, + "tray_color": color_hex, + }) + + ams_array.append({"id": str(ams_id), "info": "0002", "tray": tray_array}) + + return { + "ams": ams_array, + "ams_exist_bits": format(ams_exist_bits, "X"), + "tray_exist_bits": format(tray_exist_bits, "X"), + } # ------------------------------------------------------------------------- # WebSocket push @@ -1157,40 +1240,39 @@ class KobraXBridge: self.ws_clients -= dead def _build_mmu_object(self) -> dict: - slots = self._ams_slots - if not slots: - return {} - _TEMP = {"PLA": 210, "PETG": 230, "ABS": 240, "ASA": 250, - "TPU": 220, "PA": 260, "PC": 270, "HIPS": 220} - num_gates = len(slots) - gate_status, gate_material, gate_color, gate_temperature, gate_color_rgb = [], [], [], [], [] - for slot in slots: - occupied = slot.get("status") == 5 - gate_status.append(1 if occupied else 0) - material = slot.get("type", "PLA").upper() if occupied else "" - gate_material.append(material) - c = slot.get("color", [0, 0, 0]) - if occupied: - gate_color.append("#{:02X}{:02X}{:02X}".format(*c[:3])) - gate_color_rgb.append([round(c[0]/255, 3), round(c[1]/255, 3), round(c[2]/255, 3)]) - else: - gate_color.append("") - gate_color_rgb.append([0.0, 0.0, 0.0]) - gate_temperature.append(_TEMP.get(material, 210) if occupied else 0) - return { - "num_gates": num_gates, - "enabled": True, - "gate_status": gate_status, - "gate_material": gate_material, - "gate_color": gate_color, - "gate_temperature": gate_temperature, - "gate_color_rgb": gate_color_rgb, - "gate_filament_name": [""] * num_gates, - "gate_spool_id": [-1] * num_gates, - "ttg_map": list(range(num_gates)), - "tool": self._ams_loaded_slot, - "gate": self._ams_loaded_slot, - } + loaded_slots = sorted(self._loaded_slots_for_print(), key=lambda item: item[0]) + if not loaded_slots: + return {} + + _TEMP = {"PLA": 210, "PETG": 230, "ABS": 240, "ASA": 250, + "TPU": 220, "PA": 260, "PC": 270, "HIPS": 220} + num_gates = len(loaded_slots) + gate_status, gate_material, gate_color, gate_temperature, gate_color_rgb = [], [], [], [], [] + for _global_index, slot in loaded_slots: + gate_status.append(1) + material = slot.get("type", "PLA").upper() + gate_material.append(material) + c = slot.get("color", [0, 0, 0]) + gate_color.append("#{:02X}{:02X}{:02X}".format(*c[:3])) + gate_color_rgb.append([round(c[0]/255, 3), round(c[1]/255, 3), round(c[2]/255, 3)]) + gate_temperature.append(_TEMP.get(material, 210)) + + loaded_index_map = {global_index: idx for idx, (global_index, _) in enumerate(loaded_slots)} + active_gate = loaded_index_map.get(int(self._ams_loaded_slot), -1) + return { + "num_gates": num_gates, + "enabled": True, + "gate_status": gate_status, + "gate_material": gate_material, + "gate_color": gate_color, + "gate_temperature": gate_temperature, + "gate_color_rgb": gate_color_rgb, + "gate_filament_name": [""] * num_gates, + "gate_spool_id": [-1] * num_gates, + "ttg_map": list(range(num_gates)), + "tool": active_gate, + "gate": active_gate, + } def _build_printer_objects(self) -> dict: s = self._state @@ -1447,63 +1529,16 @@ class KobraXBridge: excluded_objects = [] if assignments: - ams_box_mapping = [] - dropped = 0 - for i, a in enumerate(assignments): - try: - if a.get("is_used") is False: - dropped += 1 - continue - global_slot = int(a["slot_index"]) - except (ValueError, TypeError, KeyError): - dropped += 1 - continue - # Skip unused paints (slot_index < 0 means no assignment) - if global_slot < 0: - dropped += 1 - continue - if not self._slot_usable_for_print(global_slot): - dropped += 1 - continue - slot = next((s for s in self._ams_slots if int(s.get("global_index", -1)) == global_slot), {}) - slot_color = slot.get("color", [255, 255, 255]) - if not (isinstance(slot_color, list) and len(slot_color) >= 3): - slot_color = [255, 255, 255] - slot_rgba = [int(slot_color[0]), int(slot_color[1]), int(slot_color[2]), 255] - ams_box_mapping.append({ - # Preserve slicer paint indices (can be sparse when paint 0 is unused). - "paint_index": a.get("paint_index", i), - "ams_index": self._slot_to_print_ams_index(global_slot), - "paint_color": a.get("paint_color", [255, 255, 255, 255]), - "ams_color": slot_rgba, - "material_type": slot.get("type", a.get("material", "PLA")), - }) - if dropped: - log.warning(f"Ignored {dropped} unused or unusable filament assignments for mode={self._filament_mode}") + ams_box_mapping, unused_count, invalid_count = self._build_assigned_ams_box_mapping(assignments) + if unused_count: + log.debug(f"Skipped {unused_count} unused filament assignment(s) for mode={self._filament_mode}") + if invalid_count: + log.warning(f"Ignored {invalid_count} unusable filament assignment(s) for mode={self._filament_mode}") if not ams_box_mapping: return self._json_cors({"error": "no usable filament assignments for current filament mode"}, status=400) else: # Kein Dialog → alle belegten Slots wie bei normalem Upload-Druck - default_slot = getattr(self._args, "default_ams_slot", "auto") - all_loaded = self._loaded_slots_for_print() - if default_slot != "auto": - try: - slot_idx = int(default_slot) - loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded - except ValueError: - loaded = all_loaded - else: - loaded = all_loaded - ams_box_mapping = [ - { - "paint_index": pidx, - "ams_index": self._slot_to_print_ams_index(gidx), - "paint_color": [255, 255, 255, 255], - "ams_color": [int(c[0]), int(c[1]), int(c[2]), 255] if (isinstance(c := s.get("color", [255, 255, 255]), list) and len(c) >= 3) else [255, 255, 255, 255], - "material_type": s.get("type", "PLA"), - } - for pidx, (gidx, s) in enumerate(loaded) - ] + ams_box_mapping = self._build_auto_ams_box_mapping() use_ams = len(ams_box_mapping) > 0 auto_leveling = getattr(self._args, "auto_leveling", 1) @@ -1538,12 +1573,6 @@ class KobraXBridge: }, } - log.info( - f"print mapping mode={self._filament_mode} " - f"assignments={assignments if assignments else 'auto'} " - f"ams_box_mapping={ams_box_mapping}" - ) - log.info(f"KX-Store Druckstart: {filename} ams={len(ams_box_mapping)} slots assignments={bool(assignments)} excluded={len(excluded_objects)}") loop = asyncio.get_event_loop() result = await loop.run_in_executor( @@ -1619,12 +1648,14 @@ class KobraXBridge: async def handle_objects_query(self, request): objects = self._build_printer_objects() - # filter by requested objects if specified - requested = dict(request.rel_url.query) - if requested: - filtered = {k: objects[k] for k in requested if k in objects} - else: - filtered = objects + requested = [] + query = request.rel_url.query + if "objects" in query: + requested = [x.strip() for x in str(query.get("objects", "")).split(",") if x.strip()] + elif query: + requested = [k for k in query.keys() if k] + + filtered = {k: objects[k] for k in requested if k in objects} if requested else objects return web.json_response({"result": {"status": filtered, "eventtime": time.time()}}) async def handle_objects_list(self, request): @@ -1766,31 +1797,10 @@ class KobraXBridge: def _start_print(self, filename: str, url: str = "", md5: str = "", filesize: int = 0): self._state["file_ready"] = "" - default_slot = getattr(self._args, "default_ams_slot", "auto") - all_loaded = self._loaded_slots_for_print() - if default_slot != "auto": - try: - slot_idx = int(default_slot) - loaded = [(i, s) for i, s in all_loaded if i == slot_idx] - if not loaded: - log.warning(f"Standard-Slot {slot_idx} ist leer – fallback auf Auto") - loaded = all_loaded - except ValueError: - loaded = all_loaded - else: - loaded = all_loaded + loaded = self._select_loaded_slots_for_print(warn_on_empty_default=True) use_ams = len(loaded) > 0 - ams_box_mapping = [ - { - "paint_index": pidx, - "ams_index": self._slot_to_print_ams_index(gidx), - "paint_color": [255, 255, 255, 255], - "ams_color": [int(c[0]), int(c[1]), int(c[2]), 255] if (isinstance(c := s.get("color", [255, 255, 255]), list) and len(c) >= 3) else [255, 255, 255, 255], - "material_type": s.get("type", "PLA"), - } - for pidx, (gidx, s) in enumerate(loaded) - ] - log.info(f"AMS-Slots: {len(loaded)}/{len(self._ams_slots)} belegt → {[i for i,_ in loaded]}") + ams_box_mapping = self._build_auto_ams_box_mapping(loaded_slots=loaded) + log.debug(f"AMS-Slots: {len(loaded)}/{len(self._ams_slots)} belegt → {[i for i, _ in loaded]}") auto_leveling = getattr(self._args, "auto_leveling", 1) payload = { "taskid": "-1", @@ -1816,11 +1826,7 @@ class KobraXBridge: "model_objects_skip_parts": [], }, } - log.info( - f"print mapping mode={self._filament_mode} auto_start=1 " - f"ams_box_mapping={ams_box_mapping}" - ) - log.info(f"print/start → {filename} url={url} ams={len(self._ams_slots)} slots") + log.info(f"print/start → {filename} url={url} ams={len(ams_box_mapping)} slots mode={self._filament_mode}") result = self.client.publish("print", "start", payload, timeout=15.0) if result: log.info(f"Druckstart bestätigt: state={result.get('state')}") @@ -1847,62 +1853,16 @@ class KobraXBridge: if not isinstance(excluded_objects, list): excluded_objects = [] if filament_assignments is not None: - ams_box_mapping = [] - dropped = 0 - for i, a in enumerate(filament_assignments): - if a.get("is_used") is False: - dropped += 1 - continue - try: - global_slot = int(a["slot_index"]) - except (ValueError, TypeError, KeyError): - dropped += 1 - continue - # Skip unused paints (slot_index < 0 means no assignment) - if global_slot < 0: - dropped += 1 - continue - if not self._slot_usable_for_print(global_slot): - dropped += 1 - continue - slot = next((s for s in self._ams_slots if int(s.get("global_index", -1)) == global_slot), {}) - slot_color = slot.get("color", [255, 255, 255]) - if not (isinstance(slot_color, list) and len(slot_color) >= 3): - slot_color = [255, 255, 255] - slot_rgba = [int(slot_color[0]), int(slot_color[1]), int(slot_color[2]), 255] - ams_box_mapping.append({ - "paint_index": a.get("paint_index", i), - "ams_index": self._slot_to_print_ams_index(global_slot), - "paint_color": a.get("paint_color", [255, 255, 255, 255]), - "ams_color": slot_rgba, - "material_type": slot.get("type", a.get("material", "PLA")), - }) - if dropped: - log.warning(f"Ignored {dropped} unused or unusable filament assignments for mode={self._filament_mode}") + ams_box_mapping, unused_count, invalid_count = self._build_assigned_ams_box_mapping(filament_assignments) + if unused_count: + log.debug(f"Skipped {unused_count} unused filament assignment(s) for mode={self._filament_mode}") + if invalid_count: + log.warning(f"Ignored {invalid_count} unusable filament assignment(s) for mode={self._filament_mode}") if not ams_box_mapping: return web.json_response({"error": "no usable filament assignments for current filament mode"}, status=400) else: # AMS-Mapping aus gecachtem State — leere Slots (status != 5) überspringen - default_slot = getattr(self._args, "default_ams_slot", "auto") - all_loaded = self._loaded_slots_for_print() - if default_slot != "auto": - try: - slot_idx = int(default_slot) - loaded = [(i, s) for i, s in all_loaded if i == slot_idx] or all_loaded - except ValueError: - loaded = all_loaded - else: - loaded = all_loaded - ams_box_mapping = [ - { - "paint_index": pidx, - "ams_index": self._slot_to_print_ams_index(gidx), - "paint_color": [255, 255, 255, 255], - "ams_color": [int(c[0]), int(c[1]), int(c[2]), 255] if (isinstance(c := s.get("color", [255, 255, 255]), list) and len(c) >= 3) else [255, 255, 255, 255], - "material_type": s.get("type", "PLA"), - } - for pidx, (gidx, s) in enumerate(loaded) - ] + ams_box_mapping = self._build_auto_ams_box_mapping() use_ams = len(ams_box_mapping) > 0 auto_leveling = getattr(self._args, "auto_leveling", 1) @@ -1936,9 +1896,8 @@ class KobraXBridge: } log.info( - f"print mapping mode={self._filament_mode} print_start_api=1 " - f"assignments={filament_assignments if filament_assignments is not None else 'auto'} " - f"ams_box_mapping={ams_box_mapping}" + f"print/start api=1 mode={self._filament_mode} " + f"ams={len(ams_box_mapping)} slots assignments={filament_assignments is not None}" ) loop = asyncio.get_event_loop()