mirror of
https://github.com/gangoke/kobrax-lan-hass-component.git
synced 2026-06-10 05:02:12 +02:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2f153ae26 | ||
|
|
cf3b4c1ca1 | ||
|
|
8370153dd2 | ||
|
|
cba7f9b707 | ||
|
|
4e5aeb54a8 | ||
|
|
ec1fbbc520 |
@@ -1,10 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KobraXApiError(Exception):
|
||||
"""Raised when communication with KX-Bridge fails."""
|
||||
|
||||
@@ -79,9 +83,8 @@ class KobraXApiClient:
|
||||
|
||||
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
|
||||
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
|
||||
result = data.get("result")
|
||||
if result is None:
|
||||
raise KobraXApiError("Unexpected response for /api/ace/auto_feed")
|
||||
if data.get("error") not in (None, ""):
|
||||
raise KobraXApiError(str(data["error"]))
|
||||
return data
|
||||
|
||||
async def async_set_ace_dry(
|
||||
@@ -99,10 +102,19 @@ class KobraXApiClient:
|
||||
if ace_id is not None:
|
||||
payload["ace_id"] = int(ace_id)
|
||||
|
||||
data = await self._post_json("/api/ace/dry", payload)
|
||||
result = data.get("result")
|
||||
if result is None:
|
||||
raise KobraXApiError("Unexpected response for /api/ace/dry")
|
||||
try:
|
||||
data = await self._post_json("/api/ace/dry", payload)
|
||||
except KobraXApiError as err:
|
||||
# Some bridge versions can return a false 502 while setDry is
|
||||
# still applied successfully on the printer.
|
||||
msg = str(err)
|
||||
if "502" in msg and "/api/ace/dry" in msg:
|
||||
_LOGGER.warning("Ignoring bridge 502 for /api/ace/dry because command may already be applied: %s", msg)
|
||||
return {"result": "ok", "warning": "ignored_502"}
|
||||
raise
|
||||
|
||||
if data.get("error") not in (None, ""):
|
||||
raise KobraXApiError(str(data["error"]))
|
||||
return data
|
||||
|
||||
async def async_pause_print(self) -> None:
|
||||
|
||||
@@ -51,15 +51,14 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
|
||||
action="disconnect",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
entity_registry_enabled_default=False,
|
||||
),
|
||||
# ENABLE ME WHEN API ENDPOINT IS ADDED
|
||||
# KobraXButtonDescription(
|
||||
# key="restart_bridge",
|
||||
# name="Restart (KX-Bridge)",
|
||||
# icon="mdi:restart",
|
||||
# action="restart",
|
||||
# entity_category=EntityCategory.CONFIG,
|
||||
# ),
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
key="restart_bridge",
|
||||
name="Restart (KX-Bridge)",
|
||||
icon="mdi:restart",
|
||||
action="restart",
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
),
|
||||
KobraXButtonDescription(
|
||||
key="refresh_skip_state",
|
||||
name="Refresh Skip State",
|
||||
|
||||
@@ -10,7 +10,7 @@ CONF_PRINTER_NAME = "printer_name"
|
||||
DEFAULT_HOST = "localhost:7125"
|
||||
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
|
||||
|
||||
UPDATE_INTERVAL = timedelta(seconds=5)
|
||||
UPDATE_INTERVAL = timedelta(seconds=3)
|
||||
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
|
||||
|
||||
PLATFORMS = [
|
||||
|
||||
@@ -20,23 +20,10 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
)
|
||||
self.api = api
|
||||
self._update_info: dict[str, Any] = {}
|
||||
self._restart_supported: bool | None = None
|
||||
self._next_update_check_monotonic = 0.0
|
||||
|
||||
async def _async_update_data(self) -> dict[str, Any]:
|
||||
try:
|
||||
# Probe restart endpoint once if not checked
|
||||
if self._restart_supported is None:
|
||||
try:
|
||||
await self.api.async_restart_bridge()
|
||||
self._restart_supported = True
|
||||
except Exception as err:
|
||||
# Only disable if 404/501, otherwise treat as available
|
||||
msg = str(err)
|
||||
if "404" in msg or "501" in msg:
|
||||
self._restart_supported = False
|
||||
else:
|
||||
self._restart_supported = True
|
||||
state = await self.api.async_get_state()
|
||||
|
||||
now = time.monotonic()
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.components.switch import SwitchEntity
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
|
||||
@@ -24,11 +26,35 @@ class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
|
||||
value = auto_feed.get(str(self._ace_id))
|
||||
return bool(value)
|
||||
|
||||
def _apply_optimistic_state(self, is_on: bool) -> None:
|
||||
merged: dict[str, Any] = dict(self.coordinator.data or {})
|
||||
auto_feed = merged.get("ace_auto_feed")
|
||||
if not isinstance(auto_feed, dict):
|
||||
auto_feed = {}
|
||||
else:
|
||||
auto_feed = dict(auto_feed)
|
||||
|
||||
key: int | str = self._ace_id
|
||||
if key not in auto_feed and str(self._ace_id) in auto_feed:
|
||||
key = str(self._ace_id)
|
||||
auto_feed[key] = 1 if is_on else 0
|
||||
|
||||
merged["ace_auto_feed"] = auto_feed
|
||||
self.coordinator.async_set_updated_data(merged)
|
||||
|
||||
async def async_turn_on(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_auto_feed(self._ace_id, True)
|
||||
await self.coordinator.async_request_refresh()
|
||||
self._apply_optimistic_state(True)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
async def async_turn_off(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_auto_feed(self._ace_id, False)
|
||||
self._apply_optimistic_state(False)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
@@ -62,6 +88,36 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
def _apply_optimistic_state(self, is_on: bool) -> None:
|
||||
merged: dict[str, Any] = dict(self.coordinator.data or {})
|
||||
drying = merged.get("ace_drying")
|
||||
if not isinstance(drying, dict):
|
||||
drying = {}
|
||||
else:
|
||||
drying = dict(drying)
|
||||
|
||||
unit_data = drying.get(self._ace_id)
|
||||
unit_key: int | str = self._ace_id
|
||||
if unit_data is None:
|
||||
unit_data = drying.get(str(self._ace_id))
|
||||
if unit_data is not None:
|
||||
unit_key = str(self._ace_id)
|
||||
|
||||
if isinstance(unit_data, dict):
|
||||
next_unit_data = dict(unit_data)
|
||||
else:
|
||||
next_unit_data = {}
|
||||
|
||||
next_unit_data["status"] = 1 if is_on else 0
|
||||
drying[unit_key] = next_unit_data
|
||||
|
||||
# Keep backward-compatible flat status for unit 0 payload variants.
|
||||
if self._ace_id == 0:
|
||||
drying["status"] = 1 if is_on else 0
|
||||
|
||||
merged["ace_drying"] = drying
|
||||
self.coordinator.async_set_updated_data(merged)
|
||||
|
||||
async def async_turn_on(self, **kwargs) -> None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
@@ -73,7 +129,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
|
||||
duration=int(ace_cfg.get("duration", 240)),
|
||||
ace_id=self._ace_id,
|
||||
)
|
||||
await self.coordinator.async_request_refresh()
|
||||
self._apply_optimistic_state(True)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
@@ -81,7 +137,7 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
|
||||
await self.coordinator.async_request_refresh()
|
||||
self._apply_optimistic_state(False)
|
||||
except KobraXApiError as err:
|
||||
raise ServiceValidationError(str(err)) from err
|
||||
|
||||
|
||||
Reference in New Issue
Block a user