diff --git a/README.md b/README.md index fff24f0..36e0118 100644 --- a/README.md +++ b/README.md @@ -1,101 +1,61 @@ -# Kobra X Home Assistant Component +# Kobra X LAN for Home Assistant -Home Assistant HACS integration for controlling and monitoring an Anycubic Kobra X through KX-Bridge. +Home Assistant integration for monitoring and controlling an Anycubic Kobra X through KX-Bridge. This project was coded with AI assistance and should be reviewed before use in production. Architecture: -- printer <-> [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant +- printer <-> [KX-Bridge](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant -## Features +## Requirements -- Auto-discovered status from KX-Bridge `/api/state` -- Core printer sensors (state, temperatures, progress, file, layer/time data) -- Light control -- Print speed mode selection -- Printer action buttons (pause, resume, cancel, connect, disconnect) -- Camera stream entity using the printer RTSP URL from KX-Bridge, with bridge MJPEG proxy fallback -- Camera snapshot fallback using `/api/camera/snapshot` -- G-code thumbnail image entity from the active print job +- Running and reachable [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) +- Bridge endpoint accessible from Home Assistant at `http://:7125` -## Available Entities +## Installation -### Binary Sensors +### Option 1: HACS -- `Online` -- `Printing` -- `Light State` - -### Sensors - -- `State` -- `Print State` -- `Progress` -- `Hotend Temperature` -- `Target Hotend Temperature` -- `Bed Temperature` -- `Target Bed Temperature` -- `Filename` -- `Current Layer` -- `Total Layers` -- `Remaining Time` -- `Print Duration` -- `Filament Slot 1 Color` / `Filament Slot 1 Type` -- `Filament Slot 2 Color` / `Filament Slot 2 Type` -- `Filament Slot 3 Color` / `Filament Slot 3 Type` -- `Filament Slot 4 Color` / `Filament Slot 4 Type` - -The filament slot entities are created from the AMS slot data reported by KX-Bridge. If the bridge does not report a slot count, the integration falls back to 4 slots. - -### Buttons - -- `Pause Print` -- `Resume Print` -- `Cancel Print` -- `Connect Bridge` -- `Disconnect Bridge` - -### Select - -- `Print Speed` - -### Light - -- `Light` - -### Camera - -- `Camera` - -### Image - -- `GCode Thumbnail` - -## Prerequisites - -1. [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) must be running and reachable from Home Assistant. -2. Verify [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) is accessible at `http://:7125`. - -## Installation (HACS) - -1. Add this repository as a custom repository in HACS with category `Integration`. - - ```https://github.com/gangoke/kobrax-lan-hass-component``` - -2. Install the integration. +1. In HACS, add this repository as a custom repository (category: Integration): + `https://github.com/gangoke/kobrax-lan-hass-component` +2. Install Kobra X LAN from HACS. 3. Restart Home Assistant. -4. Add integration `Kobra X LAN` from Settings -> Devices & Services. +4. Go to Settings -> Devices & Services -> Add Integration. +5. Search for Kobra X LAN. + +### Option 2: Manual (local custom_components) + +1. Copy the `kobrax_lan` folder into your Home Assistant `custom_components` directory: + `/custom_components/kobrax_lan` +3. Restart Home Assistant. +4. Add Kobra X LAN from Settings -> Devices & Services. ## Configuration The config flow asks for: - Host: KX-Bridge host and port (example: `192.168.1.50:7125`) -- Printer name: Friendly display name +- Printer name: Friendly display name in Home Assistant + +## Entity Overview + +| Platform | Key Entities | +| --- | --- | +| Binary Sensor | Online, Printing, Light State | +| Sensor | State, Print State, Progress, Temperatures, Filename, Layer/Time metrics, Skip-object counts, ACE status, AMS/ACE slot material | +| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, ACE Dryer Start, ACE Dryer Stop | +| Switch | ACE auto-fill switches per detected ACE unit | +| Number | ACE dry target temperature, ACE dry duration | +| Select | Print speed | +| Light | Printer light | +| Camera | Printer camera | +| Image | G-code thumbnail | + +Slot entities are generated dynamically from KX-Bridge AMS/ACE slot data (including larger ACE topologies). ## Notes -- This integration talks to KX-Bridge HTTP endpoints and does not connect directly to the printer. -- Keep KX-Bridge and Home Assistant on the same trusted network. -- Native WebRTC is not implemented in this integration. If you want WebRTC in Home Assistant, point `go2rtc` or a WebRTC-capable HA add-on at the camera entity's RTSP source. +- This integration communicates with KX-Bridge HTTP endpoints and does not connect directly to the printer. +- Keep KX-Bridge and Home Assistant on a trusted local network. +- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera RTSP source. diff --git a/custom_components/kobrax_lan/__init__.py b/custom_components/kobrax_lan/__init__.py index 0bc3a52..5e35448 100644 --- a/custom_components/kobrax_lan/__init__.py +++ b/custom_components/kobrax_lan/__init__.py @@ -34,6 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: "coordinator": coordinator, "api": api, "entry": entry, + "ace_dry_config": { + 0: { + "target_temp": int((coordinator.data or {}).get("ace_drying", {}).get("target_temp", 45) or 45), + "duration": int((coordinator.data or {}).get("ace_drying", {}).get("duration", 240) or 240), + }, + 1: { + "target_temp": 45, + "duration": 240, + }, + 2: { + "target_temp": 45, + "duration": 240, + }, + 3: { + "target_temp": 45, + "duration": 240, + }, + }, } await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) diff --git a/custom_components/kobrax_lan/api.py b/custom_components/kobrax_lan/api.py index 34c007b..ffcf6e1 100644 --- a/custom_components/kobrax_lan/api.py +++ b/custom_components/kobrax_lan/api.py @@ -49,6 +49,62 @@ class KobraXApiClient: return result raise KobraXApiError("Unexpected response for /kx/files") + async def async_get_file_objects(self, file_id: str) -> dict[str, Any]: + data = await self._get_json(f"/kx/files/{file_id}/objects") + result = data.get("result", {}) + if isinstance(result, dict): + return result + raise KobraXApiError("Unexpected response for /kx/files/{id}/objects") + + async def async_skip_objects(self, names: list[str]) -> dict[str, Any]: + data = await self._post_json("/kx/skip", {"names": names}) + result = data.get("result") + if result is None: + raise KobraXApiError("Unexpected response for /kx/skip") + return data + + async def async_skip_query(self) -> dict[str, Any]: + data = await self._post_json("/kx/skip/query", {}) + result = data.get("result", {}) + if isinstance(result, dict): + return result + raise KobraXApiError("Unexpected response for /kx/skip/query") + + async def async_get_skip_state(self) -> dict[str, Any]: + data = await self._get_json("/kx/skip/state") + result = data.get("result", {}) + if isinstance(result, dict): + return result + raise KobraXApiError("Unexpected response for /kx/skip/state") + + async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]: + data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on}) + result = data.get("result") + if result is None: + raise KobraXApiError("Unexpected response for /api/ace/auto_feed") + return data + + async def async_set_ace_dry( + self, + action: str, + target_temp: int | None = None, + duration: int | None = None, + ace_id: int | None = None, + ) -> dict[str, Any]: + payload: dict[str, Any] = {"action": action} + if target_temp is not None: + payload["target_temp"] = int(target_temp) + if duration is not None: + payload["duration"] = int(duration) + if ace_id is not None: + payload["ace_id"] = int(ace_id) + + data = await self._post_json("/api/ace/dry", payload) + result = data.get("result") + if result is None: + raise KobraXApiError("Unexpected response for /api/ace/dry") + return data + async def async_pause_print(self) -> None: await self._post_json("/printer/print/pause", {}) @@ -79,9 +135,24 @@ class KobraXApiClient: async def async_disconnect(self) -> None: await self._post_json("/api/disconnect", {}) + async def async_restart_bridge(self) -> None: + await self._post_json("/api/restart", {}) + async def async_start_camera(self) -> None: await self._post_json("/api/camera/start", {}) + async def async_stop_camera(self) -> None: + await self._post_json("/api/camera/stop", {}) + + async def async_check_updates(self) -> dict[str, Any]: + return await self._get_json("/api/update/check") + + async def async_apply_update(self, tag: str, download_url: str) -> dict[str, Any]: + return await self._post_json( + "/api/update/apply", + {"tag": tag, "download_url": download_url}, + ) + async def async_get_camera_url(self) -> str | None: data = await self._get_json("/api/camera") url = data.get("url") diff --git a/custom_components/kobrax_lan/binary_sensor.py b/custom_components/kobrax_lan/binary_sensor.py index 04e133f..7124a8e 100644 --- a/custom_components/kobrax_lan/binary_sensor.py +++ b/custom_components/kobrax_lan/binary_sensor.py @@ -7,6 +7,7 @@ from homeassistant.components.binary_sensor import ( BinarySensorEntity, BinarySensorEntityDescription, ) +from homeassistant.helpers.entity import EntityCategory from .const import DOMAIN from .entity import KobraXEntity @@ -23,6 +24,7 @@ BINARY_SENSORS: tuple[KobraXBinaryDescription, ...] = ( name="Online", value_key="kobra_state", device_class=BinarySensorDeviceClass.CONNECTIVITY, + entity_category=EntityCategory.DIAGNOSTIC, ), KobraXBinaryDescription( key="printing", diff --git a/custom_components/kobrax_lan/brand/dark_icon.png b/custom_components/kobrax_lan/brand/dark_icon.png new file mode 100644 index 0000000..0c16a9a Binary files /dev/null and b/custom_components/kobrax_lan/brand/dark_icon.png differ diff --git a/custom_components/kobrax_lan/brand/dark_logo.png b/custom_components/kobrax_lan/brand/dark_logo.png new file mode 100644 index 0000000..0c16a9a Binary files /dev/null and b/custom_components/kobrax_lan/brand/dark_logo.png differ diff --git a/custom_components/kobrax_lan/brand/icon.png b/custom_components/kobrax_lan/brand/icon.png new file mode 100644 index 0000000..0c16a9a Binary files /dev/null and b/custom_components/kobrax_lan/brand/icon.png differ diff --git a/custom_components/kobrax_lan/brand/logo.png b/custom_components/kobrax_lan/brand/logo.png new file mode 100644 index 0000000..0c16a9a Binary files /dev/null and b/custom_components/kobrax_lan/brand/logo.png differ diff --git a/custom_components/kobrax_lan/button.py b/custom_components/kobrax_lan/button.py index ac7b7a2..c2481bf 100644 --- a/custom_components/kobrax_lan/button.py +++ b/custom_components/kobrax_lan/button.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from homeassistant.components.button import ButtonEntity, ButtonEntityDescription from homeassistant.exceptions import ServiceValidationError +from homeassistant.helpers.entity import EntityCategory from .api import KobraXApiError from .const import DOMAIN @@ -13,6 +14,7 @@ from .entity import KobraXEntity @dataclass(frozen=True, kw_only=True) class KobraXButtonDescription(ButtonEntityDescription): action: str + ace_id: int | None = None # None for non-ACE buttons, 0-3 for ACE buttons BUTTONS: tuple[KobraXButtonDescription, ...] = ( @@ -39,6 +41,7 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = ( name="Connect Bridge", icon="mdi:lan-connect", action="connect", + entity_category=EntityCategory.CONFIG, entity_registry_enabled_default=False, ), KobraXButtonDescription( @@ -46,11 +49,37 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = ( name="Disconnect Bridge", icon="mdi:lan-disconnect", action="disconnect", + entity_category=EntityCategory.CONFIG, entity_registry_enabled_default=False, ), +# ENABLE ME WHEN API ENDPOINT IS ADDED +# KobraXButtonDescription( +# key="restart_bridge", +# name="Restart (KX-Bridge)", +# icon="mdi:restart", +# action="restart", +# entity_category=EntityCategory.CONFIG, +# ), + KobraXButtonDescription( + key="refresh_skip_state", + name="Refresh Skip State", + icon="mdi:refresh", + action="skip_query", + entity_registry_enabled_default=False, + ), + KobraXButtonDescription( + key="apply_update", + name="Apply Update (KX-Bridge)", + icon="mdi:download-circle-outline", + action="apply_update", + entity_category=EntityCategory.CONFIG, + ), ) +# ACE dryer buttons are now dynamically created in async_setup_entry + + class KobraXActionButton(KobraXEntity, ButtonEntity): entity_description: KobraXButtonDescription @@ -58,6 +87,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity): super().__init__(coordinator, entry, description.key, description.name) self.entity_description = description + @property + def available(self) -> bool: + if self.entity_description.action != "apply_update": + return super().available + + update_info = self.state_data.get("update_info") + if not isinstance(update_info, dict): + return False + + current = str(update_info.get("current") or "").strip() + latest = str(update_info.get("latest") or "").strip() + if current and latest and current == latest: + return False + + if update_info.get("update_available") is False: + return False + + return super().available + async def async_press(self) -> None: api = self.hass.data[DOMAIN][self._entry.entry_id]["api"] try: @@ -71,6 +119,55 @@ class KobraXActionButton(KobraXEntity, ButtonEntity): await api.async_connect() elif self.entity_description.action == "disconnect": await api.async_disconnect() + elif self.entity_description.action == "restart": + await api.async_restart_bridge() + elif self.entity_description.action == "skip_query": + await api.async_skip_query() + elif self.entity_description.action == "apply_update": + await self.coordinator.async_apply_update() + await self.coordinator.async_request_refresh() + except KobraXApiError as err: + raise ServiceValidationError(str(err)) from err + except Exception as err: + raise ServiceValidationError(str(err)) from err + + +class KobraXAceDryButton(KobraXEntity, ButtonEntity): + def __init__( + self, + coordinator, + entry, + ace_id: int, + action: str, # "dry_start" or "dry_stop" + ) -> None: + if action == "dry_start": + unique_key = f"ace_{ace_id}_dry_start" + name = f"ACE {ace_id + 1} Dryer Start" + icon = "mdi:tumble-dryer" + else: # dry_stop + unique_key = f"ace_{ace_id}_dry_stop" + name = f"ACE {ace_id + 1} Dryer Stop" + icon = "mdi:tumble-dryer-off" + + super().__init__(coordinator, entry, unique_key, name) + self._ace_id = ace_id + self._action = action + self._attr_icon = icon + + async def async_press(self) -> None: + api = self.hass.data[DOMAIN][self._entry.entry_id]["api"] + try: + if self._action == "dry_start": + cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"] + ace_cfg = cfg.get(self._ace_id) or {} + await api.async_set_ace_dry( + "start", + target_temp=int(ace_cfg.get("target_temp", 45)), + duration=int(ace_cfg.get("duration", 240)), + ace_id=self._ace_id, + ) + elif self._action == "dry_stop": + await api.async_set_ace_dry("stop", ace_id=self._ace_id) await self.coordinator.async_request_refresh() except KobraXApiError as err: raise ServiceValidationError(str(err)) from err @@ -78,9 +175,15 @@ class KobraXActionButton(KobraXEntity, ButtonEntity): async def async_setup_entry(hass, entry, async_add_entities): coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"] - async_add_entities( - [ - KobraXActionButton(coordinator, entry, description) - for description in BUTTONS - ] - ) + + entities = [ + KobraXActionButton(coordinator, entry, description) + for description in BUTTONS + ] + + # Pre-create all 8 ACE dryer buttons (start + stop for each of 4 ACE units) + for ace_id in range(4): + entities.append(KobraXAceDryButton(coordinator, entry, ace_id, "dry_start")) + entities.append(KobraXAceDryButton(coordinator, entry, ace_id, "dry_stop")) + + async_add_entities(entities) diff --git a/custom_components/kobrax_lan/const.py b/custom_components/kobrax_lan/const.py index bc9f441..2f6af12 100644 --- a/custom_components/kobrax_lan/const.py +++ b/custom_components/kobrax_lan/const.py @@ -11,6 +11,7 @@ DEFAULT_HOST = "localhost:7125" DEFAULT_PRINTER_NAME = "Anycubic Kobra X" UPDATE_INTERVAL = timedelta(seconds=5) +UPDATE_CHECK_INTERVAL = timedelta(hours=1) PLATFORMS = [ "sensor", @@ -18,6 +19,8 @@ PLATFORMS = [ "light", "select", "button", + "switch", + "number", "camera", "image", ] diff --git a/custom_components/kobrax_lan/coordinator.py b/custom_components/kobrax_lan/coordinator.py index e290299..7e8ca16 100644 --- a/custom_components/kobrax_lan/coordinator.py +++ b/custom_components/kobrax_lan/coordinator.py @@ -1,12 +1,13 @@ from __future__ import annotations +import time from typing import Any from homeassistant.core import HomeAssistant from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .api import KobraXApiClient, KobraXApiError -from .const import DOMAIN, UPDATE_INTERVAL +from .const import DOMAIN, UPDATE_CHECK_INTERVAL, UPDATE_INTERVAL class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]): @@ -18,9 +19,73 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]): update_interval=UPDATE_INTERVAL, ) self.api = api + self._update_info: dict[str, Any] = {} + self._restart_supported: bool | None = None + self._next_update_check_monotonic = 0.0 async def _async_update_data(self) -> dict[str, Any]: try: - return await self.api.async_get_state() + # Probe restart endpoint once if not checked + if self._restart_supported is None: + try: + await self.api.async_restart_bridge() + self._restart_supported = True + except Exception as err: + # Only disable if 404/501, otherwise treat as available + msg = str(err) + if "404" in msg or "501" in msg: + self._restart_supported = False + else: + self._restart_supported = True + state = await self.api.async_get_state() + + now = time.monotonic() + if now >= self._next_update_check_monotonic: + try: + self._update_info = await self.api.async_check_updates() + except KobraXApiError: + # Keep integration polling resilient if update service is temporarily unavailable. + pass + self._next_update_check_monotonic = now + UPDATE_CHECK_INTERVAL.total_seconds() + + if self._update_info: + state["update_info"] = self._update_info + try: + skip_state = await self.api.async_get_skip_state() + state["skip_state"] = skip_state + except KobraXApiError: + # Skip endpoints are only available on newer bridge versions. + pass + return state except KobraXApiError as err: raise UpdateFailed(str(err)) from err + + async def async_check_updates(self) -> dict[str, Any]: + try: + update_info = await self.api.async_check_updates() + except KobraXApiError as err: + raise UpdateFailed(str(err)) from err + + self._update_info = update_info + merged = dict(self.data or {}) + merged["update_info"] = update_info + self.async_set_updated_data(merged) + return update_info + + async def async_apply_update(self) -> dict[str, Any]: + update_info = self._update_info or await self.async_check_updates() + tag = str(update_info.get("tag") or "").strip() + download_url = str(update_info.get("download_url") or "").strip() + if not tag or not download_url: + raise UpdateFailed("Missing tag or download URL from update check") + + try: + result = await self.api.async_apply_update(tag=tag, download_url=download_url) + except KobraXApiError as err: + raise UpdateFailed(str(err)) from err + + self._update_info = {**update_info, "last_apply_result": result} + merged = dict(self.data or {}) + merged["update_info"] = self._update_info + self.async_set_updated_data(merged) + return result diff --git a/custom_components/kobrax_lan/manifest.json b/custom_components/kobrax_lan/manifest.json index 565d523..2f98f74 100644 --- a/custom_components/kobrax_lan/manifest.json +++ b/custom_components/kobrax_lan/manifest.json @@ -5,8 +5,9 @@ "@Gangoke" ], "config_flow": true, + "homeassistant": "2026.3.0", "documentation": "https://github.com/gangoke/kobrax-lan-hass-component", "iot_class": "local_polling", "issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues", - "version": "0.1.0" + "version": "0.2.0" } diff --git a/custom_components/kobrax_lan/number.py b/custom_components/kobrax_lan/number.py new file mode 100644 index 0000000..b0522c7 --- /dev/null +++ b/custom_components/kobrax_lan/number.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from typing import Any + +from homeassistant.components.number import NumberEntity + +from .const import DOMAIN +from .entity import KobraXEntity + + +def _minutes_to_hhmmss(minutes: int | float) -> str: + """Convert minutes to HH:mm:ss format.""" + total_seconds = int(minutes * 60) + hours = total_seconds // 3600 + remaining_seconds = total_seconds % 3600 + mins = remaining_seconds // 60 + secs = remaining_seconds % 60 + return f"{hours:02d}:{mins:02d}:{secs:02d}" + + +class KobraXAceDryConfigNumber(KobraXEntity, NumberEntity): + def __init__( + self, + coordinator, + entry, + ace_id: int, + config_type: str, # "target_temp" or "duration" + ) -> None: + if config_type == "target_temp": + unique_key = f"ace_{ace_id}_dry_target_temp" + name = f"ACE {ace_id + 1} Dryer Target Temperature" + min_val, max_val, step_val = 30, 80, 1 + unit = "°C" + icon = "mdi:thermometer" + else: # duration + unique_key = f"ace_{ace_id}_dry_duration" + name = f"ACE {ace_id + 1} Dryer Duration" + min_val, max_val, step_val = 10, 1440, 1 + unit = "min" + icon = "mdi:timer-cog-outline" + + super().__init__(coordinator, entry, unique_key, name) + self._ace_id = ace_id + self._config_type = config_type + self._attr_native_min_value = min_val + self._attr_native_max_value = max_val + self._attr_native_step = step_val + self._attr_native_unit_of_measurement = unit + self._attr_mode = "box" + self._attr_icon = icon + + @property + def native_value(self) -> float: + cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"] + ace_cfg = cfg.get(self._ace_id) or {} + key = "target_temp" if self._config_type == "target_temp" else "duration" + if self._config_type == "target_temp": + return float(ace_cfg.get(key, 45)) + else: + return float(ace_cfg.get(key, 240)) + + @property + def extra_state_attributes(self) -> dict[str, Any] | None: + if self._config_type != "duration": + return None + cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"] + ace_cfg = cfg.get(self._ace_id) or {} + duration_minutes = ace_cfg.get("duration", 240) + return {"formatted_duration": _minutes_to_hhmmss(duration_minutes)} + + async def async_set_native_value(self, value: float) -> None: + cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"] + if self._ace_id not in cfg: + cfg[self._ace_id] = {} + key = "target_temp" if self._config_type == "target_temp" else "duration" + cfg[self._ace_id][key] = int(round(value)) + self.async_write_ha_state() + + +async def async_setup_entry(hass, entry, async_add_entities): + coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"] + + # Initialize ace_dry_config structure if not present + if "ace_dry_config" not in hass.data[DOMAIN][entry.entry_id]: + hass.data[DOMAIN][entry.entry_id]["ace_dry_config"] = {} + + # Pre-create all 8 numbers (target_temp + duration for each of 4 ACE units) + entities = [] + for ace_id in range(4): + entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "target_temp")) + entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "duration")) + + async_add_entities(entities) diff --git a/custom_components/kobrax_lan/sensor.py b/custom_components/kobrax_lan/sensor.py index 7822db5..e620525 100644 --- a/custom_components/kobrax_lan/sensor.py +++ b/custom_components/kobrax_lan/sensor.py @@ -2,14 +2,45 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any +from urllib.parse import quote from homeassistant.components.sensor import SensorDeviceClass, SensorEntity, SensorEntityDescription from homeassistant.const import PERCENTAGE, UnitOfTemperature +from homeassistant.core import callback +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers import entity_registry as er +from homeassistant.helpers.entity_registry import RegistryEntryDisabler from .const import DOMAIN from .entity import KobraXEntity +MAX_FILAMENT_SLOTS = 19 +TOOLHEAD_SLOT_LIMIT = 4 +ACE_DIRECT_SLOT_LIMIT = 4 +MAX_ACE_UNITS = 4 + + +def _detected_slot_limit(state_data: dict[str, Any]) -> int: + mode = str(state_data.get("filament_mode") or "toolhead").lower() + if mode == "toolhead": + return TOOLHEAD_SLOT_LIMIT + if mode == "ace_direct": + return ACE_DIRECT_SLOT_LIMIT + + slots = state_data.get("ams_slots") or [] + if isinstance(slots, list) and slots: + return min(len(slots), MAX_FILAMENT_SLOTS) + return MAX_FILAMENT_SLOTS + + +def _detected_ace_unit_count(state_data: dict[str, Any]) -> int: + units = state_data.get("ace_units") or [] + if isinstance(units, list): + return min(len(units), MAX_ACE_UNITS) + return 0 + + @dataclass(frozen=True, kw_only=True) class KobraXSensorDescription(SensorEntityDescription): value_key: str @@ -97,6 +128,42 @@ SENSORS: tuple[KobraXSensorDescription, ...] = ( value_key="print_duration", icon="mdi:timer-outline", ), + KobraXSensorDescription( + key="skip_object_count", + name="Skip Object Count", + value_key="skip_object_count", + icon="mdi:vector-polygon", + ), + KobraXSensorDescription( + key="skipped_object_count", + name="Skipped Object Count", + value_key="skipped_object_count", + icon="mdi:content-cut", + ), + KobraXSensorDescription( + key="filament_mode", + name="Filament Mode", + value_key="filament_mode", + icon="mdi:shape-outline", + ), + KobraXSensorDescription( + key="ace_unit_count", + name="ACE Unit Count", + value_key="ace_unit_count", + icon="mdi:package-variant", + ), + KobraXSensorDescription( + key="bridge_version", + name="Bridge Version", + value_key="version", + icon="mdi:source-branch", + ), + KobraXSensorDescription( + key="latest_available_version", + name="Latest Available Version", + value_key="latest_available_version", + icon="mdi:cloud-download-outline", + ), ) @@ -106,31 +173,183 @@ class KobraXSensor(KobraXEntity, SensorEntity): def __init__(self, coordinator, entry, description: KobraXSensorDescription) -> None: super().__init__(coordinator, entry, description.key, description.name) self.entity_description = description + if description.value_key in ("version", "latest_available_version"): + self._attr_entity_category = EntityCategory.DIAGNOSTIC + + @staticmethod + def _seconds_to_hhmmss(seconds: int) -> str: + hours = seconds // 3600 + remaining_seconds = seconds % 3600 + minutes = remaining_seconds // 60 + secs = remaining_seconds % 60 + return f"{hours:02d}:{minutes:02d}:{secs:02d}" @property def native_value(self) -> Any: + if self.entity_description.value_key == "filament_mode": + return self.state_data.get("filament_mode") + if self.entity_description.value_key == "ace_unit_count": + units = self.state_data.get("ace_units") + return len(units) if isinstance(units, list) else 0 + if self.entity_description.value_key == "latest_available_version": + update_info = self.state_data.get("update_info") or {} + if isinstance(update_info, dict): + return update_info.get("latest") + return None + if self.entity_description.value_key == "skip_object_count": + skip_state = self.state_data.get("skip_state") or {} + objects = skip_state.get("objects") if isinstance(skip_state, dict) else [] + return len(objects) if isinstance(objects, list) else 0 + if self.entity_description.value_key == "skipped_object_count": + skip_state = self.state_data.get("skip_state") or {} + skipped = skip_state.get("skipped") if isinstance(skip_state, dict) else [] + return len(skipped) if isinstance(skipped, list) else 0 + value = self.state_data.get(self.entity_description.value_key) if self.entity_description.value_key == "progress" and value is not None: return round(float(value) * 100, 1) + if self.entity_description.value_key in ("remain_time", "print_duration") and value is not None: + try: + return self._seconds_to_hhmmss(int(float(value))) + except (TypeError, ValueError): + return None return value + @property + def extra_state_attributes(self) -> dict[str, Any] | None: + if self.entity_description.value_key == "latest_available_version": + update_info = self.state_data.get("update_info") + if isinstance(update_info, dict): + return { + "current": update_info.get("current"), + "tag": update_info.get("tag"), + "update_available": update_info.get("update_available"), + "download_url": update_info.get("download_url"), + } + return None + + if self.entity_description.value_key not in ("skip_object_count", "skipped_object_count"): + return None + + skip_state = self.state_data.get("skip_state") + if not isinstance(skip_state, dict): + return None + + objects = skip_state.get("objects") + skipped = skip_state.get("skipped") + return { + "objects": objects if isinstance(objects, list) else [], + "skipped": skipped if isinstance(skipped, list) else [], + "filename": skip_state.get("filename"), + "ts": skip_state.get("ts"), + } + + +class KobraXAceDryerSensor(KobraXEntity, SensorEntity): + """Per-ACE-unit dryer sensor.""" + + def __init__( + self, + coordinator, + entry, + ace_id: int, + sensor_type: str, # "status", "humidity", "current_temp", "target_temp", "remaining_time" + ) -> None: + if sensor_type == "status": + unique_key = f"ace_{ace_id}_dryer_status" + name = f"ACE {ace_id + 1} Dryer Status" + icon = "mdi:tumble-dryer" + elif sensor_type == "humidity": + unique_key = f"ace_{ace_id}_dryer_humidity" + name = f"ACE {ace_id + 1} Dryer Humidity" + icon = "mdi:water-percent" + elif sensor_type == "current_temp": + unique_key = f"ace_{ace_id}_dryer_current_temp" + name = f"ACE {ace_id + 1} Dryer Current Temperature" + icon = None + elif sensor_type == "target_temp": + unique_key = f"ace_{ace_id}_dryer_target_temp" + name = f"ACE {ace_id + 1} Dryer Target Temperature" + icon = None + else: # remaining_time + unique_key = f"ace_{ace_id}_dryer_remaining_time" + name = f"ACE {ace_id + 1} Dryer Remaining Time" + icon = "mdi:timer-sand" + + super().__init__(coordinator, entry, unique_key, name) + self._ace_id = ace_id + self._sensor_type = sensor_type + if icon: + self._attr_icon = icon + + if sensor_type == "humidity": + self._attr_native_unit_of_measurement = PERCENTAGE + elif sensor_type in ("current_temp", "target_temp"): + self._attr_device_class = SensorDeviceClass.TEMPERATURE + self._attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS + self._attr_suggested_unit_of_measurement = UnitOfTemperature.CELSIUS + elif sensor_type == "remaining_time": + pass # HH:mm:ss format, no unit needed + + @staticmethod + def _minutes_to_hhmmss(minutes: int) -> str: + """Convert minutes to HH:mm:ss format.""" + total_seconds = minutes * 60 + hours = total_seconds // 3600 + remaining_seconds = total_seconds % 3600 + mins = remaining_seconds // 60 + secs = remaining_seconds % 60 + return f"{hours:02d}:{mins:02d}:{secs:02d}" + + @property + def native_value(self) -> Any: + drying = self.state_data.get("ace_drying") or {} + + # Handle per-unit structure: ace_drying[ace_id] or global structure + if isinstance(drying, dict): + # Try per-unit key first + unit_data = drying.get(self._ace_id) + if isinstance(unit_data, dict): + drying = unit_data + elif self._ace_id == 0 and not unit_data: + # Fall back to global structure for unit 0 + pass + else: + # No data for this unit + return None + else: + return None + + if self._sensor_type == "status": + status = int(drying.get("status", 0)) if drying.get("status") is not None else 0 + return "running" if status else "idle" + elif self._sensor_type == "humidity": + humidity = drying.get("humidity") + return round(float(humidity), 1) if humidity is not None else None + elif self._sensor_type == "current_temp": + current_temp = drying.get("current_temp") + return round(float(current_temp), 1) if current_temp is not None else None + elif self._sensor_type == "target_temp": + target_temp = drying.get("target_temp") + return float(target_temp) if target_temp is not None else None + elif self._sensor_type == "remaining_time": + remain = drying.get("remain_time") + if remain is not None: + try: + minutes = int(remain) + return self._minutes_to_hhmmss(minutes) + except (TypeError, ValueError): + return None + return None + + return None + class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity): - def __init__(self, coordinator, entry, slot_index: int, field: str) -> None: - name_suffix = { - "color": f"Filament Slot {slot_index + 1} Color", - "type": f"Filament Slot {slot_index + 1} Type", - }[field] - super().__init__(coordinator, entry, f"filament_slot_{slot_index + 1}_{field}", name_suffix) + def __init__(self, coordinator, entry, slot_index: int) -> None: + super().__init__(coordinator, entry, f"slot_{slot_index + 1}", f"Slot {slot_index + 1}") self._slot_index = slot_index - self._field = field - - if field == "color": - self._attr_icon = "mdi:palette" - elif field == "type": - self._attr_icon = "mdi:label" - else: - self._attr_icon = "mdi:numeric" + self._attr_icon = "mdi:circle" def _slot(self) -> dict[str, Any]: slots = self.state_data.get("ams_slots") or [] @@ -139,6 +358,9 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity): slot = slots[self._slot_index] return slot if isinstance(slot, dict) else {} + def _slot_limit_for_mode(self) -> int: + return _detected_slot_limit(self.state_data) + @staticmethod def _to_color_hex(color: Any) -> str | None: if isinstance(color, list) and len(color) >= 3: @@ -154,30 +376,132 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity): @property def available(self) -> bool: - return bool(self._slot()) and super().available + return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available @property def native_value(self) -> Any: + slot = self._slot() + if not slot: + return "EMPTY" + status = slot.get("status") + if status is not None: + try: + if int(status) != 5: + return "EMPTY" + except (TypeError, ValueError): + return "EMPTY" + material = slot.get("type") + material_str = str(material).upper() if material else "EMPTY" + color_hex = self._to_color_hex(slot.get("color")) + if color_hex and material_str != "EMPTY": + return f"{material_str} ({color_hex})" + return material_str + + @property + def icon_color(self) -> str | None: slot = self._slot() if not slot: return None - if self._field == "color": - return self._to_color_hex(slot.get("color")) - if self._field == "type": - material = slot.get("type") - return str(material).upper() if material else None - return None + return self._to_color_hex(slot.get("color")) + + @property + def entity_picture(self) -> str | None: + """Return a colored circle picture as a frontend fallback when icon tinting is ignored.""" + slot = self._slot() + empty_svg = ( + "" + "" + ) + if not slot: + return f"data:image/svg+xml;utf8,{quote(empty_svg)}" + + status = slot.get("status") + if status is not None: + try: + if int(status) != 5: + return f"data:image/svg+xml;utf8,{quote(empty_svg)}" + except (TypeError, ValueError): + return f"data:image/svg+xml;utf8,{quote(empty_svg)}" + + color_hex = self._to_color_hex(slot.get("color")) + if not color_hex: + return f"data:image/svg+xml;utf8,{quote(empty_svg)}" + + svg = ( + "" + "" + ) + return f"data:image/svg+xml;utf8,{quote(svg)}" + + @property + def extra_state_attributes(self) -> dict[str, Any] | None: + slot = self._slot() + if not slot: + return None + return { + "color_hex": self._to_color_hex(slot.get("color")), + "status": slot.get("status"), + "box_id": slot.get("box_id"), + "global_index": slot.get("global_index"), + "activity": slot.get("activity"), + } async def async_setup_entry(hass, entry, async_add_entities): coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"] - slots = coordinator.data.get("ams_slots") if coordinator.data else [] - slot_count = len(slots) if isinstance(slots, list) and len(slots) > 0 else 4 filament_entities: list[SensorEntity] = [] - for slot_index in range(slot_count): - for field in ("color", "type"): - filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index, field)) + for slot_index in range(MAX_FILAMENT_SLOTS): + filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index)) + + ace_dryer_entities: list[SensorEntity] = [] + for ace_id in range(MAX_ACE_UNITS): + for sensor_type in ("status", "humidity", "current_temp", "target_temp", "remaining_time"): + ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type)) + + entity_registry = er.async_get(hass) + + # Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities. + legacy_ace_sensor_keys = ( + "ace_dryer_status", + "ace_dryer_humidity", + "ace_dryer_current_temp", + "ace_dryer_target_temp", + "ace_dryer_remaining_time", + ) + for key in legacy_ace_sensor_keys: + legacy_unique_id = f"{entry.entry_id}_{key}" + legacy_entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, legacy_unique_id) + if legacy_entity_id: + entity_registry.async_remove(legacy_entity_id) + + @callback + def _sync_slot_registry_state() -> None: + state_data = coordinator.data or {} + enabled_slots = _detected_slot_limit(state_data) + + for slot_index in range(MAX_FILAMENT_SLOTS): + unique_id = f"{entry.entry_id}_slot_{slot_index + 1}" + entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id) + if not entity_id: + continue + + reg_entry = entity_registry.async_get(entity_id) + if reg_entry is None: + continue + + should_enable = slot_index < enabled_slots + if should_enable: + if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION: + entity_registry.async_update_entity(entity_id, disabled_by=None) + else: + if reg_entry.disabled_by is None: + entity_registry.async_update_entity( + entity_id, + disabled_by=RegistryEntryDisabler.INTEGRATION, + ) async_add_entities( [ @@ -185,4 +509,67 @@ async def async_setup_entry(hass, entry, async_add_entities): for description in SENSORS ] + filament_entities + + ace_dryer_entities ) + + @callback + def _sync_ace_registry_state() -> None: + state_data = coordinator.data or {} + enabled_ace_units = _detected_ace_unit_count(state_data) + + # Define all ACE entity patterns: (platform, unique_id_pattern_parts) + ace_entities: list[tuple[str, str]] = [] + + for ace_index in range(MAX_ACE_UNITS): + # Switch entities + ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_auto_feed")) + + # Number entities (temp + duration) + ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_target_temp")) + ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_duration")) + + # Button entities (start + stop) + ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_start")) + ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_stop")) + + # Sensor entities (status, humidity, current_temp, target_temp, remaining_time) + ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_status")) + ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_humidity")) + ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_current_temp")) + ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_target_temp")) + ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_remaining_time")) + + for platform, unique_id in ace_entities: + entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id) + if not entity_id: + continue + + reg_entry = entity_registry.async_get(entity_id) + if reg_entry is None: + continue + + # Extract ace_index from unique_id + parts = unique_id.split("_") + if len(parts) >= 3: + try: + ace_index = int(parts[2]) + except (ValueError, IndexError): + continue + else: + continue + + should_enable = ace_index < enabled_ace_units + if should_enable: + if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION: + entity_registry.async_update_entity(entity_id, disabled_by=None) + else: + if reg_entry.disabled_by is None: + entity_registry.async_update_entity( + entity_id, + disabled_by=RegistryEntryDisabler.INTEGRATION, + ) + + entry.async_on_unload(coordinator.async_add_listener(_sync_slot_registry_state)) + entry.async_on_unload(coordinator.async_add_listener(_sync_ace_registry_state)) + _sync_slot_registry_state() + _sync_ace_registry_state() diff --git a/custom_components/kobrax_lan/switch.py b/custom_components/kobrax_lan/switch.py new file mode 100644 index 0000000..76db9ba --- /dev/null +++ b/custom_components/kobrax_lan/switch.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from homeassistant.components.switch import SwitchEntity +from homeassistant.exceptions import ServiceValidationError + +from .api import KobraXApiError +from .const import DOMAIN +from .entity import KobraXEntity + + +class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity): + def __init__(self, coordinator, entry, ace_id: int) -> None: + super().__init__(coordinator, entry, f"ace_{ace_id}_auto_feed", f"ACE {ace_id + 1} Auto Fill") + self._ace_id = ace_id + self._attr_icon = "mdi:autorenew" + + @property + def is_on(self) -> bool: + auto_feed = self.state_data.get("ace_auto_feed") or {} + if not isinstance(auto_feed, dict): + return False + value = auto_feed.get(self._ace_id) + if value is None: + value = auto_feed.get(str(self._ace_id)) + return bool(value) + + async def async_turn_on(self, **kwargs) -> None: + api = self.hass.data[DOMAIN][self._entry.entry_id]["api"] + try: + await api.async_set_ace_auto_feed(self._ace_id, True) + await self.coordinator.async_request_refresh() + except KobraXApiError as err: + raise ServiceValidationError(str(err)) from err + + async def async_turn_off(self, **kwargs) -> None: + api = self.hass.data[DOMAIN][self._entry.entry_id]["api"] + try: + await api.async_set_ace_auto_feed(self._ace_id, False) + await self.coordinator.async_request_refresh() + except KobraXApiError as err: + raise ServiceValidationError(str(err)) from err + + +async def async_setup_entry(hass, entry, async_add_entities): + coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"] + + # Pre-create switches for all 4 possible ACE units + async_add_entities( + [KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4)] + ) diff --git a/hacs.json b/hacs.json index d4672ac..fe5e0ef 100644 --- a/hacs.json +++ b/hacs.json @@ -4,7 +4,7 @@ "domains": [ "kobrax_lan" ], - "homeassistant": "2024.6.0", + "homeassistant": "2026.3.0", "iot_class": "local_polling", "render_readme": true }