6 Commits

5 changed files with 87 additions and 33 deletions

View File

@@ -1,10 +1,14 @@
from __future__ import annotations
import logging
from typing import Any
import aiohttp
_LOGGER = logging.getLogger(__name__)
class KobraXApiError(Exception):
"""Raised when communication with KX-Bridge fails."""
@@ -79,9 +83,8 @@ class KobraXApiClient:
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/auto_feed")
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_set_ace_dry(
@@ -99,10 +102,19 @@ class KobraXApiClient:
if ace_id is not None:
payload["ace_id"] = int(ace_id)
data = await self._post_json("/api/ace/dry", payload)
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/dry")
try:
data = await self._post_json("/api/ace/dry", payload)
except KobraXApiError as err:
# Some bridge versions can return a false 502 while setDry is
# still applied successfully on the printer.
msg = str(err)
if "502" in msg and "/api/ace/dry" in msg:
_LOGGER.warning("Ignoring bridge 502 for /api/ace/dry because command may already be applied: %s", msg)
return {"result": "ok", "warning": "ignored_502"}
raise
if data.get("error") not in (None, ""):
raise KobraXApiError(str(data["error"]))
return data
async def async_pause_print(self) -> None:

View File

@@ -51,15 +51,14 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
action="disconnect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
# ENABLE ME WHEN API ENDPOINT IS ADDED
# KobraXButtonDescription(
# key="restart_bridge",
# name="Restart (KX-Bridge)",
# icon="mdi:restart",
# action="restart",
# entity_category=EntityCategory.CONFIG,
# ),
),
KobraXButtonDescription(
key="restart_bridge",
name="Restart (KX-Bridge)",
icon="mdi:restart",
action="restart",
entity_category=EntityCategory.CONFIG,
),
KobraXButtonDescription(
key="refresh_skip_state",
name="Refresh Skip State",

View File

@@ -10,7 +10,7 @@ CONF_PRINTER_NAME = "printer_name"
DEFAULT_HOST = "localhost:7125"
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
UPDATE_INTERVAL = timedelta(seconds=5)
UPDATE_INTERVAL = timedelta(seconds=3)
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
PLATFORMS = [

View File

@@ -20,23 +20,10 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
)
self.api = api
self._update_info: dict[str, Any] = {}
self._restart_supported: bool | None = None
self._next_update_check_monotonic = 0.0
async def _async_update_data(self) -> dict[str, Any]:
try:
# Probe restart endpoint once if not checked
if self._restart_supported is None:
try:
await self.api.async_restart_bridge()
self._restart_supported = True
except Exception as err:
# Only disable if 404/501, otherwise treat as available
msg = str(err)
if "404" in msg or "501" in msg:
self._restart_supported = False
else:
self._restart_supported = True
state = await self.api.async_get_state()
now = time.monotonic()

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.switch import SwitchEntity
from homeassistant.exceptions import ServiceValidationError
@@ -24,11 +26,35 @@ class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
value = auto_feed.get(str(self._ace_id))
return bool(value)
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
auto_feed = merged.get("ace_auto_feed")
if not isinstance(auto_feed, dict):
auto_feed = {}
else:
auto_feed = dict(auto_feed)
key: int | str = self._ace_id
if key not in auto_feed and str(self._ace_id) in auto_feed:
key = str(self._ace_id)
auto_feed[key] = 1 if is_on else 0
merged["ace_auto_feed"] = auto_feed
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, True)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, False)
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -62,6 +88,36 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
except (TypeError, ValueError):
return False
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
drying = merged.get("ace_drying")
if not isinstance(drying, dict):
drying = {}
else:
drying = dict(drying)
unit_data = drying.get(self._ace_id)
unit_key: int | str = self._ace_id
if unit_data is None:
unit_data = drying.get(str(self._ace_id))
if unit_data is not None:
unit_key = str(self._ace_id)
if isinstance(unit_data, dict):
next_unit_data = dict(unit_data)
else:
next_unit_data = {}
next_unit_data["status"] = 1 if is_on else 0
drying[unit_key] = next_unit_data
# Keep backward-compatible flat status for unit 0 payload variants.
if self._ace_id == 0:
drying["status"] = 1 if is_on else 0
merged["ace_drying"] = drying
self.coordinator.async_set_updated_data(merged)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -73,7 +129,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
duration=int(ace_cfg.get("duration", 240)),
ace_id=self._ace_id,
)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(True)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -81,7 +137,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
await self.coordinator.async_request_refresh()
self._apply_optimistic_state(False)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err