update for bridge v0.9.18

This commit is contained in:
Gangoke
2026-05-31 18:51:35 -10:00
parent 37549de681
commit 36d64b876e
8 changed files with 358 additions and 28 deletions

View File

@@ -43,11 +43,11 @@ The config flow asks for:
| Platform | Key Entities |
| --- | --- |
| Binary Sensor | Online, Printing, Light State |
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
| Sensor | State, Print State, Progress, Hotend Temperature, Target Hotend Temperature, Bed Temperature, Target Bed Temperature, Filename, Current Layer, Total Layers, Remaining Time, Print Duration, Skip Object Count, Skipped Object Count, Filament Mode, ACE Unit Count, Bridge Version, Latest Available Version, Camera Stream Mode, Slot 1..Slot 19, ACE 1..4 Dryer Status, ACE 1..4 Dryer Humidity, ACE 1..4 Dryer Current Temperature, ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Remaining Time |
| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, Apply Update (KX-Bridge) |
| Switch | ACE 1..4 Auto Fill, ACE 1..4 Dryer |
| Switch | Camera On Print, Web Upload Warning, ACE 1..4 Auto Fill, ACE 1..4 Dryer |
| Number | ACE 1..4 Dryer Target Temperature, ACE 1..4 Dryer Duration |
| Select | Print speed |
| Select | Print speed, Slot 1..Slot 19 Filament Profile |
| Light | Printer light |
| Camera | Printer camera |
| Image | G-code thumbnail |

View File

@@ -27,6 +27,18 @@ class KobraXApiClient:
def camera_h264_proxy_url(self) -> str:
return self._url("/api/camera/h264")
async def async_h264_stream_available(self, timeout_seconds: float = 1.5) -> bool:
"""Probe whether the bridge h264 endpoint is reachable now."""
try:
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
async with self._session.get(self.camera_h264_proxy_url(), timeout=timeout) as response:
if response.status != 200:
return False
chunk = await response.content.read(1)
return bool(chunk)
except Exception:
return False
async def _get_json(self, path: str) -> dict[str, Any]:
try:
async with self._session.get(self._url(path)) as response:
@@ -49,6 +61,18 @@ class KobraXApiClient:
async def async_get_state(self) -> dict[str, Any]:
return await self._get_json("/api/state")
async def async_get_settings(self) -> dict[str, Any]:
data = await self._get_json("/api/settings")
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /api/settings")
async def async_set_settings(self, payload: dict[str, Any]) -> dict[str, Any]:
data = await self._post_json("/api/settings", payload)
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /api/settings")
async def async_get_files(self) -> list[dict[str, Any]]:
data = await self._get_json("/kx/files")
result = data.get("result", [])
@@ -84,6 +108,45 @@ class KobraXApiClient:
return result
raise KobraXApiError("Unexpected response for /kx/skip/state")
async def async_get_filament_slots(self) -> list[dict[str, Any]]:
data = await self._get_json("/kx/filament/slots")
result = data.get("result", [])
if isinstance(result, list):
return [slot for slot in result if isinstance(slot, dict)]
raise KobraXApiError("Unexpected response for /kx/filament/slots")
async def async_get_filament_profiles(self, material_type: str | None = None) -> list[dict[str, Any]]:
params: dict[str, str] = {}
if material_type:
params["type"] = material_type
try:
async with self._session.get(self._url("/kx/filament/profiles"), params=params or None) as response:
response.raise_for_status()
data = await response.json()
except Exception as err:
raise KobraXApiError(err) from err
result = data.get("result", []) if isinstance(data, dict) else []
if isinstance(result, list):
return [profile for profile in result if isinstance(profile, dict)]
raise KobraXApiError("Unexpected response for /kx/filament/profiles")
async def async_set_filament_slot_profile(
self,
slot_index: int,
filament_id: str,
vendor: str = "",
name: str = "",
) -> dict[str, Any]:
data = await self._post_json(
f"/kx/filament/slots/{int(slot_index)}/profile",
{"id": filament_id, "vendor": vendor, "name": name},
)
if isinstance(data, dict):
return data
raise KobraXApiError("Unexpected response for /kx/filament/slots/{idx}/profile")
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
if data.get("error") not in (None, ""):

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import re
from homeassistant.components.camera import Camera, CameraEntityFeature
from .api import KobraXApiError
@@ -51,9 +50,13 @@ class KobraXCamera(KobraXEntity, Camera):
except KobraXApiError:
pass
selected_mode = "mjpeg_proxy"
selected_url = api.camera_stream_proxy_url()
version = self.state_data.get("version")
if isinstance(version, str) and self._bridge_supports_h264_stream(version):
return api.camera_h264_proxy_url()
if await api.async_h264_stream_available():
return api.camera_h264_proxy_url()
camera_url = self.state_data.get("camera_url")
if isinstance(camera_url, str) and camera_url:
@@ -62,9 +65,12 @@ class KobraXCamera(KobraXEntity, Camera):
try:
camera_url = await api.async_get_camera_url()
except KobraXApiError:
return api.camera_stream_proxy_url()
return selected_url
return camera_url or api.camera_stream_proxy_url()
if isinstance(camera_url, str) and camera_url:
selected_url = camera_url
return selected_url
async def async_camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]

