15 Commits

Author SHA1 Message Date
Gangoke
b2f153ae26 fix: improve error handling for ACE commands and add optimistic state updates 2026-05-21 22:48:03 -10:00
copilot-swe-agent[bot]
cf3b4c1ca1 fix: surface ACE endpoint error payloads without requiring result field
Agent-Logs-Url: https://github.com/gangoke/kobrax-lan-hass-component/sessions/d19d21f5-b5a1-4db3-b891-dd8cf47f3b54

Co-authored-by: gangoke <77847204+gangoke@users.noreply.github.com>
2026-05-21 12:17:27 +00:00
copilot-swe-agent[bot]
8370153dd2 fix: support ACE auto-fill off toggle and tolerate ACE endpoint responses
Agent-Logs-Url: https://github.com/gangoke/kobrax-lan-hass-component/sessions/d19d21f5-b5a1-4db3-b891-dd8cf47f3b54

Co-authored-by: gangoke <77847204+gangoke@users.noreply.github.com>
2026-05-21 12:15:41 +00:00
copilot-swe-agent[bot]
cba7f9b707 Initial plan 2026-05-21 12:12:23 +00:00
gangoke
4e5aeb54a8 Merge pull request 'ACE entity fixes' (#4) from v0.9.13 into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/4
2026-05-21 11:06:26 +00:00
Gangoke
2623f2739a ACE entity fixes 2026-05-21 01:05:51 -10:00
gangoke
ec1fbbc520 Merge pull request 'ACE2 Support + more' (#3) from v0.9.13 into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/3
2026-05-21 10:44:15 +00:00
Gangoke
3fb047708e ace2 dryer chnage to toggle 2026-05-21 00:39:34 -10:00
Gangoke
13ce4d48f3 ACE2 Support + more 2026-05-21 00:31:36 -10:00
gangoke
96ccfb98ff Update README.md 2026-05-18 07:45:06 +00:00
gangoke
2cc51085e4 Update README.md 2026-05-18 05:19:59 +00:00
Gangoke
b03611913a Update README.md to enhance project documentation and clarify available entities 2026-05-17 19:18:46 -10:00
gangoke
9603d449e6 Update README.md 2026-05-17 23:14:11 +00:00
gangoke
3f3e4f534d Merge pull request 'filament entities' (#1) from camera-rework into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/1
2026-05-17 11:01:53 +00:00
Gangoke
45591f41a4 Enhance camera functionality: add streaming support and snapshot fallback 2026-05-17 00:40:23 -10:00
17 changed files with 971 additions and 59 deletions

View File

@@ -1,40 +1,61 @@
# Kobra X Home Assistant Component
# Kobra X LAN for Home Assistant
Home Assistant HACS integration for controlling and monitoring an Anycubic Kobra X through KX-Bridge.
Home Assistant integration for monitoring and controlling an Anycubic Kobra X through KX-Bridge.
This project was coded with AI assistance and should be reviewed before use in production.
Architecture:
- printer <-> KX-Bridge-Release <-> this integration <-> Home Assistant
- printer <-> [KX-Bridge](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant
## Features
## Requirements
- Auto-discovered status from KX-Bridge `/api/state`
- Core printer sensors (state, temperatures, progress, file, layer/time data)
- Light control
- Print speed mode selection
- Printer action buttons (pause, resume, cancel)
- Camera snapshot entity using `/api/camera/snapshot`
- Running and reachable [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release)
- Bridge endpoint accessible from Home Assistant at `http://<bridge-host>:7125`
## Prerequisites
## Installation
1. KX-Bridge must be running and reachable from Home Assistant.
2. Verify KX-Bridge is accessible at `http://<bridge-host>:7125`.
### Option 1: HACS
## Installation (HACS)
1. Add this repository as a custom repository in HACS with category `Integration`.
2. Install the integration.
1. In HACS, add this repository as a custom repository (category: Integration):
`https://github.com/gangoke/kobrax-lan-hass-component`
2. Install Kobra X LAN from HACS.
3. Restart Home Assistant.
4. Add integration `Kobra X` from Settings -> Devices & Services.
4. Go to Settings -> Devices & Services -> Add Integration.
5. Search for Kobra X LAN.
### Option 2: Manual (local custom_components)
1. Copy the `kobrax_lan` folder into your Home Assistant `custom_components` directory:
`<config>/custom_components/kobrax_lan`
3. Restart Home Assistant.
4. Add Kobra X LAN from Settings -> Devices & Services.
## Configuration
The config flow asks for:
- Host: KX-Bridge host and port (example: `192.168.1.50:7125`)
- Printer name: Friendly display name
- Printer name: Friendly display name in Home Assistant
## Entity Overview
| Platform | Key Entities |
| --- | --- |
| Binary Sensor | Online, Printing, Light State |
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, Apply Update (KX-Bridge) |
| Switch | ACE 1..4 Auto Fill, ACE 1..4 Dryer |
| Number | ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Duration |
| Select | Print speed |
| Light | Printer light |
| Camera | Printer camera |
| Image | G-code thumbnail |
Slot and ACE entities are pre-created and automatically enabled/disabled based on detected slot mode and ACE unit count from KX-Bridge.
## Notes
- This integration talks to KX-Bridge HTTP endpoints and does not connect directly to the printer.
- Keep KX-Bridge and Home Assistant on the same trusted network.
- This integration communicates with KX-Bridge HTTP endpoints and does not connect directly to the printer.
- Keep KX-Bridge and Home Assistant on a trusted local network.
- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera RTSP source.

View File

@@ -34,6 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"coordinator": coordinator,
"api": api,
"entry": entry,
"ace_dry_config": {
0: {
"target_temp": int((coordinator.data or {}).get("ace_drying", {}).get("target_temp", 45) or 45),
"duration": int((coordinator.data or {}).get("ace_drying", {}).get("duration", 240) or 240),
},
1: {
"target_temp": 45,
"duration": 240,
},
2: {
"target_temp": 45,
"duration": 240,
},
3: {
"target_temp": 45,
"duration": 240,
},
},
}
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View File

@@ -1,10 +1,14 @@
from __future__ import annotations
import logging
from typing import Any
import aiohttp
_LOGGER = logging.getLogger(__name__)
class KobraXApiError(Exception):
"""Raised when communication with KX-Bridge fails."""
@@ -17,6 +21,9 @@ class KobraXApiClient:
def _url(self, path: str) -> str:
return f"{self._base_url}{path}"
def camera_stream_proxy_url(self) -> str:
return self._url("/api/camera/stream")
async def _get_json(self, path: str) -> dict[str, Any]:
try:
async with self._session.get(self._url(path)) as response:
@@ -46,6 +53,70 @@ class KobraXApiClient:
return result
raise KobraXApiError("Unexpected response for /kx/files")
async def async_get_file_objects(self, file_id: str) -> dict[str, Any]:
data = await self._get_json(f"/kx/files/{file_id}/objects")
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/files/{id}/objects")
async def async_skip_objects(self, names: list[str]) -> dict[str, Any]:
data = await self._post_json("/kx/skip", {"names": names})
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /kx/skip")
return data
async def async_skip_query(self) -> dict[str, Any]:
data = await self._post_json("/kx/skip/query", {})
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/skip/query")
async def async_get_skip_state(self) -> dict[str, Any]:
data = await self._get_json("/kx/skip/state")
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/skip/state")
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_set_ace_dry(
self,
action: str,
target_temp: int | None = None,
duration: int | None = None,
ace_id: int | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {"action": action}
if target_temp is not None:
payload["target_temp"] = int(target_temp)
if duration is not None:
payload["duration"] = int(duration)
if ace_id is not None:
payload["ace_id"] = int(ace_id)
try:
data = await self._post_json("/api/ace/dry", payload)
except KobraXApiError as err:
# Some bridge versions can return a false 502 while setDry is
# still applied successfully on the printer.
msg = str(err)
if "502" in msg and "/api/ace/dry" in msg:
_LOGGER.warning("Ignoring bridge 502 for /api/ace/dry because command may already be applied: %s", msg)
return {"result": "ok", "warning": "ignored_502"}
raise
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_pause_print(self) -> None:
await self._post_json("/printer/print/pause", {})
@@ -76,6 +147,31 @@ class KobraXApiClient:
async def async_disconnect(self) -> None:
await self._post_json("/api/disconnect", {})
async def async_restart_bridge(self) -> None:
await self._post_json("/api/restart", {})
async def async_start_camera(self) -> None:
await self._post_json("/api/camera/start", {})
async def async_stop_camera(self) -> None:
await self._post_json("/api/camera/stop", {})
async def async_check_updates(self) -> dict[str, Any]:
return await self._get_json("/api/update/check")
async def async_apply_update(self, tag: str, download_url: str) -> dict[str, Any]:
return await self._post_json(
"/api/update/apply",
{"tag": tag, "download_url": download_url},
)
async def async_get_camera_url(self) -> str | None:
data = await self._get_json("/api/camera")
url = data.get("url")
if url is None or isinstance(url, str):
return url
raise KobraXApiError("Unexpected response for /api/camera")
async def async_get_camera_snapshot(self) -> bytes:
try:
async with self._session.get(self._url("/api/camera/snapshot")) as response:

View File

@@ -7,6 +7,7 @@ from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.helpers.entity import EntityCategory
from .const import DOMAIN
from .entity import KobraXEntity
@@ -23,6 +24,7 @@ BINARY_SENSORS: tuple[KobraXBinaryDescription, ...] = (
name="Online",
value_key="kobra_state",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
entity_category=EntityCategory.DIAGNOSTIC,
),
KobraXBinaryDescription(
key="printing",

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

View File

@@ -4,6 +4,7 @@ from dataclasses import dataclass
from homeassistant.components.button import ButtonEntity, ButtonEntityDescription
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
@@ -13,6 +14,7 @@ from .entity import KobraXEntity
@dataclass(frozen=True, kw_only=True)
class KobraXButtonDescription(ButtonEntityDescription):
action: str
ace_id: int | None = None # None for non-ACE buttons, 0-3 for ACE buttons
BUTTONS: tuple[KobraXButtonDescription, ...] = (
@@ -39,6 +41,7 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
name="Connect Bridge",
icon="mdi:lan-connect",
action="connect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
KobraXButtonDescription(
@@ -46,7 +49,29 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
name="Disconnect Bridge",
icon="mdi:lan-disconnect",
action="disconnect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
KobraXButtonDescription(
key="restart_bridge",
name="Restart (KX-Bridge)",
icon="mdi:restart",
action="restart",
entity_category=EntityCategory.CONFIG,
),
KobraXButtonDescription(
key="refresh_skip_state",
name="Refresh Skip State",
icon="mdi:refresh",
action="skip_query",
entity_registry_enabled_default=False,
),
KobraXButtonDescription(
key="apply_update",
name="Apply Update (KX-Bridge)",
icon="mdi:download-circle-outline",
action="apply_update",
entity_category=EntityCategory.CONFIG,
),
)
@@ -58,6 +83,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
super().__init__(coordinator, entry, description.key, description.name)
self.entity_description = description
@property
def available(self) -> bool:
if self.entity_description.action != "apply_update":
return super().available
update_info = self.state_data.get("update_info")
if not isinstance(update_info, dict):
return False
current = str(update_info.get("current") or "").strip()
latest = str(update_info.get("latest") or "").strip()
if current and latest and current == latest:
return False
if update_info.get("update_available") is False:
return False
return super().available
async def async_press(self) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -71,16 +115,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
await api.async_connect()
elif self.entity_description.action == "disconnect":
await api.async_disconnect()
elif self.entity_description.action == "restart":
await api.async_restart_bridge()
elif self.entity_description.action == "skip_query":
await api.async_skip_query()
elif self.entity_description.action == "apply_update":
await self.coordinator.async_apply_update()
await self.coordinator.async_request_refresh()
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
except Exception as err:
raise ServiceValidationError(str(err)) from err
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
async_add_entities(
[
KobraXActionButton(coordinator, entry, description)
for description in BUTTONS
]
)
entities = [
KobraXActionButton(coordinator, entry, description)
for description in BUTTONS
]
async_add_entities(entities)

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from homeassistant.components.camera import Camera
from homeassistant.components.camera import Camera, CameraEntityFeature
from .api import KobraXApiError
from .const import DOMAIN
@@ -8,6 +8,9 @@ from .entity import KobraXEntity
class KobraXCamera(KobraXEntity, Camera):
_attr_supported_features = CameraEntityFeature.STREAM
_attr_use_stream_for_stills = True
def __init__(self, coordinator, entry, key: str, name: str) -> None:
KobraXEntity.__init__(self, coordinator, entry, key, name)
Camera.__init__(self)
@@ -16,6 +19,34 @@ class KobraXCamera(KobraXEntity, Camera):
def is_streaming(self) -> bool:
return bool(self.state_data.get("camera_url"))
@property
def extra_state_attributes(self) -> dict[str, str]:
camera_url = self.state_data.get("camera_url")
attrs = {
"camera_mjpeg_proxy_url": self.hass.data[DOMAIN][self._entry.entry_id]["api"].camera_stream_proxy_url(),
}
if isinstance(camera_url, str) and camera_url:
attrs["camera_rtsp_url"] = camera_url
return attrs
async def stream_source(self) -> str | None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_start_camera()
except KobraXApiError:
pass
camera_url = self.state_data.get("camera_url")
if isinstance(camera_url, str) and camera_url:
return camera_url
try:
camera_url = await api.async_get_camera_url()
except KobraXApiError:
return api.camera_stream_proxy_url()
return camera_url or api.camera_stream_proxy_url()
async def async_camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:

View File

@@ -10,7 +10,8 @@ CONF_PRINTER_NAME = "printer_name"
DEFAULT_HOST = "localhost:7125"
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
UPDATE_INTERVAL = timedelta(seconds=5)
UPDATE_INTERVAL = timedelta(seconds=3)
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
PLATFORMS = [
"sensor",
@@ -18,6 +19,8 @@ PLATFORMS = [
"light",
"select",
"button",
"switch",
"number",
"camera",
"image",
]

View File

@@ -1,12 +1,13 @@
from __future__ import annotations
import time
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .api import KobraXApiClient, KobraXApiError
from .const import DOMAIN, UPDATE_INTERVAL
from .const import DOMAIN, UPDATE_CHECK_INTERVAL, UPDATE_INTERVAL
class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
@@ -18,9 +19,60 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
update_interval=UPDATE_INTERVAL,
)
self.api = api
self._update_info: dict[str, Any] = {}
self._next_update_check_monotonic = 0.0
async def _async_update_data(self) -> dict[str, Any]:
try:
return await self.api.async_get_state()
state = await self.api.async_get_state()
now = time.monotonic()
if now >= self._next_update_check_monotonic:
try:
self._update_info = await self.api.async_check_updates()
except KobraXApiError:
# Keep integration polling resilient if update service is temporarily unavailable.
pass
self._next_update_check_monotonic = now + UPDATE_CHECK_INTERVAL.total_seconds()
if self._update_info:
state["update_info"] = self._update_info
try:
skip_state = await self.api.async_get_skip_state()
state["skip_state"] = skip_state
except KobraXApiError:
# Skip endpoints are only available on newer bridge versions.
pass
return state
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
async def async_check_updates(self) -> dict[str, Any]:
try:
update_info = await self.api.async_check_updates()
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
self._update_info = update_info
merged = dict(self.data or {})
merged["update_info"] = update_info
self.async_set_updated_data(merged)
return update_info
async def async_apply_update(self) -> dict[str, Any]:
update_info = self._update_info or await self.async_check_updates()
tag = str(update_info.get("tag") or "").strip()
download_url = str(update_info.get("download_url") or "").strip()
if not tag or not download_url:
raise UpdateFailed("Missing tag or download URL from update check")
try:
result = await self.api.async_apply_update(tag=tag, download_url=download_url)
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
self._update_info = {**update_info, "last_apply_result": result}
merged = dict(self.data or {})
merged["update_info"] = self._update_info
self.async_set_updated_data(merged)
return result

View File

@@ -5,8 +5,9 @@
"@Gangoke"
],
"config_flow": true,
"homeassistant": "2026.3.0",
"documentation": "https://github.com/gangoke/kobrax-lan-hass-component",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues",
"version": "0.1.0"
"version": "0.2.0"
}

View File

@@ -0,0 +1,93 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.number import NumberEntity
from .const import DOMAIN
from .entity import KobraXEntity
def _minutes_to_hhmmss(minutes: int | float) -> str:
"""Convert minutes to HH:mm:ss format."""
total_seconds = int(minutes * 60)
hours = total_seconds // 3600
remaining_seconds = total_seconds % 3600
mins = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{mins:02d}:{secs:02d}"
class KobraXAceDryConfigNumber(KobraXEntity, NumberEntity):
def __init__(
self,
coordinator,
entry,
ace_id: int,
config_type: str, # "target_temp" or "duration"
) -> None:
if config_type == "target_temp":
unique_key = f"ace_{ace_id}_dry_target_temp"
name = f"ACE {ace_id + 1} Dryer Target Temperature"
min_val, max_val, step_val = 30, 80, 1
unit = "°C"
icon = "mdi:thermometer"
else: # duration
unique_key = f"ace_{ace_id}_dry_duration"
name = f"ACE {ace_id + 1} Dryer Duration"
min_val, max_val, step_val = 10, 1440, 1
unit = "min"
icon = "mdi:timer-cog-outline"
super().__init__(coordinator, entry, unique_key, name)
self._ace_id = ace_id
self._config_type = config_type
self._attr_native_min_value = min_val
self._attr_native_max_value = max_val
self._attr_native_step = step_val
self._attr_native_unit_of_measurement = unit
self._attr_mode = "box"
self._attr_icon = icon
@property
def native_value(self) -> float:
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
key = "target_temp" if self._config_type == "target_temp" else "duration"
if self._config_type == "target_temp":
return float(ace_cfg.get(key, 45))
else:
return float(ace_cfg.get(key, 240))
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
if self._config_type != "duration":
return None
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
duration_minutes = ace_cfg.get("duration", 240)
return {"formatted_duration": _minutes_to_hhmmss(duration_minutes)}
async def async_set_native_value(self, value: float) -> None:
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
if self._ace_id not in cfg:
cfg[self._ace_id] = {}
key = "target_temp" if self._config_type == "target_temp" else "duration"
cfg[self._ace_id][key] = int(round(value))
self.async_write_ha_state()
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
# Initialize ace_dry_config structure if not present
if "ace_dry_config" not in hass.data[DOMAIN][entry.entry_id]:
hass.data[DOMAIN][entry.entry_id]["ace_dry_config"] = {}
# Pre-create all 8 numbers (target_temp + duration for each of 4 ACE units)
entities = []
for ace_id in range(4):
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "target_temp"))
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "duration"))
async_add_entities(entities)

