mirror of
https://github.com/gangoke/kobrax-lan-hass-component.git
synced 2026-06-10 05:02:12 +02:00
Compare commits
14 Commits
camera-rew
...
ace-fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2f153ae26 | ||
|
|
cf3b4c1ca1 | ||
|
|
8370153dd2 | ||
|
|
cba7f9b707 | ||
|
|
4e5aeb54a8 | ||
|
|
2623f2739a | ||
|
|
ec1fbbc520 | ||
|
|
3fb047708e | ||
|
|
13ce4d48f3 | ||
|
|
96ccfb98ff | ||
|
|
2cc51085e4 | ||
|
|
b03611913a | ||
|
|
9603d449e6 | ||
|
|
3f3e4f534d |
65
README.md
65
README.md
@@ -1,42 +1,61 @@
|
||||
# Kobra X Home Assistant Component
|
||||
# Kobra X LAN for Home Assistant
|
||||
|
||||
Home Assistant HACS integration for controlling and monitoring an Anycubic Kobra X through KX-Bridge.
|
||||
Home Assistant integration for monitoring and controlling an Anycubic Kobra X through KX-Bridge.
|
||||
|
||||
This project was coded with AI assistance and should be reviewed before use in production.
|
||||
|
||||
Architecture:
|
||||
|
||||
- printer <-> KX-Bridge-Release <-> this integration <-> Home Assistant
|
||||
- printer <-> [KX-Bridge](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant
|
||||
|
||||
## Features
|
||||
## Requirements
|
||||
|
||||
- Auto-discovered status from KX-Bridge `/api/state`
|
||||
- Core printer sensors (state, temperatures, progress, file, layer/time data)
|
||||
- Light control
|
||||
- Print speed mode selection
|
||||
- Printer action buttons (pause, resume, cancel)
|
||||
- Camera stream entity using the printer RTSP URL from KX-Bridge, with bridge MJPEG proxy fallback
|
||||
- Camera snapshot fallback using `/api/camera/snapshot`
|
||||
- Running and reachable [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release)
|
||||
- Bridge endpoint accessible from Home Assistant at `http://<bridge-host>:7125`
|
||||
|
||||
## Prerequisites
|
||||
## Installation
|
||||
|
||||
1. KX-Bridge must be running and reachable from Home Assistant.
|
||||
2. Verify KX-Bridge is accessible at `http://<bridge-host>:7125`.
|
||||
### Option 1: HACS
|
||||
|
||||
## Installation (HACS)
|
||||
|
||||
1. Add this repository as a custom repository in HACS with category `Integration`.
|
||||
2. Install the integration.
|
||||
1. In HACS, add this repository as a custom repository (category: Integration):
|
||||
`https://github.com/gangoke/kobrax-lan-hass-component`
|
||||
2. Install Kobra X LAN from HACS.
|
||||
3. Restart Home Assistant.
|
||||
4. Add integration `Kobra X` from Settings -> Devices & Services.
|
||||
4. Go to Settings -> Devices & Services -> Add Integration.
|
||||
5. Search for Kobra X LAN.
|
||||
|
||||
### Option 2: Manual (local custom_components)
|
||||
|
||||
1. Copy the `kobrax_lan` folder into your Home Assistant `custom_components` directory:
|
||||
`<config>/custom_components/kobrax_lan`
|
||||
3. Restart Home Assistant.
|
||||
4. Add Kobra X LAN from Settings -> Devices & Services.
|
||||
|
||||
## Configuration
|
||||
|
||||
The config flow asks for:
|
||||
|
||||
- Host: KX-Bridge host and port (example: `192.168.1.50:7125`)
|
||||
- Printer name: Friendly display name
|
||||
- Printer name: Friendly display name in Home Assistant
|
||||
|
||||
## Entity Overview
|
||||
|
||||
| Platform | Key Entities |
|
||||
| --- | --- |
|
||||
| Binary Sensor | Online, Printing, Light State |
|
||||
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
|
||||
| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, Apply Update (KX-Bridge) |
|
||||
| Switch | ACE 1..4 Auto Fill, ACE 1..4 Dryer |
|
||||
| Number | ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Duration |
|
||||
| Select | Print speed |
|
||||
| Light | Printer light |
|
||||
| Camera | Printer camera |
|
||||
| Image | G-code thumbnail |
|
||||
|
||||
Slot and ACE entities are pre-created and automatically enabled/disabled based on detected slot mode and ACE unit count from KX-Bridge.
|
||||
|
||||
## Notes
|
||||
|
||||
- This integration talks to KX-Bridge HTTP endpoints and does not connect directly to the printer.
|
||||
- Keep KX-Bridge and Home Assistant on the same trusted network.
|
||||
- Native WebRTC is not implemented in this integration. If you want WebRTC in Home Assistant, point `go2rtc` or a WebRTC-capable HA add-on at the camera entity's RTSP source.
|
||||
- This integration communicates with KX-Bridge HTTP endpoints and does not connect directly to the printer.
|
||||
- Keep KX-Bridge and Home Assistant on a trusted local network.
|
||||
- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera RTSP source.
|
||||
|
||||
@@ -34,6 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"coordinator": coordinator,
|
||||
"api": api,
|
||||
"entry": entry,
|
||||
"ace_dry_config": {
|
||||
0: {
|
||||
"target_temp": int((coordinator.data or {}).get("ace_drying", {}).get("target_temp", 45) or 45),
|
||||
"duration": int((coordinator.data or {}).get("ace_drying", {}).get("duration", 240) or 240),
|
||||
},
|
||||
1: {
|
||||
"target_temp": 45,
|
||||
"duration": 240,
|
||||
},
|
||||
2: {
|
||||
"target_temp": 45,
|
||||
"duration": 240,
|
||||
},
|
||||
3: {
|
||||
"target_temp": 45,
|
||||
"duration": 240,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KobraXApiError(Exception):
|
||||
"""Raised when communication with KX-Bridge fails."""
|
||||
|
||||
@@ -49,6 +53,70 @@ class KobraXApiClient:
|
||||
return result
|
||||
raise KobraXApiError("Unexpected response for /kx/files")
|
||||
|
||||
async def async_get_file_objects(self, file_id: str) -> dict[str, Any]:
|
||||
data = await self._get_json(f"/kx/files/{file_id}/objects")
|
||||
result = data.get("result", {})
|
||||
if isinstance(result, dict):
|
||||
return result
|
||||
raise KobraXApiError("Unexpected response for /kx/files/{id}/objects")
|
||||
|
||||
async def async_skip_objects(self, names: list[str]) -> dict[str, Any]:
|
||||
data = await self._post_json("/kx/skip", {"names": names})
|
||||
result = data.get("result")
|
||||
if result is None:
|
||||
raise KobraXApiError("Unexpected response for /kx/skip")
|
||||
return data
|
||||
|
||||
async def async_skip_query(self) -> dict[str, Any]:
|
||||
data = await self._post_json("/kx/skip/query", {})
|
||||
result = data.get("result", {})
|
||||
if isinstance(result, dict):
|
||||
return result
|
||||
raise KobraXApiError("Unexpected response for /kx/skip/query")
|
||||
|
||||
async def async_get_skip_state(self) -> dict[str, Any]:
|
||||
data = await self._get_json("/kx/skip/state")
|
||||
result = data.get("result", {})
|
||||
if isinstance(result, dict):
|
||||
return result
|
||||
raise KobraXApiError("Unexpected response for /kx/skip/state")
|
||||
|
||||
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
|
||||
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
|
||||
if data.get("error") not in (None, ""):
|
||||
raise KobraXApiError(str(data["error"]))
|
||||
return data
|
||||
|
||||
async def async_set_ace_dry(
|
||||
self,
|
||||
action: str,
|
||||
target_temp: int | None = None,
|
||||
duration: int | None = None,
|
||||
ace_id: int | None = None,
|
||||
) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {"action": action}
|
||||
if target_temp is not None:
|
||||
payload["target_temp"] = int(target_temp)
|
||||
if duration is not None:
|
||||
payload["duration"] = int(duration)
|
||||
if ace_id is not None:
|
||||
payload["ace_id"] = int(ace_id)
|
||||
|
||||
try:
|
||||
data = await self._post_json("/api/ace/dry", payload)
|
||||
except KobraXApiError as err:
|
||||
# Some bridge versions can return a false 502 while setDry is
|
||||
# still applied successfully on the printer.
|
||||
msg = str(err)
|
||||
if "502" in msg and "/api/ace/dry" in msg:
|
||||
_LOGGER.warning("Ignoring bridge 502 for /api/ace/dry because command may already be applied: %s", msg)
|
||||
return {"result": "ok", "warning": "ignored_502"}
|
||||
raise
|
||||
|
||||
if data.get("error") not in (None, ""):
|
||||
raise KobraXApiError(str(data["error"]))
|
||||
return data
|
||||
|
||||
async def async_pause_print(self) -> None:
|
||||
await self._post_json("/printer/print/pause", {})
|
||||
|
||||
@@ -79,9 +147,24 @@ class KobraXApiClient:
|
||||
async def async_disconnect(self) -> None:
|
||||
await self._post_json("/api/disconnect", {})
|
||||
|
||||
async def async_restart_bridge(self) -> None:
|
||||
await self._post_json("/api/restart", {})
|
||||
|
||||
async def async_start_camera(self) -> None:
|
||||
await self._post_json("/api/camera/start", {})
|
||||
|
||||
async def async_stop_camera(self) -> None:
|
||||
await self._post_json("/api/camera/stop", {})
|
||||
|
||||
async def async_check_updates(self) -> dict[str, Any]:
|
||||
return await self._get_json("/api/update/check")
|
||||
|
||||
async def async_apply_update(self, tag: str, download_url: str) -> dict[str, Any]:
|
||||
return await self._post_json(
|
||||
"/api/update/apply",
|
||||
{"tag": tag, "download_url": download_url},
|
||||
)
|
||||
|
||||
async def async_get_camera_url(self) -> str | None:
|
||||
data = await self._get_json("/api/camera")
|
||||
url = data.get("url")
|
||||
|
||||
@@ -7,6 +7,7 @@ from homeassistant.components.binary_sensor import (
|
||||
BinarySensorEntity,
|
||||
BinarySensorEntityDescription,
|
||||
)
|
||||
from homeassistant.helpers.entity import EntityCategory
|
||||
|
||||
from .const import DOMAIN
|
||||
from .entity import KobraXEntity
|
||||
@@ -23,6 +24,7 @@ BINARY_SENSORS: tuple[KobraXBinaryDescription, ...] = (
|
||||
name="Online",
|
||||
value_key="kobra_state",
|
||||
device_class=BinarySensorDeviceClass.CONNECTIVITY,
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
),
|
||||
KobraXBinaryDescription(
|
||||
key="printing",
|
||||
|
||||
BIN
custom_components/kobrax_lan/brand/dark_icon.png
Normal file
BIN
custom_components/kobrax_lan/brand/dark_icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.8 KiB |
BIN
custom_components/kobrax_lan/brand/dark_logo.png
Normal file
BIN
custom_components/kobrax_lan/brand/dark_logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.8 KiB |
BIN
custom_components/kobrax_lan/brand/icon.png
Normal file
BIN
custom_components/kobrax_lan/brand/icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.8 KiB |
BIN
custom_components/kobrax_lan/brand/logo.png
Normal file
BIN
custom_components/kobrax_lan/brand/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.8 KiB |
@@ -4,6 +4,7 @@ from dataclasses import dataclass
|
||||
|
||||
from homeassistant.components.button import ButtonEntity, ButtonEntityDescription
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
from homeassistant.helpers.entity import EntityCategory
|
||||
|
||||
from .api import KobraXApiError
|
||||
from .const import DOMAIN
|
||||
@@ -13,6 +14,7 @@ from .entity import KobraXEntity
|
||||
@dataclass(frozen=True, kw_only=True)
|
||||
class KobraXButtonDescription(ButtonEntityDescription):
|
||||
action: str
|
||||
ace_id: int | None = None # None for non-ACE buttons, 0-3 for ACE buttons
|
||||
|
||||
|
||||
BUTTONS: tuple[KobraXButtonDescription, ...] = (
|
||||
@@ -39,6 +41,7 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
|
||||
name="Connect Bridge",
|
||||
icon="mdi:lan-connect",
|
||||
action="connect",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
entity_registry_enabled_default=False,
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
@@ -46,7 +49,29 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
|
||||
name="Disconnect Bridge",
|
||||
icon="mdi:lan-disconnect",
|
||||
action="disconnect",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
entity_registry_enabled_default=False,
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
key="restart_bridge",
|
||||
name="Restart (KX-Bridge)",
|
||||
icon="mdi:restart",
|
||||
action="restart",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
key="refresh_skip_state",
|
||||
name="Refresh Skip State",
|
||||
icon="mdi:refresh",
|
||||
action="skip_query",
|
||||
entity_registry_enabled_default=False,
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
key="apply_update",
|
||||
name="Apply Update (KX-Bridge)",
|
||||
icon="mdi:download-circle-outline",
|
||||
action="apply_update",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -58,6 +83,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
|
||||
super().__init__(coordinator, entry, description.key, description.name)
|
||||
self.entity_description = description
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
if self.entity_description.action != "apply_update":
|
||||
return super().available
|
||||
|
||||
update_info = self.state_data.get("update_info")
|
||||
if not isinstance(update_info, dict):
|
||||
return False
|
||||
|
||||
current = str(update_info.get("current") or "").strip()
|
||||
latest = str(update_info.get("latest") or "").strip()
|
||||
if current and latest and current == latest:
|
||||
return False
|
||||
|
||||
if update_info.get("update_available") is False:
|
||||
return False
|
||||
|
||||
return super().available
|
||||
|
||||
async def async_press(self) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
@@ -71,16 +115,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
|
||||
await api.async_connect()
|
||||
elif self.entity_description.action == "disconnect":
|
||||
await api.async_disconnect()
|
||||
elif self.entity_description.action == "restart":
|
||||
await api.async_restart_bridge()
|
||||
elif self.entity_description.action == "skip_query":
|
||||
await api.async_skip_query()
|
||||
elif self.entity_description.action == "apply_update":
|
||||
await self.coordinator.async_apply_update()
|
||||
await self.coordinator.async_request_refresh()
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
except Exception as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
|
||||
async def async_setup_entry(hass, entry, async_add_entities):
|
||||
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
|
||||
async_add_entities(
|
||||
[
|
||||
KobraXActionButton(coordinator, entry, description)
|
||||
for description in BUTTONS
|
||||
]
|
||||
)
|
||||
|
||||
entities = [
|
||||
KobraXActionButton(coordinator, entry, description)
|
||||
for description in BUTTONS
|
||||
]
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
@@ -10,7 +10,8 @@ CONF_PRINTER_NAME = "printer_name"
|
||||
DEFAULT_HOST = "localhost:7125"
|
||||
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
|
||||
|
||||
UPDATE_INTERVAL = timedelta(seconds=5)
|
||||
UPDATE_INTERVAL = timedelta(seconds=3)
|
||||
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
|
||||
|
||||
PLATFORMS = [
|
||||
"sensor",
|
||||
@@ -18,6 +19,8 @@ PLATFORMS = [
|
||||
"light",
|
||||
"select",
|
||||
"button",
|
||||
"switch",
|
||||
"number",
|
||||
"camera",
|
||||
"image",
|
||||
]
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
|
||||
from .api import KobraXApiClient, KobraXApiError
|
||||
from .const import DOMAIN, UPDATE_INTERVAL
|
||||
from .const import DOMAIN, UPDATE_CHECK_INTERVAL, UPDATE_INTERVAL
|
||||
|
||||
|
||||
class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
@@ -18,9 +19,60 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
update_interval=UPDATE_INTERVAL,
|
||||
)
|
||||
self.api = api
|
||||
self._update_info: dict[str, Any] = {}
|
||||
self._next_update_check_monotonic = 0.0
|
||||
|
||||
async def _async_update_data(self) -> dict[str, Any]:
|
||||
try:
|
||||
return await self.api.async_get_state()
|
||||
state = await self.api.async_get_state()
|
||||
|
||||
now = time.monotonic()
|
||||
if now >= self._next_update_check_monotonic:
|
||||
try:
|
||||
self._update_info = await self.api.async_check_updates()
|
||||
except KobraXApiError:
|
||||
# Keep integration polling resilient if update service is temporarily unavailable.
|
||||
pass
|
||||
self._next_update_check_monotonic = now + UPDATE_CHECK_INTERVAL.total_seconds()
|
||||
|
||||
if self._update_info:
|
||||
state["update_info"] = self._update_info
|
||||
try:
|
||||
skip_state = await self.api.async_get_skip_state()
|
||||
state["skip_state"] = skip_state
|
||||
except KobraXApiError:
|
||||
# Skip endpoints are only available on newer bridge versions.
|
||||
pass
|
||||
return state
|
||||
except KobraXApiError as err:
|
||||
raise UpdateFailed(str(err)) from err
|
||||
|
||||
async def async_check_updates(self) -> dict[str, Any]:
|
||||
try:
|
||||
update_info = await self.api.async_check_updates()
|
||||
except KobraXApiError as err:
|
||||
raise UpdateFailed(str(err)) from err
|
||||
|
||||
self._update_info = update_info
|
||||
merged = dict(self.data or {})
|
||||
merged["update_info"] = update_info
|
||||
self.async_set_updated_data(merged)
|
||||
return update_info
|
||||
|
||||
async def async_apply_update(self) -> dict[str, Any]:
|
||||
update_info = self._update_info or await self.async_check_updates()
|
||||
tag = str(update_info.get("tag") or "").strip()
|
||||
download_url = str(update_info.get("download_url") or "").strip()
|
||||
if not tag or not download_url:
|
||||
raise UpdateFailed("Missing tag or download URL from update check")
|
||||
|
||||
try:
|
||||
result = await self.api.async_apply_update(tag=tag, download_url=download_url)
|
||||
except KobraXApiError as err:
|
||||
raise UpdateFailed(str(err)) from err
|
||||
|
||||
self._update_info = {**update_info, "last_apply_result": result}
|
||||
merged = dict(self.data or {})
|
||||
merged["update_info"] = self._update_info
|
||||
self.async_set_updated_data(merged)
|
||||
return result
|
||||
|
||||
@@ -5,8 +5,9 @@
|
||||
"@Gangoke"
|
||||
],
|
||||
"config_flow": true,
|
||||
"homeassistant": "2026.3.0",
|
||||
"documentation": "https://github.com/gangoke/kobrax-lan-hass-component",
|
||||
"iot_class": "local_polling",
|
||||
"issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues",
|
||||
"version": "0.1.0"
|
||||
"version": "0.2.0"
|
||||
}
|
||||
|
||||
93
custom_components/kobrax_lan/number.py
Normal file
93
custom_components/kobrax_lan/number.py
Normal file
@@ -0,0 +1,93 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.components.number import NumberEntity
|
||||
|
||||
from .const import DOMAIN
|
||||
from .entity import KobraXEntity
|
||||
|
||||
|
||||
def _minutes_to_hhmmss(minutes: int | float) -> str:
|
||||
"""Convert minutes to HH:mm:ss format."""
|
||||
total_seconds = int(minutes * 60)
|
||||
hours = total_seconds // 3600
|
||||
remaining_seconds = total_seconds % 3600
|
||||
mins = remaining_seconds // 60
|
||||
secs = remaining_seconds % 60
|
||||
return f"{hours:02d}:{mins:02d}:{secs:02d}"
|
||||
|
||||
|
||||
class KobraXAceDryConfigNumber(KobraXEntity, NumberEntity):
|
||||
def __init__(
|
||||
self,
|
||||
coordinator,
|
||||
entry,
|
||||
ace_id: int,
|
||||
config_type: str, # "target_temp" or "duration"
|
||||
) -> None:
|
||||
if config_type == "target_temp":
|
||||
unique_key = f"ace_{ace_id}_dry_target_temp"
|
||||
name = f"ACE {ace_id + 1} Dryer Target Temperature"
|
||||
min_val, max_val, step_val = 30, 80, 1
|
||||
unit = "°C"
|
||||
icon = "mdi:thermometer"
|
||||
else: # duration
|
||||
unique_key = f"ace_{ace_id}_dry_duration"
|
||||
name = f"ACE {ace_id + 1} Dryer Duration"
|
||||
min_val, max_val, step_val = 10, 1440, 1
|
||||
unit = "min"
|
||||
icon = "mdi:timer-cog-outline"
|
||||
|
||||
super().__init__(coordinator, entry, unique_key, name)
|
||||
self._ace_id = ace_id
|
||||
self._config_type = config_type
|
||||
self._attr_native_min_value = min_val
|
||||
self._attr_native_max_value = max_val
|
||||
self._attr_native_step = step_val
|
||||
self._attr_native_unit_of_measurement = unit
|
||||
self._attr_mode = "box"
|
||||
self._attr_icon = icon
|
||||
|
||||
@property
|
||||
def native_value(self) -> float:
|
||||
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
|
||||
ace_cfg = cfg.get(self._ace_id) or {}
|
||||
key = "target_temp" if self._config_type == "target_temp" else "duration"
|
||||
if self._config_type == "target_temp":
|
||||
return float(ace_cfg.get(key, 45))
|
||||
else:
|
||||
return float(ace_cfg.get(key, 240))
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> dict[str, Any] | None:
|
||||
if self._config_type != "duration":
|
||||
return None
|
||||
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
|
||||
ace_cfg = cfg.get(self._ace_id) or {}
|
||||
duration_minutes = ace_cfg.get("duration", 240)
|
||||
return {"formatted_duration": _minutes_to_hhmmss(duration_minutes)}
|
||||
|
||||
async def async_set_native_value(self, value: float) -> None:
|
||||
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
|
||||
if self._ace_id not in cfg:
|
||||
cfg[self._ace_id] = {}
|
||||
key = "target_temp" if self._config_type == "target_temp" else "duration"
|
||||
cfg[self._ace_id][key] = int(round(value))
|
||||
self.async_write_ha_state()
|
||||
|
||||
|
||||
async def async_setup_entry(hass, entry, async_add_entities):
|
||||
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
|
||||
|
||||
# Initialize ace_dry_config structure if not present
|
||||
if "ace_dry_config" not in hass.data[DOMAIN][entry.entry_id]:
|
||||
hass.data[DOMAIN][entry.entry_id]["ace_dry_config"] = {}
|
||||
|
||||
# Pre-create all 8 numbers (target_temp + duration for each of 4 ACE units)
|
||||
entities = []
|
||||
for ace_id in range(4):
|
||||
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "target_temp"))
|
||||
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "duration"))
|
||||
|
||||
async_add_entities(entities)
|
||||
@@ -2,14 +2,45 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from urllib.parse import quote
|
||||
|
||||
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity, SensorEntityDescription
|
||||
from homeassistant.const import PERCENTAGE, UnitOfTemperature
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.entity import EntityCategory
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
from homeassistant.helpers.entity_registry import RegistryEntryDisabler
|
||||
|
||||
from .const import DOMAIN
|
||||
from .entity import KobraXEntity
|
||||
|
||||
|
||||
MAX_FILAMENT_SLOTS = 19
|
||||
TOOLHEAD_SLOT_LIMIT = 4
|
||||
ACE_DIRECT_SLOT_LIMIT = 4
|
||||
MAX_ACE_UNITS = 4
|
||||
|
||||
|
||||
def _detected_slot_limit(state_data: dict[str, Any]) -> int:
|
||||
mode = str(state_data.get("filament_mode") or "toolhead").lower()
|
||||
if mode == "toolhead":
|
||||
return TOOLHEAD_SLOT_LIMIT
|
||||
if mode == "ace_direct":
|
||||
return ACE_DIRECT_SLOT_LIMIT
|
||||
|
||||
slots = state_data.get("ams_slots") or []
|
||||
if isinstance(slots, list) and slots:
|
||||
return min(len(slots), MAX_FILAMENT_SLOTS)
|
||||
return MAX_FILAMENT_SLOTS
|
||||
|
||||
|
||||
def _detected_ace_unit_count(state_data: dict[str, Any]) -> int:
|
||||
units = state_data.get("ace_units") or []
|
||||
if isinstance(units, list):
|
||||
return min(len(units), MAX_ACE_UNITS)
|
||||
return 0
|
||||
|
||||
|
||||
@dataclass(frozen=True, kw_only=True)
|
||||
class KobraXSensorDescription(SensorEntityDescription):
|
||||
value_key: str
|
||||
@@ -97,6 +128,42 @@ SENSORS: tuple[KobraXSensorDescription, ...] = (
|
||||
value_key="print_duration",
|
||||
icon="mdi:timer-outline",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="skip_object_count",
|
||||
name="Skip Object Count",
|
||||
value_key="skip_object_count",
|
||||
icon="mdi:vector-polygon",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="skipped_object_count",
|
||||
name="Skipped Object Count",
|
||||
value_key="skipped_object_count",
|
||||
icon="mdi:content-cut",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="filament_mode",
|
||||
name="Filament Mode",
|
||||
value_key="filament_mode",
|
||||
icon="mdi:shape-outline",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="ace_unit_count",
|
||||
name="ACE Unit Count",
|
||||
value_key="ace_unit_count",
|
||||
icon="mdi:package-variant",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="bridge_version",
|
||||
name="Bridge Version",
|
||||
value_key="version",
|
||||
icon="mdi:source-branch",
|
||||
),
|
||||
KobraXSensorDescription(
|
||||
key="latest_available_version",
|
||||
name="Latest Available Version",
|
||||
value_key="latest_available_version",
|
||||
icon="mdi:cloud-download-outline",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -106,31 +173,183 @@ class KobraXSensor(KobraXEntity, SensorEntity):
|
||||
def __init__(self, coordinator, entry, description: KobraXSensorDescription) -> None:
|
||||
super().__init__(coordinator, entry, description.key, description.name)
|
||||
self.entity_description = description
|
||||
if description.value_key in ("version", "latest_available_version"):
|
||||
self._attr_entity_category = EntityCategory.DIAGNOSTIC
|
||||
|
||||
@staticmethod
|
||||
def _seconds_to_hhmmss(seconds: int) -> str:
|
||||
hours = seconds // 3600
|
||||
remaining_seconds = seconds % 3600
|
||||
minutes = remaining_seconds // 60
|
||||
secs = remaining_seconds % 60
|
||||
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
|
||||
@property
|
||||
def native_value(self) -> Any:
|
||||
if self.entity_description.value_key == "filament_mode":
|
||||
return self.state_data.get("filament_mode")
|
||||
if self.entity_description.value_key == "ace_unit_count":
|
||||
units = self.state_data.get("ace_units")
|
||||
return len(units) if isinstance(units, list) else 0
|
||||
if self.entity_description.value_key == "latest_available_version":
|
||||
update_info = self.state_data.get("update_info") or {}
|
||||
if isinstance(update_info, dict):
|
||||
return update_info.get("latest")
|
||||
return None
|
||||
if self.entity_description.value_key == "skip_object_count":
|
||||
skip_state = self.state_data.get("skip_state") or {}
|
||||
objects = skip_state.get("objects") if isinstance(skip_state, dict) else []
|
||||
return len(objects) if isinstance(objects, list) else 0
|
||||
if self.entity_description.value_key == "skipped_object_count":
|
||||
skip_state = self.state_data.get("skip_state") or {}
|
||||
skipped = skip_state.get("skipped") if isinstance(skip_state, dict) else []
|
||||
return len(skipped) if isinstance(skipped, list) else 0
|
||||
|
||||
value = self.state_data.get(self.entity_description.value_key)
|
||||
if self.entity_description.value_key == "progress" and value is not None:
|
||||
return round(float(value) * 100, 1)
|
||||
if self.entity_description.value_key in ("remain_time", "print_duration") and value is not None:
|
||||
try:
|
||||
return self._seconds_to_hhmmss(int(float(value)))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return value
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> dict[str, Any] | None:
|
||||
if self.entity_description.value_key == "latest_available_version":
|
||||
update_info = self.state_data.get("update_info")
|
||||
if isinstance(update_info, dict):
|
||||
return {
|
||||
"current": update_info.get("current"),
|
||||
"tag": update_info.get("tag"),
|
||||
"update_available": update_info.get("update_available"),
|
||||
"download_url": update_info.get("download_url"),
|
||||
}
|
||||
return None
|
||||
|
||||
if self.entity_description.value_key not in ("skip_object_count", "skipped_object_count"):
|
||||
return None
|
||||
|
||||
skip_state = self.state_data.get("skip_state")
|
||||
if not isinstance(skip_state, dict):
|
||||
return None
|
||||
|
||||
objects = skip_state.get("objects")
|
||||
skipped = skip_state.get("skipped")
|
||||
return {
|
||||
"objects": objects if isinstance(objects, list) else [],
|
||||
"skipped": skipped if isinstance(skipped, list) else [],
|
||||
"filename": skip_state.get("filename"),
|
||||
"ts": skip_state.get("ts"),
|
||||
}
|
||||
|
||||
|
||||
class KobraXAceDryerSensor(KobraXEntity, SensorEntity):
|
||||
"""Per-ACE-unit dryer sensor."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator,
|
||||
entry,
|
||||
ace_id: int,
|
||||
sensor_type: str, # "status", "humidity", "current_temp", "target_temp", "remaining_time"
|
||||
) -> None:
|
||||
if sensor_type == "status":
|
||||
unique_key = f"ace_{ace_id}_dryer_status"
|
||||
name = f"ACE {ace_id + 1} Dryer Status"
|
||||
icon = "mdi:tumble-dryer"
|
||||
elif sensor_type == "humidity":
|
||||
unique_key = f"ace_{ace_id}_dryer_humidity"
|
||||
name = f"ACE {ace_id + 1} Dryer Humidity"
|
||||
icon = "mdi:water-percent"
|
||||
elif sensor_type == "current_temp":
|
||||
unique_key = f"ace_{ace_id}_dryer_current_temp"
|
||||
name = f"ACE {ace_id + 1} Dryer Current Temperature"
|
||||
icon = None
|
||||
elif sensor_type == "target_temp":
|
||||
unique_key = f"ace_{ace_id}_dryer_target_temp"
|
||||
name = f"ACE {ace_id + 1} Dryer Target Temperature"
|
||||
icon = None
|
||||
else: # remaining_time
|
||||
unique_key = f"ace_{ace_id}_dryer_remaining_time"
|
||||
name = f"ACE {ace_id + 1} Dryer Remaining Time"
|
||||
icon = "mdi:timer-sand"
|
||||
|
||||
super().__init__(coordinator, entry, unique_key, name)
|
||||
self._ace_id = ace_id
|
||||
self._sensor_type = sensor_type
|
||||
if icon:
|
||||
self._attr_icon = icon
|
||||
|
||||
if sensor_type == "humidity":
|
||||
self._attr_native_unit_of_measurement = PERCENTAGE
|
||||
elif sensor_type in ("current_temp", "target_temp"):
|
||||
self._attr_device_class = SensorDeviceClass.TEMPERATURE
|
||||
self._attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
|
||||
self._attr_suggested_unit_of_measurement = UnitOfTemperature.CELSIUS
|
||||
elif sensor_type == "remaining_time":
|
||||
pass # HH:mm:ss format, no unit needed
|
||||
|
||||
@staticmethod
|
||||
def _minutes_to_hhmmss(minutes: int) -> str:
|
||||
"""Convert minutes to HH:mm:ss format."""
|
||||
total_seconds = minutes * 60
|
||||
hours = total_seconds // 3600
|
||||
remaining_seconds = total_seconds % 3600
|
||||
mins = remaining_seconds // 60
|
||||
secs = remaining_seconds % 60
|
||||
return f"{hours:02d}:{mins:02d}:{secs:02d}"
|
||||
|
||||
@property
|
||||
def native_value(self) -> Any:
|
||||
drying = self.state_data.get("ace_drying") or {}
|
||||
|
||||
# Handle per-unit structure: ace_drying[ace_id] or global structure
|
||||
if isinstance(drying, dict):
|
||||
# Try per-unit key first
|
||||
unit_data = drying.get(self._ace_id)
|
||||
if isinstance(unit_data, dict):
|
||||
drying = unit_data
|
||||
elif self._ace_id == 0 and not unit_data:
|
||||
# Fall back to global structure for unit 0
|
||||
pass
|
||||
else:
|
||||
# No data for this unit
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
if self._sensor_type == "status":
|
||||
status = int(drying.get("status", 0)) if drying.get("status") is not None else 0
|
||||
return "running" if status else "idle"
|
||||
elif self._sensor_type == "humidity":
|
||||
humidity = drying.get("humidity")
|
||||
return round(float(humidity), 1) if humidity is not None else None
|
||||
elif self._sensor_type == "current_temp":
|
||||
current_temp = drying.get("current_temp")
|
||||
return round(float(current_temp), 1) if current_temp is not None else None
|
||||
elif self._sensor_type == "target_temp":
|
||||
target_temp = drying.get("target_temp")
|
||||
return float(target_temp) if target_temp is not None else None
|
||||
elif self._sensor_type == "remaining_time":
|
||||
remain = drying.get("remain_time")
|
||||
if remain is not None:
|
||||
try:
|
||||
minutes = int(remain)
|
||||
return self._minutes_to_hhmmss(minutes)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
|
||||
def __init__(self, coordinator, entry, slot_index: int, field: str) -> None:
|
||||
name_suffix = {
|
||||
"color": f"Filament Slot {slot_index + 1} Color",
|
||||
"type": f"Filament Slot {slot_index + 1} Type",
|
||||
}[field]
|
||||
super().__init__(coordinator, entry, f"filament_slot_{slot_index + 1}_{field}", name_suffix)
|
||||
def __init__(self, coordinator, entry, slot_index: int) -> None:
|
||||
super().__init__(coordinator, entry, f"slot_{slot_index + 1}", f"Slot {slot_index + 1}")
|
||||
self._slot_index = slot_index
|
||||
self._field = field
|
||||
|
||||
if field == "color":
|
||||
self._attr_icon = "mdi:palette"
|
||||
elif field == "type":
|
||||
self._attr_icon = "mdi:label"
|
||||
else:
|
||||
self._attr_icon = "mdi:numeric"
|
||||
self._attr_icon = "mdi:circle"
|
||||
|
||||
def _slot(self) -> dict[str, Any]:
|
||||
slots = self.state_data.get("ams_slots") or []
|
||||
@@ -139,6 +358,9 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
|
||||
slot = slots[self._slot_index]
|
||||
return slot if isinstance(slot, dict) else {}
|
||||
|
||||
def _slot_limit_for_mode(self) -> int:
|
||||
return _detected_slot_limit(self.state_data)
|
||||
|
||||
@staticmethod
|
||||
def _to_color_hex(color: Any) -> str | None:
|
||||
if isinstance(color, list) and len(color) >= 3:
|
||||
@@ -154,30 +376,132 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
return bool(self._slot()) and super().available
|
||||
return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available
|
||||
|
||||
@property
|
||||
def native_value(self) -> Any:
|
||||
slot = self._slot()
|
||||
if not slot:
|
||||
return "EMPTY"
|
||||
status = slot.get("status")
|
||||
if status is not None:
|
||||
try:
|
||||
if int(status) != 5:
|
||||
return "EMPTY"
|
||||
except (TypeError, ValueError):
|
||||
return "EMPTY"
|
||||
material = slot.get("type")
|
||||
material_str = str(material).upper() if material else "EMPTY"
|
||||
color_hex = self._to_color_hex(slot.get("color"))
|
||||
if color_hex and material_str != "EMPTY":
|
||||
return f"{material_str} ({color_hex})"
|
||||
return material_str
|
||||
|
||||
@property
|
||||
def icon_color(self) -> str | None:
|
||||
slot = self._slot()
|
||||
if not slot:
|
||||
return None
|
||||
if self._field == "color":
|
||||
return self._to_color_hex(slot.get("color"))
|
||||
if self._field == "type":
|
||||
material = slot.get("type")
|
||||
return str(material).upper() if material else None
|
||||
return None
|
||||
return self._to_color_hex(slot.get("color"))
|
||||
|
||||
@property
|
||||
def entity_picture(self) -> str | None:
|
||||
"""Return a colored circle picture as a frontend fallback when icon tinting is ignored."""
|
||||
slot = self._slot()
|
||||
empty_svg = (
|
||||
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
|
||||
"<circle cx='32' cy='32' r='28' fill='none' stroke='#666' stroke-width='4'/></svg>"
|
||||
)
|
||||
if not slot:
|
||||
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
|
||||
|
||||
status = slot.get("status")
|
||||
if status is not None:
|
||||
try:
|
||||
if int(status) != 5:
|
||||
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
|
||||
except (TypeError, ValueError):
|
||||
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
|
||||
|
||||
color_hex = self._to_color_hex(slot.get("color"))
|
||||
if not color_hex:
|
||||
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
|
||||
|
||||
svg = (
|
||||
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
|
||||
"<circle cx='32' cy='32' r='28' fill='"
|
||||
f"{color_hex}"
|
||||
"' stroke='#222' stroke-width='4'/></svg>"
|
||||
)
|
||||
return f"data:image/svg+xml;utf8,{quote(svg)}"
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> dict[str, Any] | None:
|
||||
slot = self._slot()
|
||||
if not slot:
|
||||
return None
|
||||
return {
|
||||
"color_hex": self._to_color_hex(slot.get("color")),
|
||||
"status": slot.get("status"),
|
||||
"box_id": slot.get("box_id"),
|
||||
"global_index": slot.get("global_index"),
|
||||
"activity": slot.get("activity"),
|
||||
}
|
||||
|
||||
|
||||
async def async_setup_entry(hass, entry, async_add_entities):
|
||||
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
|
||||
slots = coordinator.data.get("ams_slots") if coordinator.data else []
|
||||
slot_count = len(slots) if isinstance(slots, list) and len(slots) > 0 else 4
|
||||
|
||||
filament_entities: list[SensorEntity] = []
|
||||
for slot_index in range(slot_count):
|
||||
for field in ("color", "type"):
|
||||
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index, field))
|
||||
for slot_index in range(MAX_FILAMENT_SLOTS):
|
||||
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index))
|
||||
|
||||
ace_dryer_entities: list[SensorEntity] = []
|
||||
for ace_id in range(MAX_ACE_UNITS):
|
||||
for sensor_type in ("status", "humidity", "current_temp", "target_temp", "remaining_time"):
|
||||
ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type))
|
||||
|
||||
entity_registry = er.async_get(hass)
|
||||
|
||||
# Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities.
|
||||
legacy_ace_sensor_keys = (
|
||||
"ace_dryer_status",
|
||||
"ace_dryer_humidity",
|
||||
"ace_dryer_current_temp",
|
||||
"ace_dryer_target_temp",
|
||||
"ace_dryer_remaining_time",
|
||||
)
|
||||
for key in legacy_ace_sensor_keys:
|
||||
legacy_unique_id = f"{entry.entry_id}_{key}"
|
||||
legacy_entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, legacy_unique_id)
|
||||
if legacy_entity_id:
|
||||
entity_registry.async_remove(legacy_entity_id)
|
||||
|
||||
@callback
|
||||
def _sync_slot_registry_state() -> None:
|
||||
state_data = coordinator.data or {}
|
||||
enabled_slots = _detected_slot_limit(state_data)
|
||||
|
||||
for slot_index in range(MAX_FILAMENT_SLOTS):
|
||||
unique_id = f"{entry.entry_id}_slot_{slot_index + 1}"
|
||||
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
|
||||
if not entity_id:
|
||||
continue
|
||||
|
||||
reg_entry = entity_registry.async_get(entity_id)
|
||||
if reg_entry is None:
|
||||
continue
|
||||
|
||||
should_enable = slot_index < enabled_slots
|
||||
if should_enable:
|
||||
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
|
||||
entity_registry.async_update_entity(entity_id, disabled_by=None)
|
||||
else:
|
||||
if reg_entry.disabled_by is None:
|
||||
entity_registry.async_update_entity(
|
||||
entity_id,
|
||||
disabled_by=RegistryEntryDisabler.INTEGRATION,
|
||||
)
|
||||
|
||||
async_add_entities(
|
||||
[
|
||||
@@ -185,4 +509,68 @@ async def async_setup_entry(hass, entry, async_add_entities):
|
||||
for description in SENSORS
|
||||
]
|
||||
+ filament_entities
|
||||
+ ace_dryer_entities
|
||||
)
|
||||
|
||||
@callback
|
||||
def _sync_ace_registry_state() -> None:
|
||||
state_data = coordinator.data or {}
|
||||
enabled_ace_units = _detected_ace_unit_count(state_data)
|
||||
|
||||
# Define all ACE entity patterns: (platform, unique_id_pattern_parts)
|
||||
ace_entities: list[tuple[str, str]] = []
|
||||
|
||||
for ace_index in range(MAX_ACE_UNITS):
|
||||
# Switch entities
|
||||
ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_auto_feed"))
|
||||
ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_dryer"))
|
||||
|
||||
# Number entities (temp + duration)
|
||||
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_target_temp"))
|
||||
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_duration"))
|
||||
|
||||
# Button entities (start + stop)
|
||||
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_start"))
|
||||
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_stop"))
|
||||
|
||||
# Sensor entities (status, humidity, current_temp, target_temp, remaining_time)
|
||||
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_status"))
|
||||
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_humidity"))
|
||||
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_current_temp"))
|
||||
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_target_temp"))
|
||||
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_remaining_time"))
|
||||
|
||||
for platform, unique_id in ace_entities:
|
||||
entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
|
||||
if not entity_id:
|
||||
continue
|
||||
|
||||
reg_entry = entity_registry.async_get(entity_id)
|
||||
if reg_entry is None:
|
||||
continue
|
||||
|
||||
# Extract ace_index from unique_id
|
||||
parts = unique_id.split("_")
|
||||
if len(parts) >= 3:
|
||||
try:
|
||||
ace_index = int(parts[2])
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
else:
|
||||
continue
|
||||
|
||||
should_enable = ace_index < enabled_ace_units
|
||||
if should_enable:
|
||||
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
|
||||
entity_registry.async_update_entity(entity_id, disabled_by=None)
|
||||
else:
|
||||
if reg_entry.disabled_by is None:
|
||||
entity_registry.async_update_entity(
|
||||
entity_id,
|
||||
disabled_by=RegistryEntryDisabler.INTEGRATION,
|
||||
)
|
||||
|
||||
entry.async_on_unload(coordinator.async_add_listener(_sync_slot_registry_state))
|
||||
entry.async_on_unload(coordinator.async_add_listener(_sync_ace_registry_state))
|
||||
_sync_slot_registry_state()
|
||||
_sync_ace_registry_state()
|
||||
|
||||
154
custom_components/kobrax_lan/switch.py
Normal file
154
custom_components/kobrax_lan/switch.py
Normal file
@@ -0,0 +1,154 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.components.switch import SwitchEntity
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
|
||||
from .api import KobraXApiError
|
||||
from .const import DOMAIN
|
||||
from .entity import KobraXEntity
|
||||
|
||||
|
||||
class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
|
||||
def __init__(self, coordinator, entry, ace_id: int) -> None:
|
||||
super().__init__(coordinator, entry, f"ace_{ace_id}_auto_feed", f"ACE {ace_id + 1} Auto Fill")
|
||||
self._ace_id = ace_id
|
||||
self._attr_icon = "mdi:autorenew"
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
auto_feed = self.state_data.get("ace_auto_feed") or {}
|
||||
if not isinstance(auto_feed, dict):
|
||||
return False
|
||||
value = auto_feed.get(self._ace_id)
|
||||
if value is None:
|
||||
value = auto_feed.get(str(self._ace_id))
|
||||
return bool(value)
|
||||
|
||||
def _apply_optimistic_state(self, is_on: bool) -> None:
|
||||
merged: dict[str, Any] = dict(self.coordinator.data or {})
|
||||
auto_feed = merged.get("ace_auto_feed")
|
||||
if not isinstance(auto_feed, dict):
|
||||
auto_feed = {}
|
||||
else:
|
||||
auto_feed = dict(auto_feed)
|
||||
|
||||
key: int | str = self._ace_id
|
||||
if key not in auto_feed and str(self._ace_id) in auto_feed:
|
||||
key = str(self._ace_id)
|
||||
auto_feed[key] = 1 if is_on else 0
|
||||
|
||||
merged["ace_auto_feed"] = auto_feed
|
||||
self.coordinator.async_set_updated_data(merged)
|
||||
|
||||
async def async_turn_on(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_auto_feed(self._ace_id, True)
|
||||
self._apply_optimistic_state(True)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
async def async_turn_off(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_auto_feed(self._ace_id, False)
|
||||
self._apply_optimistic_state(False)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
|
||||
class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
|
||||
def __init__(self, coordinator, entry, ace_id: int) -> None:
|
||||
super().__init__(coordinator, entry, f"ace_{ace_id}_dryer", f"ACE {ace_id + 1} Dryer")
|
||||
self._ace_id = ace_id
|
||||
self._attr_icon = "mdi:tumble-dryer"
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool:
|
||||
drying = self.state_data.get("ace_drying") or {}
|
||||
if not isinstance(drying, dict):
|
||||
return False
|
||||
|
||||
unit_data = drying.get(self._ace_id)
|
||||
if unit_data is None:
|
||||
unit_data = drying.get(str(self._ace_id))
|
||||
|
||||
if isinstance(unit_data, dict):
|
||||
status = unit_data.get("status")
|
||||
elif self._ace_id == 0:
|
||||
# Backward-compatible fallback: older bridge payloads may expose unit 0 as a flat object.
|
||||
status = drying.get("status")
|
||||
else:
|
||||
status = None
|
||||
|
||||
try:
|
||||
return int(status) > 0 if status is not None else False
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
def _apply_optimistic_state(self, is_on: bool) -> None:
|
||||
merged: dict[str, Any] = dict(self.coordinator.data or {})
|
||||
drying = merged.get("ace_drying")
|
||||
if not isinstance(drying, dict):
|
||||
drying = {}
|
||||
else:
|
||||
drying = dict(drying)
|
||||
|
||||
unit_data = drying.get(self._ace_id)
|
||||
unit_key: int | str = self._ace_id
|
||||
if unit_data is None:
|
||||
unit_data = drying.get(str(self._ace_id))
|
||||
if unit_data is not None:
|
||||
unit_key = str(self._ace_id)
|
||||
|
||||
if isinstance(unit_data, dict):
|
||||
next_unit_data = dict(unit_data)
|
||||
else:
|
||||
next_unit_data = {}
|
||||
|
||||
next_unit_data["status"] = 1 if is_on else 0
|
||||
drying[unit_key] = next_unit_data
|
||||
|
||||
# Keep backward-compatible flat status for unit 0 payload variants.
|
||||
if self._ace_id == 0:
|
||||
drying["status"] = 1 if is_on else 0
|
||||
|
||||
merged["ace_drying"] = drying
|
||||
self.coordinator.async_set_updated_data(merged)
|
||||
|
||||
async def async_turn_on(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
|
||||
ace_cfg = cfg.get(self._ace_id) or {}
|
||||
await api.async_set_ace_dry(
|
||||
"start",
|
||||
target_temp=int(ace_cfg.get("target_temp", 45)),
|
||||
duration=int(ace_cfg.get("duration", 240)),
|
||||
ace_id=self._ace_id,
|
||||
)
|
||||
self._apply_optimistic_state(True)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
async def async_turn_off(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
|
||||
self._apply_optimistic_state(False)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
|
||||
async def async_setup_entry(hass, entry, async_add_entities):
|
||||
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
|
||||
|
||||
entities = []
|
||||
|
||||
# Pre-create switches for all 4 possible ACE units
|
||||
entities.extend(KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4))
|
||||
entities.extend(KobraXAceDryerSwitch(coordinator, entry, ace_id) for ace_id in range(4))
|
||||
|
||||
async_add_entities(entities)
|
||||
Reference in New Issue
Block a user