View File

@@ -37,12 +37,28 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
if self._update_info:
state["update_info"] = self._update_info
try:
settings = await self.api.async_get_settings()
state["settings"] = settings
except KobraXApiError:
# Settings endpoint is only available on newer bridge versions.
pass
try:
skip_state = await self.api.async_get_skip_state()
state["skip_state"] = skip_state
except KobraXApiError:
# Skip endpoints are only available on newer bridge versions.
pass
try:
filament_slots = await self.api.async_get_filament_slots()
state["filament_slots"] = filament_slots
except KobraXApiError:
# Filament profile endpoints are only available on newer bridge versions.
pass
return state
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err

View File

@@ -9,5 +9,5 @@
"documentation": "https://github.com/gangoke/kobrax-lan-hass-component",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues",
"version": "0.2.0"
"version": "0.3.0"
}

View File

@@ -1,13 +1,34 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.select import SelectEntity
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
from .entity import KobraXEntity
MAX_FILAMENT_SLOTS = 19
TOOLHEAD_SLOT_LIMIT = 4
ACE_DIRECT_SLOT_LIMIT = 4
def _detected_slot_limit(state_data: dict[str, Any]) -> int:
mode = str(state_data.get("filament_mode") or "toolhead").lower()
if mode == "toolhead":
return TOOLHEAD_SLOT_LIMIT
if mode == "ace_direct":
return ACE_DIRECT_SLOT_LIMIT
slots = state_data.get("ams_slots") or []
if isinstance(slots, list) and slots:
return min(len(slots), MAX_FILAMENT_SLOTS)
return MAX_FILAMENT_SLOTS
class KobraXPrintSpeedSelect(KobraXEntity, SelectEntity):
_attr_options = ["Slow (1)", "Normal (2)", "Fast (3)"]
@@ -34,8 +55,139 @@ class KobraXPrintSpeedSelect(KobraXEntity, SelectEntity):
raise ServiceValidationError(str(err)) from err
class KobraXFilamentProfileSelect(KobraXEntity, SelectEntity):
_AUTO_OPTION = "Auto (No Override)"
def __init__(self, coordinator, entry, slot_index: int) -> None:
super().__init__(coordinator, entry, f"slot_{slot_index + 1}_filament_profile", f"Slot {slot_index + 1} Filament Profile")
self._slot_index = slot_index
self._attr_icon = "mdi:palette"
self._attr_entity_category = EntityCategory.CONFIG
def _slot_limit_for_mode(self) -> int:
return _detected_slot_limit(self.state_data)
def _slot(self) -> dict[str, Any]:
slots = self.state_data.get("filament_slots") or []
if not isinstance(slots, list):
return {}
for slot in slots:
if not isinstance(slot, dict):
continue
try:
if int(slot.get("slot_index", -1)) == self._slot_index:
return slot
except (TypeError, ValueError):
continue
return {}
def _profile_catalog(self) -> list[dict[str, Any]]:
data = self.hass.data[DOMAIN][self._entry.entry_id]
profiles = data.get("filament_profiles")
if isinstance(profiles, list):
return [item for item in profiles if isinstance(item, dict)]
return []
@staticmethod
def _profile_label(profile: dict[str, Any]) -> str:
name = str(profile.get("name") or profile.get("id") or "Unknown")
vendor = str(profile.get("vendor") or "")
fid = str(profile.get("id") or "")
vendor_part = f" - {vendor}" if vendor else ""
return f"{name}{vendor_part} [{fid}]" if fid else f"{name}{vendor_part}"
def _options_map(self) -> dict[str, tuple[str, str, str]]:
slot = self._slot()
material = str(slot.get("material") or "").upper()
options: dict[str, tuple[str, str, str]] = {self._AUTO_OPTION: ("", "", "")}
for profile in self._profile_catalog():
profile_type = str(profile.get("type") or "").upper()
if material and profile_type and profile_type != material:
continue
fid = str(profile.get("id") or "")
if not fid:
continue
vendor = str(profile.get("vendor") or "")
name = str(profile.get("name") or "")
options[self._profile_label(profile)] = (fid, vendor, name)
return options
@property
def available(self) -> bool:
return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available
@property
def options(self) -> list[str]:
return list(self._options_map().keys())
@property
def current_option(self) -> str:
slot = self._slot()
filament_id = str(slot.get("filament_id") or "")
vendor = str(slot.get("filament_vendor") or "")
name = str(slot.get("filament_name") or "")
if not filament_id:
return self._AUTO_OPTION
option_map = self._options_map()
for label, (opt_id, opt_vendor, opt_name) in option_map.items():
if name and vendor and opt_name == name and opt_vendor == vendor:
return label
if opt_id == filament_id and opt_vendor == vendor:
return label
if opt_id == filament_id and not vendor:
return label
return f"Custom [{filament_id}]"
def _apply_optimistic_state(self, filament_id: str, vendor: str, name: str) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
slots = merged.get("filament_slots")
if not isinstance(slots, list):
return
new_slots: list[dict[str, Any]] = []
for slot in slots:
if not isinstance(slot, dict):
continue
next_slot = dict(slot)
try:
if int(next_slot.get("slot_index", -1)) == self._slot_index:
next_slot["filament_id"] = filament_id
next_slot["filament_vendor"] = vendor
next_slot["filament_name"] = name
except (TypeError, ValueError):
pass
new_slots.append(next_slot)
merged["filament_slots"] = new_slots
self.coordinator.async_set_updated_data(merged)
async def async_select_option(self, option: str) -> None:
option_map = self._options_map()
if option not in option_map:
raise ServiceValidationError(f"Invalid option: {option}")
filament_id, vendor, name = option_map[option]
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_filament_slot_profile(self._slot_index, filament_id, vendor, name)
self._apply_optimistic_state(filament_id, vendor, name)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
api = hass.data[DOMAIN][entry.entry_id]["api"]
try:
hass.data[DOMAIN][entry.entry_id]["filament_profiles"] = await api.async_get_filament_profiles()
except KobraXApiError:
hass.data[DOMAIN][entry.entry_id]["filament_profiles"] = []
entities: list[SelectEntity] = [KobraXPrintSpeedSelect(coordinator, entry, "print_speed_mode", "Print Speed")]
entities.extend(KobraXFilamentProfileSelect(coordinator, entry, slot_index) for slot_index in range(MAX_FILAMENT_SLOTS))
async_add_entities(
[KobraXPrintSpeedSelect(coordinator, entry, "print_speed_mode", "Print Speed")]
entities
)