View File

@@ -2,14 +2,45 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity, SensorEntityDescription
from homeassistant.const import PERCENTAGE, UnitOfTemperature
from homeassistant.core import callback
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_registry import RegistryEntryDisabler
from .const import DOMAIN
from .entity import KobraXEntity
MAX_FILAMENT_SLOTS = 19
TOOLHEAD_SLOT_LIMIT = 4
ACE_DIRECT_SLOT_LIMIT = 4
MAX_ACE_UNITS = 4
def _detected_slot_limit(state_data: dict[str, Any]) -> int:
mode = str(state_data.get("filament_mode") or "toolhead").lower()
if mode == "toolhead":
return TOOLHEAD_SLOT_LIMIT
if mode == "ace_direct":
return ACE_DIRECT_SLOT_LIMIT
slots = state_data.get("ams_slots") or []
if isinstance(slots, list) and slots:
return min(len(slots), MAX_FILAMENT_SLOTS)
return MAX_FILAMENT_SLOTS
def _detected_ace_unit_count(state_data: dict[str, Any]) -> int:
units = state_data.get("ace_units") or []
if isinstance(units, list):
return min(len(units), MAX_ACE_UNITS)
return 0
@dataclass(frozen=True, kw_only=True)
class KobraXSensorDescription(SensorEntityDescription):
value_key: str
@@ -97,6 +128,42 @@ SENSORS: tuple[KobraXSensorDescription, ...] = (
value_key="print_duration",
icon="mdi:timer-outline",
),
KobraXSensorDescription(
key="skip_object_count",
name="Skip Object Count",
value_key="skip_object_count",
icon="mdi:vector-polygon",
),
KobraXSensorDescription(
key="skipped_object_count",
name="Skipped Object Count",
value_key="skipped_object_count",
icon="mdi:content-cut",
),
KobraXSensorDescription(
key="filament_mode",
name="Filament Mode",
value_key="filament_mode",
icon="mdi:shape-outline",
),
KobraXSensorDescription(
key="ace_unit_count",
name="ACE Unit Count",
value_key="ace_unit_count",
icon="mdi:package-variant",
),
KobraXSensorDescription(
key="bridge_version",
name="Bridge Version",
value_key="version",
icon="mdi:source-branch",
),
KobraXSensorDescription(
key="latest_available_version",
name="Latest Available Version",
value_key="latest_available_version",
icon="mdi:cloud-download-outline",
),
)
@@ -106,31 +173,183 @@ class KobraXSensor(KobraXEntity, SensorEntity):
def __init__(self, coordinator, entry, description: KobraXSensorDescription) -> None:
super().__init__(coordinator, entry, description.key, description.name)
self.entity_description = description
if description.value_key in ("version", "latest_available_version"):
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@staticmethod
def _seconds_to_hhmmss(seconds: int) -> str:
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
@property
def native_value(self) -> Any:
if self.entity_description.value_key == "filament_mode":
return self.state_data.get("filament_mode")
if self.entity_description.value_key == "ace_unit_count":
units = self.state_data.get("ace_units")
return len(units) if isinstance(units, list) else 0
if self.entity_description.value_key == "latest_available_version":
update_info = self.state_data.get("update_info") or {}
if isinstance(update_info, dict):
return update_info.get("latest")
return None
if self.entity_description.value_key == "skip_object_count":
skip_state = self.state_data.get("skip_state") or {}
objects = skip_state.get("objects") if isinstance(skip_state, dict) else []
return len(objects) if isinstance(objects, list) else 0
if self.entity_description.value_key == "skipped_object_count":
skip_state = self.state_data.get("skip_state") or {}
skipped = skip_state.get("skipped") if isinstance(skip_state, dict) else []
return len(skipped) if isinstance(skipped, list) else 0
value = self.state_data.get(self.entity_description.value_key)
if self.entity_description.value_key == "progress" and value is not None:
return round(float(value) * 100, 1)
if self.entity_description.value_key in ("remain_time", "print_duration") and value is not None:
try:
return self._seconds_to_hhmmss(int(float(value)))
except (TypeError, ValueError):
return None
return value
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
if self.entity_description.value_key == "latest_available_version":
update_info = self.state_data.get("update_info")
if isinstance(update_info, dict):
return {
"current": update_info.get("current"),
"tag": update_info.get("tag"),
"update_available": update_info.get("update_available"),
"download_url": update_info.get("download_url"),
}
return None
if self.entity_description.value_key not in ("skip_object_count", "skipped_object_count"):
return None
skip_state = self.state_data.get("skip_state")
if not isinstance(skip_state, dict):
return None
objects = skip_state.get("objects")
skipped = skip_state.get("skipped")
return {
"objects": objects if isinstance(objects, list) else [],
"skipped": skipped if isinstance(skipped, list) else [],
"filename": skip_state.get("filename"),
"ts": skip_state.get("ts"),
}
class KobraXAceDryerSensor(KobraXEntity, SensorEntity):
"""Per-ACE-unit dryer sensor."""
def __init__(
self,
coordinator,
entry,
ace_id: int,
sensor_type: str, # "status", "humidity", "current_temp", "target_temp", "remaining_time"
) -> None:
if sensor_type == "status":
unique_key = f"ace_{ace_id}_dryer_status"
name = f"ACE {ace_id + 1} Dryer Status"
icon = "mdi:tumble-dryer"
elif sensor_type == "humidity":
unique_key = f"ace_{ace_id}_dryer_humidity"
name = f"ACE {ace_id + 1} Dryer Humidity"
icon = "mdi:water-percent"
elif sensor_type == "current_temp":
unique_key = f"ace_{ace_id}_dryer_current_temp"
name = f"ACE {ace_id + 1} Dryer Current Temperature"
icon = None
elif sensor_type == "target_temp":
unique_key = f"ace_{ace_id}_dryer_target_temp"
name = f"ACE {ace_id + 1} Dryer Target Temperature"
icon = None
else: # remaining_time
unique_key = f"ace_{ace_id}_dryer_remaining_time"
name = f"ACE {ace_id + 1} Dryer Remaining Time"
icon = "mdi:timer-sand"
super().__init__(coordinator, entry, unique_key, name)
self._ace_id = ace_id
self._sensor_type = sensor_type
if icon:
self._attr_icon = icon
if sensor_type == "humidity":
self._attr_native_unit_of_measurement = PERCENTAGE
elif sensor_type in ("current_temp", "target_temp"):
self._attr_device_class = SensorDeviceClass.TEMPERATURE
self._attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
self._attr_suggested_unit_of_measurement = UnitOfTemperature.CELSIUS
elif sensor_type == "remaining_time":
pass # HH:mm:ss format, no unit needed
@staticmethod
def _minutes_to_hhmmss(minutes: int) -> str:
"""Convert minutes to HH:mm:ss format."""
total_seconds = minutes * 60
hours = total_seconds // 3600
remaining_seconds = total_seconds % 3600
mins = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{mins:02d}:{secs:02d}"
@property
def native_value(self) -> Any:
drying = self.state_data.get("ace_drying") or {}
# Handle per-unit structure: ace_drying[ace_id] or global structure
if isinstance(drying, dict):
# Try per-unit key first
unit_data = drying.get(self._ace_id)
if isinstance(unit_data, dict):
drying = unit_data
elif self._ace_id == 0 and not unit_data:
# Fall back to global structure for unit 0
pass
else:
# No data for this unit
return None
else:
return None
if self._sensor_type == "status":
status = int(drying.get("status", 0)) if drying.get("status") is not None else 0
return "running" if status else "idle"
elif self._sensor_type == "humidity":
humidity = drying.get("humidity")
return round(float(humidity), 1) if humidity is not None else None
elif self._sensor_type == "current_temp":
current_temp = drying.get("current_temp")
return round(float(current_temp), 1) if current_temp is not None else None
elif self._sensor_type == "target_temp":
target_temp = drying.get("target_temp")
return float(target_temp) if target_temp is not None else None
elif self._sensor_type == "remaining_time":
remain = drying.get("remain_time")
if remain is not None:
try:
minutes = int(remain)
return self._minutes_to_hhmmss(minutes)
except (TypeError, ValueError):
return None
return None
return None
class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
def __init__(self, coordinator, entry, slot_index: int, field: str) -> None:
name_suffix = {
"color": f"Filament Slot {slot_index + 1} Color",
"type": f"Filament Slot {slot_index + 1} Type",
}[field]
super().__init__(coordinator, entry, f"filament_slot_{slot_index + 1}_{field}", name_suffix)
def __init__(self, coordinator, entry, slot_index: int) -> None:
super().__init__(coordinator, entry, f"slot_{slot_index + 1}", f"Slot {slot_index + 1}")
self._slot_index = slot_index
self._field = field
if field == "color":
self._attr_icon = "mdi:palette"
elif field == "type":
self._attr_icon = "mdi:label"
else:
self._attr_icon = "mdi:numeric"
self._attr_icon = "mdi:circle"
def _slot(self) -> dict[str, Any]:
slots = self.state_data.get("ams_slots") or []
@@ -139,6 +358,9 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
slot = slots[self._slot_index]
return slot if isinstance(slot, dict) else {}
def _slot_limit_for_mode(self) -> int:
return _detected_slot_limit(self.state_data)
@staticmethod
def _to_color_hex(color: Any) -> str | None:
if isinstance(color, list) and len(color) >= 3:
@@ -154,30 +376,132 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
@property
def available(self) -> bool:
return bool(self._slot()) and super().available
return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available
@property
def native_value(self) -> Any:
slot = self._slot()
if not slot:
return "EMPTY"
status = slot.get("status")
if status is not None:
try:
if int(status) != 5:
return "EMPTY"
except (TypeError, ValueError):
return "EMPTY"
material = slot.get("type")
material_str = str(material).upper() if material else "EMPTY"
color_hex = self._to_color_hex(slot.get("color"))
if color_hex and material_str != "EMPTY":
return f"{material_str} ({color_hex})"
return material_str
@property
def icon_color(self) -> str | None:
slot = self._slot()
if not slot:
return None
if self._field == "color":
return self._to_color_hex(slot.get("color"))
if self._field == "type":
material = slot.get("type")
return str(material).upper() if material else None
return None
return self._to_color_hex(slot.get("color"))
@property
def entity_picture(self) -> str | None:
"""Return a colored circle picture as a frontend fallback when icon tinting is ignored."""
slot = self._slot()
empty_svg = (
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
"<circle cx='32' cy='32' r='28' fill='none' stroke='#666' stroke-width='4'/></svg>"
)
if not slot:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
status = slot.get("status")
if status is not None:
try:
if int(status) != 5:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
except (TypeError, ValueError):
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
color_hex = self._to_color_hex(slot.get("color"))
if not color_hex:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
svg = (
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
"<circle cx='32' cy='32' r='28' fill='"
f"{color_hex}"
"' stroke='#222' stroke-width='4'/></svg>"
)
return f"data:image/svg+xml;utf8,{quote(svg)}"
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
slot = self._slot()
if not slot:
return None
return {
"color_hex": self._to_color_hex(slot.get("color")),
"status": slot.get("status"),
"box_id": slot.get("box_id"),
"global_index": slot.get("global_index"),
"activity": slot.get("activity"),
}
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
slots = coordinator.data.get("ams_slots") if coordinator.data else []
slot_count = len(slots) if isinstance(slots, list) and len(slots) > 0 else 4
filament_entities: list[SensorEntity] = []
for slot_index in range(slot_count):
for field in ("color", "type"):
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index, field))
for slot_index in range(MAX_FILAMENT_SLOTS):
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index))
ace_dryer_entities: list[SensorEntity] = []
for ace_id in range(MAX_ACE_UNITS):
for sensor_type in ("status", "humidity", "current_temp", "target_temp", "remaining_time"):
ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type))
entity_registry = er.async_get(hass)
# Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities.
legacy_ace_sensor_keys = (
"ace_dryer_status",
"ace_dryer_humidity",
"ace_dryer_current_temp",
"ace_dryer_target_temp",
"ace_dryer_remaining_time",
)
for key in legacy_ace_sensor_keys:
legacy_unique_id = f"{entry.entry_id}_{key}"
legacy_entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, legacy_unique_id)
if legacy_entity_id:
entity_registry.async_remove(legacy_entity_id)
@callback
def _sync_slot_registry_state() -> None:
state_data = coordinator.data or {}
enabled_slots = _detected_slot_limit(state_data)
for slot_index in range(MAX_FILAMENT_SLOTS):
unique_id = f"{entry.entry_id}_slot_{slot_index + 1}"
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
if not entity_id:
continue
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
should_enable = slot_index < enabled_slots
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
async_add_entities(
[
@@ -185,4 +509,68 @@ async def async_setup_entry(hass, entry, async_add_entities):
for description in SENSORS
]
+ filament_entities
+ ace_dryer_entities
)
@callback
def _sync_ace_registry_state() -> None:
state_data = coordinator.data or {}
enabled_ace_units = _detected_ace_unit_count(state_data)
# Define all ACE entity patterns: (platform, unique_id_pattern_parts)
ace_entities: list[tuple[str, str]] = []
for ace_index in range(MAX_ACE_UNITS):
# Switch entities
ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_auto_feed"))
ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_dryer"))
# Number entities (temp + duration)
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_target_temp"))
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_duration"))
# Button entities (start + stop)
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_start"))
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_stop"))
# Sensor entities (status, humidity, current_temp, target_temp, remaining_time)
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_status"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_humidity"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_current_temp"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_target_temp"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_remaining_time"))
for platform, unique_id in ace_entities:
entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
if not entity_id:
continue
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
# Extract ace_index from unique_id
parts = unique_id.split("_")
if len(parts) >= 3:
try:
ace_index = int(parts[2])
except (ValueError, IndexError):
continue
else:
continue
should_enable = ace_index < enabled_ace_units
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
entry.async_on_unload(coordinator.async_add_listener(_sync_slot_registry_state))
entry.async_on_unload(coordinator.async_add_listener(_sync_ace_registry_state))
_sync_slot_registry_state()
_sync_ace_registry_state()

