9 Commits

Author SHA1 Message Date
Gangoke
36d64b876e update for bridge v0.9.18 2026-05-31 18:51:35 -10:00
Gangoke
37549de681 camera update 2026-05-30 17:34:37 -10:00
gangoke
5cb9ae0473 Merge pull request 'ace dryer fix' (#5) from ace-fix into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/5
2026-05-22 08:51:43 +00:00
Gangoke
b2f153ae26 fix: improve error handling for ACE commands and add optimistic state updates 2026-05-21 22:48:03 -10:00
copilot-swe-agent[bot]
cf3b4c1ca1 fix: surface ACE endpoint error payloads without requiring result field
Agent-Logs-Url: https://github.com/gangoke/kobrax-lan-hass-component/sessions/d19d21f5-b5a1-4db3-b891-dd8cf47f3b54

Co-authored-by: gangoke <77847204+gangoke@users.noreply.github.com>
2026-05-21 12:17:27 +00:00
copilot-swe-agent[bot]
8370153dd2 fix: support ACE auto-fill off toggle and tolerate ACE endpoint responses
Agent-Logs-Url: https://github.com/gangoke/kobrax-lan-hass-component/sessions/d19d21f5-b5a1-4db3-b891-dd8cf47f3b54

Co-authored-by: gangoke <77847204+gangoke@users.noreply.github.com>
2026-05-21 12:15:41 +00:00
copilot-swe-agent[bot]
cba7f9b707 Initial plan 2026-05-21 12:12:23 +00:00
gangoke
4e5aeb54a8 Merge pull request 'ACE entity fixes' (#4) from v0.9.13 into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/4
2026-05-21 11:06:26 +00:00
gangoke
ec1fbbc520 Merge pull request 'ACE2 Support + more' (#3) from v0.9.13 into main
Reviewed-on: https://gitea.gangoke.app/gangoke/kobrax-lan-hass-component/pulls/3
2026-05-21 10:44:15 +00:00
10 changed files with 468 additions and 61 deletions

View File

@@ -43,11 +43,11 @@ The config flow asks for:
| Platform | Key Entities |
| --- | --- |
| Binary Sensor | Online, Printing, Light State |
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Camera Stream Mode, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, Apply Update (KX-Bridge) |
| Switch | ACE 1..4 Auto Fill, ACE 1..4 Dryer |
| Switch | Camera On Print, Web Upload Warning, ACE 1..4 Auto Fill, ACE 1..4 Dryer |
| Number | ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Duration |
| Select | Print speed |
| Select | Print speed, Slot 1..Slot 19 Filament Profile |
| Light | Printer light |
| Camera | Printer camera |
| Image | G-code thumbnail |
@@ -58,4 +58,5 @@ Slot and ACE entities are pre-created and automatically enabled/disabled based o
- This integration communicates with KX-Bridge HTTP endpoints and does not connect directly to the printer.
- Keep KX-Bridge and Home Assistant on a trusted local network.
- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera RTSP source.
- Camera streaming prefers the bridge H.264 endpoint (`/api/camera/h264`, MPEG-TS passthrough) on newer bridge releases, with RTSP/MJPEG fallback for older releases.
- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera source you prefer (H.264 bridge endpoint or RTSP).

View File

@@ -1,10 +1,14 @@
from __future__ import annotations
import logging
from typing import Any
import aiohttp
_LOGGER = logging.getLogger(__name__)
class KobraXApiError(Exception):
"""Raised when communication with KX-Bridge fails."""
@@ -20,6 +24,21 @@ class KobraXApiClient:
def camera_stream_proxy_url(self) -> str:
return self._url("/api/camera/stream")
def camera_h264_proxy_url(self) -> str:
return self._url("/api/camera/h264")
async def async_h264_stream_available(self, timeout_seconds: float = 1.5) -> bool:
"""Probe whether the bridge h264 endpoint is reachable now."""
try:
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
async with self._session.get(self.camera_h264_proxy_url(), timeout=timeout) as response:
if response.status != 200:
return False
chunk = await response.content.read(1)
return bool(chunk)
except Exception:
return False
async def _get_json(self, path: str) -> dict[str, Any]:
try:
async with self._session.get(self._url(path)) as response:
@@ -42,6 +61,18 @@ class KobraXApiClient:
async def async_get_state(self) -> dict[str, Any]:
return await self._get_json("/api/state")
async def async_get_settings(self) -> dict[str, Any]:
data = await self._get_json("/api/settings")
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /api/settings")
async def async_set_settings(self, payload: dict[str, Any]) -> dict[str, Any]:
data = await self._post_json("/api/settings", payload)
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /api/settings")
async def async_get_files(self) -> list[dict[str, Any]]:
data = await self._get_json("/kx/files")
result = data.get("result", [])
@@ -77,11 +108,49 @@ class KobraXApiClient:
return result
raise KobraXApiError("Unexpected response for /kx/skip/state")
async def async_get_filament_slots(self) -> list[dict[str, Any]]:
data = await self._get_json("/kx/filament/slots")
result = data.get("result", [])
if isinstance(result, list):
return [slot for slot in result if isinstance(slot, dict)]
raise KobraXApiError("Unexpected response for /kx/filament/slots")
async def async_get_filament_profiles(self, material_type: str | None = None) -> list[dict[str, Any]]:
params: dict[str, str] = {}
if material_type:
params["type"] = material_type
try:
async with self._session.get(self._url("/kx/filament/profiles"), params=params or None) as response:
response.raise_for_status()
data = await response.json()
except Exception as err:
raise KobraXApiError(err) from err
result = data.get("result", []) if isinstance(data, dict) else []
if isinstance(result, list):
return [profile for profile in result if isinstance(profile, dict)]
raise KobraXApiError("Unexpected response for /kx/filament/profiles")
async def async_set_filament_slot_profile(
self,
slot_index: int,
filament_id: str,
vendor: str = "",
name: str = "",
) -> dict[str, Any]:
data = await self._post_json(
f"/kx/filament/slots/{int(slot_index)}/profile",
{"id": filament_id, "vendor": vendor, "name": name},
)
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /kx/filament/slots/{idx}/profile")
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/auto_feed")
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_set_ace_dry(
@@ -99,10 +168,19 @@ class KobraXApiClient:
if ace_id is not None:
payload["ace_id"] = int(ace_id)
data = await self._post_json("/api/ace/dry", payload)
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/dry")
try:
data = await self._post_json("/api/ace/dry", payload)
except KobraXApiError as err:
# Some bridge versions can return a false 502 while setDry is
# still applied successfully on the printer.
msg = str(err)
if "502" in msg and "/api/ace/dry" in msg:
_LOGGER.warning("Ignoring bridge 502 for /api/ace/dry because command may already be applied: %s", msg)
return {"result": "ok", "warning": "ignored_502"}
raise
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_pause_print(self) -> None:

View File

@@ -51,15 +51,14 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
action="disconnect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
# ENABLE ME WHEN API ENDPOINT IS ADDED
# KobraXButtonDescription(
# key="restart_bridge",
# name="Restart (KX-Bridge)",
# icon="mdi:restart",
# action="restart",
# entity_category=EntityCategory.CONFIG,
# ),
),
KobraXButtonDescription(
key="restart_bridge",
name="Restart (KX-Bridge)",
icon="mdi:restart",
action="restart",
entity_category=EntityCategory.CONFIG,
),
KobraXButtonDescription(
key="refresh_skip_state",
name="Refresh Skip State",

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import re
from homeassistant.components.camera import Camera, CameraEntityFeature
from .api import KobraXApiError
@@ -21,14 +22,27 @@ class KobraXCamera(KobraXEntity, Camera):
@property
def extra_state_attributes(self) -> dict[str, str]:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
camera_url = self.state_data.get("camera_url")
attrs = {
"camera_mjpeg_proxy_url": self.hass.data[DOMAIN][self._entry.entry_id]["api"].camera_stream_proxy_url(),
"camera_h264_proxy_url": api.camera_h264_proxy_url(),
"camera_mjpeg_proxy_url": api.camera_stream_proxy_url(),
}
if isinstance(camera_url, str) and camera_url:
attrs["camera_rtsp_url"] = camera_url
return attrs
@staticmethod
def _bridge_supports_h264_stream(version: str | None) -> bool:
"""Return True when bridge version includes /api/camera/h264 support."""
if not version:
return False
match = re.search(r"(\d+)\.(\d+)\.(\d+)", version)
if not match:
return False
major, minor, patch = (int(part) for part in match.groups())
return (major, minor, patch) >= (0, 9, 17)
async def stream_source(self) -> str | None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -36,6 +50,14 @@ class KobraXCamera(KobraXEntity, Camera):
except KobraXApiError:
pass
selected_mode = "mjpeg_proxy"
selected_url = api.camera_stream_proxy_url()
version = self.state_data.get("version")
if isinstance(version, str) and self._bridge_supports_h264_stream(version):
if await api.async_h264_stream_available():
return api.camera_h264_proxy_url()
camera_url = self.state_data.get("camera_url")
if isinstance(camera_url, str) and camera_url:
return camera_url
@@ -43,9 +65,12 @@ class KobraXCamera(KobraXEntity, Camera):
try:
camera_url = await api.async_get_camera_url()
except KobraXApiError:
return api.camera_stream_proxy_url()
return selected_url
return camera_url or api.camera_stream_proxy_url()
if isinstance(camera_url, str) and camera_url:
selected_url = camera_url
return selected_url
async def async_camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]

View File

@@ -10,7 +10,7 @@ CONF_PRINTER_NAME = "printer_name"
DEFAULT_HOST = "localhost:7125"
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
UPDATE_INTERVAL = timedelta(seconds=5)
UPDATE_INTERVAL = timedelta(seconds=3)
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
PLATFORMS = [

View File

@@ -20,23 +20,10 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
self.api = api
self._update_info: dict[str, Any] = {}
self._restart_supported: bool | None = None
self._next_update_check_monotonic = 0.0
async def _async_update_data(self) -> dict[str, Any]:
try:
# Probe restart endpoint once if not checked
if self._restart_supported is None:
try:
await self.api.async_restart_bridge()
self._restart_supported = True
except Exception as err:
# Only disable if 404/501, otherwise treat as available
msg = str(err)
if "404" in msg or "501" in msg:
self._restart_supported = False
else:
self._restart_supported = True
state = await self.api.async_get_state()
now = time.monotonic()
@@ -50,12 +37,28 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
if self._update_info:
state["update_info"] = self._update_info
try:
settings = await self.api.async_get_settings()
state["settings"] = settings
except KobraXApiError:
# Settings endpoint is only available on newer bridge versions.
pass
try:
skip_state = await self.api.async_get_skip_state()
state["skip_state"] = skip_state
except KobraXApiError:
# Skip endpoints are only available on newer bridge versions.
pass
try:
filament_slots = await self.api.async_get_filament_slots()
state["filament_slots"] = filament_slots
except KobraXApiError:
# Filament profile endpoints are only available on newer bridge versions.
pass
return state
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err

View File

@@ -9,5 +9,5 @@
"documentation": "https://github.com/gangoke/kobrax-lan-hass-component",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues",
"version": "0.2.0"
"version": "0.3.0"
}

View File

@@ -1,13 +1,34 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.select import SelectEntity
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
from .entity import KobraXEntity
MAX_FILAMENT_SLOTS = 19
TOOLHEAD_SLOT_LIMIT = 4
ACE_DIRECT_SLOT_LIMIT = 4
def _detected_slot_limit(state_data: dict[str, Any]) -> int:
mode = str(state_data.get("filament_mode") or "toolhead").lower()
if mode == "toolhead":
return TOOLHEAD_SLOT_LIMIT
if mode == "ace_direct":
return ACE_DIRECT_SLOT_LIMIT
slots = state_data.get("ams_slots") or []
if isinstance(slots, list) and slots:
return min(len(slots), MAX_FILAMENT_SLOTS)
return MAX_FILAMENT_SLOTS
class KobraXPrintSpeedSelect(KobraXEntity, SelectEntity):
_attr_options = ["Slow (1)", "Normal (2)", "Fast (3)"]
@@ -34,8 +55,139 @@ class KobraXPrintSpeedSelect(KobraXEntity, SelectEntity):
raise ServiceValidationError(str(err)) from err
class KobraXFilamentProfileSelect(KobraXEntity, SelectEntity):
_AUTO_OPTION = "Auto (No Override)"
def __init__(self, coordinator, entry, slot_index: int) -> None:
super().__init__(coordinator, entry, f"slot_{slot_index + 1}_filament_profile", f"Slot {slot_index + 1} Filament Profile")
self._slot_index = slot_index
self._attr_icon = "mdi:palette"
self._attr_entity_category = EntityCategory.CONFIG
def _slot_limit_for_mode(self) -> int:
return _detected_slot_limit(self.state_data)
def _slot(self) -> dict[str, Any]:
slots = self.state_data.get("filament_slots") or []
if not isinstance(slots, list):
return {}
for slot in slots:
if not isinstance(slot, dict):
continue
try:
if int(slot.get("slot_index", -1)) == self._slot_index:
return slot
except (TypeError, ValueError):
continue
return {}
def _profile_catalog(self) -> list[dict[str, Any]]:
data = self.hass.data[DOMAIN][self._entry.entry_id]
profiles = data.get("filament_profiles")
if isinstance(profiles, list):
return [item for item in profiles if isinstance(item, dict)]
return []
@staticmethod
def _profile_label(profile: dict[str, Any]) -> str:
name = str(profile.get("name") or profile.get("id") or "Unknown")
vendor = str(profile.get("vendor") or "")
fid = str(profile.get("id") or "")
vendor_part = f" - {vendor}" if vendor else ""
return f"{name}{vendor_part} [{fid}]" if fid else f"{name}{vendor_part}"
def _options_map(self) -> dict[str, tuple[str, str, str]]:
slot = self._slot()
material = str(slot.get("material") or "").upper()
options: dict[str, tuple[str, str, str]] = {self._AUTO_OPTION: ("", "", "")}
for profile in self._profile_catalog():
profile_type = str(profile.get("type") or "").upper()
if material and profile_type and profile_type != material:
continue
fid = str(profile.get("id") or "")
if not fid:
continue
vendor = str(profile.get("vendor") or "")
name = str(profile.get("name") or "")
options[self._profile_label(profile)] = (fid, vendor, name)
return options
@property
def available(self) -> bool:
return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available
@property
def options(self) -> list[str]:
return list(self._options_map().keys())
@property
def current_option(self) -> str:
slot = self._slot()
filament_id = str(slot.get("filament_id") or "")
vendor = str(slot.get("filament_vendor") or "")
name = str(slot.get("filament_name") or "")
if not filament_id:
return self._AUTO_OPTION
option_map = self._options_map()
for label, (opt_id, opt_vendor, opt_name) in option_map.items():
if name and vendor and opt_name == name and opt_vendor == vendor:
return label
if opt_id == filament_id and opt_vendor == vendor:
return label
if opt_id == filament_id and not vendor:
return label
return f"Custom [{filament_id}]"
def _apply_optimistic_state(self, filament_id: str, vendor: str, name: str) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
slots = merged.get("filament_slots")
if not isinstance(slots, list):
return
new_slots: list[dict[str, Any]] = []
for slot in slots:
if not isinstance(slot, dict):
continue
next_slot = dict(slot)
try:
if int(next_slot.get("slot_index", -1)) == self._slot_index:
next_slot["filament_id"] = filament_id
next_slot["filament_vendor"] = vendor
next_slot["filament_name"] = name
except (TypeError, ValueError):
pass
new_slots.append(next_slot)
merged["filament_slots"] = new_slots
self.coordinator.async_set_updated_data(merged)
async def async_select_option(self, option: str) -> None:
option_map = self._options_map()
if option not in option_map:
raise ServiceValidationError(f"Invalid option: {option}")
filament_id, vendor, name = option_map[option]
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_filament_slot_profile(self._slot_index, filament_id, vendor, name)
self._apply_optimistic_state(filament_id, vendor, name)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
api = hass.data[DOMAIN][entry.entry_id]["api"]
try:
hass.data[DOMAIN][entry.entry_id]["filament_profiles"] = await api.async_get_filament_profiles()
except KobraXApiError:
hass.data[DOMAIN][entry.entry_id]["filament_profiles"] = []
entities: list[SelectEntity] = [KobraXPrintSpeedSelect(coordinator, entry, "print_speed_mode", "Print Speed")]
entities.extend(KobraXFilamentProfileSelect(coordinator, entry, slot_index) for slot_index in range(MAX_FILAMENT_SLOTS))
async_add_entities(
[KobraXPrintSpeedSelect(coordinator, entry, "print_speed_mode", "Print Speed")]
entities
)

View File

@@ -462,6 +462,7 @@ async def async_setup_entry(hass, entry, async_add_entities):
ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type))
entity_registry = er.async_get(hass)
slot_registry_state: dict[str, int | None] = {"last_limit": None, "short_count": 0}
# Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities.
legacy_ace_sensor_keys = (
@@ -480,28 +481,50 @@ async def async_setup_entry(hass, entry, async_add_entities):
@callback
def _sync_slot_registry_state() -> None:
state_data = coordinator.data or {}
enabled_slots = _detected_slot_limit(state_data)
detected_limit = _detected_slot_limit(state_data)
last_limit = slot_registry_state["last_limit"]
short_count = int(slot_registry_state["short_count"] or 0)
if last_limit is None or detected_limit >= last_limit:
effective_limit = detected_limit
slot_registry_state["last_limit"] = detected_limit
slot_registry_state["short_count"] = 0
else:
short_count += 1
slot_registry_state["short_count"] = short_count
if short_count < 2:
effective_limit = last_limit
else:
effective_limit = detected_limit
slot_registry_state["last_limit"] = detected_limit
slot_registry_state["short_count"] = 0
for slot_index in range(MAX_FILAMENT_SLOTS):
unique_id = f"{entry.entry_id}_slot_{slot_index + 1}"
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
if not entity_id:
continue
entities_to_sync = [
("sensor", f"{entry.entry_id}_slot_{slot_index + 1}"),
("select", f"{entry.entry_id}_slot_{slot_index + 1}_filament_profile"),
]
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
for platform, unique_id in entities_to_sync:
entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
if not entity_id:
continue
should_enable = slot_index < enabled_slots
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
should_enable = slot_index < effective_limit
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
async_add_entities(
[

View File

@@ -1,7 +1,10 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.switch import SwitchEntity
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
@@ -24,11 +27,87 @@ class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
value = auto_feed.get(str(self._ace_id))
return bool(value)
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
auto_feed = merged.get("ace_auto_feed")
if not isinstance(auto_feed, dict):
auto_feed = {}
else:
auto_feed = dict(auto_feed)
key: int | str = self._ace_id
if key not in auto_feed and str(self._ace_id) in auto_feed:
key = str(self._ace_id)
auto_feed[key] = 1 if is_on else 0
merged["ace_auto_feed"] = auto_feed
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, True)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
class KobraXBridgeSettingSwitch(KobraXEntity, SwitchEntity):
def __init__(self, coordinator, entry, key: str, name: str, setting_key: str, icon: str) -> None:
super().__init__(coordinator, entry, key, name)
self._setting_key = setting_key
self._attr_icon = icon
self._attr_entity_category = EntityCategory.CONFIG
def _current_settings(self) -> dict[str, Any]:
settings = self.state_data.get("settings")
return settings if isinstance(settings, dict) else {}
@property
def available(self) -> bool:
return bool(self._current_settings()) and super().available
@property
def is_on(self) -> bool:
value = self._current_settings().get(self._setting_key)
try:
return bool(int(value))
except (TypeError, ValueError):
return bool(value)
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
settings = merged.get("settings")
if not isinstance(settings, dict):
settings = {}
else:
settings = dict(settings)
settings[self._setting_key] = 1 if is_on else 0
merged["settings"] = settings
self.coordinator.async_set_updated_data(merged)
async def _set_state(self, is_on: bool) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
settings = await api.async_get_settings()
settings[self._setting_key] = 1 if is_on else 0
await api.async_set_settings(settings)
self._apply_optimistic_state(is_on)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_on(self, **kwargs) -> None:
await self._set_state(True)
async def async_turn_off(self, **kwargs) -> None:
await self._set_state(False)
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, False)
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -62,6 +141,36 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
except (TypeError, ValueError):
return False
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
drying = merged.get("ace_drying")
if not isinstance(drying, dict):
drying = {}
else:
drying = dict(drying)
unit_data = drying.get(self._ace_id)
unit_key: int | str = self._ace_id
if unit_data is None:
unit_data = drying.get(str(self._ace_id))
if unit_data is not None:
unit_key = str(self._ace_id)
if isinstance(unit_data, dict):
next_unit_data = dict(unit_data)
else:
next_unit_data = {}
next_unit_data["status"] = 1 if is_on else 0
drying[unit_key] = next_unit_data
# Keep backward-compatible flat status for unit 0 payload variants.
if self._ace_id == 0:
drying["status"] = 1 if is_on else 0
merged["ace_drying"] = drying
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -73,7 +182,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
duration=int(ace_cfg.get("duration", 240)),
ace_id=self._ace_id,
)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -81,7 +190,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -89,7 +198,24 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
entities = []
entities = [
KobraXBridgeSettingSwitch(
coordinator,
entry,
"camera_on_print",
"Camera On Print",
"camera_on_print",
"mdi:camera-wireless",
),
KobraXBridgeSettingSwitch(
coordinator,
entry,
"web_upload_warning",
"Web Upload Warning",
"web_upload_warning",
"mdi:alert-outline",
),
]
# Pre-create switches for all 4 possible ACE units
entities.extend(KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4))