View File

@@ -462,6 +462,7 @@ async def async_setup_entry(hass, entry, async_add_entities):
ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type))
entity_registry = er.async_get(hass)
slot_registry_state: dict[str, int | None] = {"last_limit": None, "short_count": 0}
# Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities.
legacy_ace_sensor_keys = (
@@ -480,28 +481,50 @@ async def async_setup_entry(hass, entry, async_add_entities):
@callback
def _sync_slot_registry_state() -> None:
state_data = coordinator.data or {}
enabled_slots = _detected_slot_limit(state_data)
detected_limit = _detected_slot_limit(state_data)
last_limit = slot_registry_state["last_limit"]
short_count = int(slot_registry_state["short_count"] or 0)
if last_limit is None or detected_limit >= last_limit:
effective_limit = detected_limit
slot_registry_state["last_limit"] = detected_limit
slot_registry_state["short_count"] = 0
else:
short_count += 1
slot_registry_state["short_count"] = short_count
if short_count < 2:
effective_limit = last_limit
else:
effective_limit = detected_limit
slot_registry_state["last_limit"] = detected_limit
slot_registry_state["short_count"] = 0
for slot_index in range(MAX_FILAMENT_SLOTS):
unique_id = f"{entry.entry_id}_slot_{slot_index + 1}"
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
if not entity_id:
continue
entities_to_sync = [
("sensor", f"{entry.entry_id}_slot_{slot_index + 1}"),
("select", f"{entry.entry_id}_slot_{slot_index + 1}_filament_profile"),
]
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
for platform, unique_id in entities_to_sync:
entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
if not entity_id:
continue
should_enable = slot_index < enabled_slots
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
should_enable = slot_index < effective_limit
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
async_add_entities(
[

View File

@@ -4,6 +4,7 @@ from typing import Any
from homeassistant.components.switch import SwitchEntity
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
@@ -50,6 +51,58 @@ class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
class KobraXBridgeSettingSwitch(KobraXEntity, SwitchEntity):
def __init__(self, coordinator, entry, key: str, name: str, setting_key: str, icon: str) -> None:
super().__init__(coordinator, entry, key, name)
self._setting_key = setting_key
self._attr_icon = icon
self._attr_entity_category = EntityCategory.CONFIG
def _current_settings(self) -> dict[str, Any]:
settings = self.state_data.get("settings")
return settings if isinstance(settings, dict) else {}
@property
def available(self) -> bool:
return bool(self._current_settings()) and super().available
@property
def is_on(self) -> bool:
value = self._current_settings().get(self._setting_key)
try:
return bool(int(value))
except (TypeError, ValueError):
return bool(value)
def _apply_optimistic_state(self, is_on: bool) -> None:
merged: dict[str, Any] = dict(self.coordinator.data or {})
settings = merged.get("settings")
if not isinstance(settings, dict):
settings = {}
else:
settings = dict(settings)
settings[self._setting_key] = 1 if is_on else 0
merged["settings"] = settings
self.coordinator.async_set_updated_data(merged)
async def _set_state(self, is_on: bool) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
settings = await api.async_get_settings()
settings[self._setting_key] = 1 if is_on else 0
await api.async_set_settings(settings)
self._apply_optimistic_state(is_on)
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_on(self, **kwargs) -> None:
await self._set_state(True)
async def async_turn_off(self, **kwargs) -> None:
await self._set_state(False)
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -145,7 +198,24 @@ class KobraXAceDryerSwitch(KobraXEntity, SwitchEntity):
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
entities = []
entities = [
KobraXBridgeSettingSwitch(
coordinator,
entry,
"camera_on_print",
"Camera On Print",
"camera_on_print",
"mdi:camera-wireless",
),
KobraXBridgeSettingSwitch(
coordinator,
entry,
"web_upload_warning",
"Web Upload Warning",
"web_upload_warning",
"mdi:alert-outline",
),
]
# Pre-create switches for all 4 possible ACE units
entities.extend(KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4))