View File

@@ -0,0 +1,154 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.switch import SwitchEntity
from homeassistant.exceptions import ServiceValidationError
from .api import KobraXApiError
from .const import DOMAIN
from .entity import KobraXEntity
class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
def __init__(self, coordinator, entry, ace_id: int) -> None:
super().__init__(coordinator, entry, f"ace_{ace_id}_auto_feed", f"ACE {ace_id + 1} Auto Fill")
self._ace_id = ace_id
self._attr_icon = "mdi:autorenew"
@property
def is_on(self) -> bool:
auto_feed = self.state_data.get("ace_auto_feed") or {}
if not isinstance(auto_feed, dict):
return False
value = auto_feed.get(self._ace_id)
if value is None:
value = auto_feed.get(str(self._ace_id))
return bool(value)
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
auto_feed = merged.get("ace_auto_feed")
if not isinstance(auto_feed, dict):
auto_feed = {}
else:
auto_feed = dict(auto_feed)
key: int | str = self._ace_id
if key not in auto_feed and str(self._ace_id) in auto_feed:
key = str(self._ace_id)
auto_feed[key] = 1 if is_on else 0
merged["ace_auto_feed"] = auto_feed
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, True)
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, False)
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
def __init__(self, coordinator, entry, ace_id: int) -> None:
super().__init__(coordinator, entry, f"ace_{ace_id}_dryer", f"ACE {ace_id + 1} Dryer")
self._ace_id = ace_id
self._attr_icon = "mdi:tumble-dryer"
@property
def is_on(self) -> bool:
drying = self.state_data.get("ace_drying") or {}
if not isinstance(drying, dict):
return False
unit_data = drying.get(self._ace_id)
if unit_data is None:
unit_data = drying.get(str(self._ace_id))
if isinstance(unit_data, dict):
status = unit_data.get("status")
elif self._ace_id == 0:
# Backward-compatible fallback: older bridge payloads may expose unit 0 as a flat object.
status = drying.get("status")
else:
status = None
try:
return int(status) > 0 if status is not None else False
except (TypeError, ValueError):
return False
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
drying = merged.get("ace_drying")
if not isinstance(drying, dict):
drying = {}
else:
drying = dict(drying)
unit_data = drying.get(self._ace_id)
unit_key: int | str = self._ace_id
if unit_data is None:
unit_data = drying.get(str(self._ace_id))
if unit_data is not None:
unit_key = str(self._ace_id)
if isinstance(unit_data, dict):
next_unit_data = dict(unit_data)
else:
next_unit_data = {}
next_unit_data["status"] = 1 if is_on else 0
drying[unit_key] = next_unit_data
# Keep backward-compatible flat status for unit 0 payload variants.
if self._ace_id == 0:
drying["status"] = 1 if is_on else 0
merged["ace_drying"] = drying
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
await api.async_set_ace_dry(
"start",
target_temp=int(ace_cfg.get("target_temp", 45)),
duration=int(ace_cfg.get("duration", 240)),
ace_id=self._ace_id,
)
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
entities = []
# Pre-create switches for all 4 possible ACE units
entities.extend(KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4))
entities.extend(KobraXAceDryerSwitch(coordinator, entry, ace_id) for ace_id in range(4))
async_add_entities(entities)

View File

@@ -4,7 +4,7 @@
"domains": [
"kobrax_lan"
],
"homeassistant": "2024.6.0",
"homeassistant": "2026.3.0",
"iot_class": "local_polling",
"render_readme": true
}