ACE2 Support + more

This commit is contained in:
Gangoke
2026-05-21 00:31:36 -10:00
parent 96ccfb98ff
commit 13ce4d48f3
16 changed files with 869 additions and 116 deletions

120
README.md
View File

@@ -1,101 +1,61 @@
# Kobra X Home Assistant Component
# Kobra X LAN for Home Assistant
Home Assistant HACS integration for controlling and monitoring an Anycubic Kobra X through KX-Bridge.
Home Assistant integration for monitoring and controlling an Anycubic Kobra X through KX-Bridge.
This project was coded with AI assistance and should be reviewed before use in production.
Architecture:
- printer <-> [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant
- printer <-> [KX-Bridge](https://gitea.it-drui.de/viewit/KX-Bridge-Release) <-> this integration <-> Home Assistant
## Features
## Requirements
- Auto-discovered status from KX-Bridge `/api/state`
- Core printer sensors (state, temperatures, progress, file, layer/time data)
- Light control
- Print speed mode selection
- Printer action buttons (pause, resume, cancel, connect, disconnect)
- Camera stream entity using the printer RTSP URL from KX-Bridge, with bridge MJPEG proxy fallback
- Camera snapshot fallback using `/api/camera/snapshot`
- G-code thumbnail image entity from the active print job
- Running and reachable [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release)
- Bridge endpoint accessible from Home Assistant at `http://<bridge-host>:7125`
## Available Entities
## Installation
### Binary Sensors
### Option 1: HACS
- `Online`
- `Printing`
- `Light State`
### Sensors
- `State`
- `Print State`
- `Progress`
- `Hotend Temperature`
- `Target Hotend Temperature`
- `Bed Temperature`
- `Target Bed Temperature`
- `Filename`
- `Current Layer`
- `Total Layers`
- `Remaining Time`
- `Print Duration`
- `Filament Slot 1 Color` / `Filament Slot 1 Type`
- `Filament Slot 2 Color` / `Filament Slot 2 Type`
- `Filament Slot 3 Color` / `Filament Slot 3 Type`
- `Filament Slot 4 Color` / `Filament Slot 4 Type`
The filament slot entities are created from the AMS slot data reported by KX-Bridge. If the bridge does not report a slot count, the integration falls back to 4 slots.
### Buttons
- `Pause Print`
- `Resume Print`
- `Cancel Print`
- `Connect Bridge`
- `Disconnect Bridge`
### Select
- `Print Speed`
### Light
- `Light`
### Camera
- `Camera`
### Image
- `GCode Thumbnail`
## Prerequisites
1. [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) must be running and reachable from Home Assistant.
2. Verify [KX-Bridge-Release](https://gitea.it-drui.de/viewit/KX-Bridge-Release) is accessible at `http://<bridge-host>:7125`.
## Installation (HACS)
1. Add this repository as a custom repository in HACS with category `Integration`.
```https://github.com/gangoke/kobrax-lan-hass-component```
2. Install the integration.
1. In HACS, add this repository as a custom repository (category: Integration):
`https://github.com/gangoke/kobrax-lan-hass-component`
2. Install Kobra X LAN from HACS.
3. Restart Home Assistant.
4. Add integration `Kobra X LAN` from Settings -> Devices & Services.
4. Go to Settings -> Devices & Services -> Add Integration.
5. Search for Kobra X LAN.
### Option 2: Manual (local custom_components)
1. Copy the `kobrax_lan` folder into your Home Assistant `custom_components` directory:
`<config>/custom_components/kobrax_lan`
3. Restart Home Assistant.
4. Add Kobra X LAN from Settings -> Devices & Services.
## Configuration
The config flow asks for:
- Host: KX-Bridge host and port (example: `192.168.1.50:7125`)
- Printer name: Friendly display name
- Printer name: Friendly display name in Home Assistant
## Entity Overview
| Platform | Key Entities |
| --- | --- |
| Binary Sensor | Online, Printing, Light State |
| Sensor | State, Print State, Progress, Temperatures, Filename, Layer/Time metrics, Skip-object counts, ACE status, AMS/ACE slot material |
| Button | Pause Print, Resume Print, Cancel Print, Connect Bridge, Disconnect Bridge, Refresh Skip State, ACE Dryer Start, ACE Dryer Stop |
| Switch | ACE auto-fill switches per detected ACE unit |
| Number | ACE dry target temperature, ACE dry duration |
| Select | Print speed |
| Light | Printer light |
| Camera | Printer camera |
| Image | G-code thumbnail |
Slot entities are generated dynamically from KX-Bridge AMS/ACE slot data (including larger ACE topologies).
## Notes
- This integration talks to KX-Bridge HTTP endpoints and does not connect directly to the printer.
- Keep KX-Bridge and Home Assistant on the same trusted network.
- Native WebRTC is not implemented in this integration. If you want WebRTC in Home Assistant, point `go2rtc` or a WebRTC-capable HA add-on at the camera entity's RTSP source.
- This integration communicates with KX-Bridge HTTP endpoints and does not connect directly to the printer.
- Keep KX-Bridge and Home Assistant on a trusted local network.
- Native WebRTC is not implemented. For WebRTC in Home Assistant, point `go2rtc` (or another WebRTC-capable add-on) to the camera RTSP source.

View File

@@ -34,6 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"coordinator": coordinator,
"api": api,
"entry": entry,
"ace_dry_config": {
0: {
"target_temp": int((coordinator.data or {}).get("ace_drying", {}).get("target_temp", 45) or 45),
"duration": int((coordinator.data or {}).get("ace_drying", {}).get("duration", 240) or 240),
},
1: {
"target_temp": 45,
"duration": 240,
},
2: {
"target_temp": 45,
"duration": 240,
},
3: {
"target_temp": 45,
"duration": 240,
},
},
}
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View File

@@ -49,6 +49,62 @@ class KobraXApiClient:
return result
raise KobraXApiError("Unexpected response for /kx/files")
async def async_get_file_objects(self, file_id: str) -> dict[str, Any]:
data = await self._get_json(f"/kx/files/{file_id}/objects")
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/files/{id}/objects")
async def async_skip_objects(self, names: list[str]) -> dict[str, Any]:
data = await self._post_json("/kx/skip", {"names": names})
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /kx/skip")
return data
async def async_skip_query(self) -> dict[str, Any]:
data = await self._post_json("/kx/skip/query", {})
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/skip/query")
async def async_get_skip_state(self) -> dict[str, Any]:
data = await self._get_json("/kx/skip/state")
result = data.get("result", {})
if isinstance(result, dict):
return result
raise KobraXApiError("Unexpected response for /kx/skip/state")
async def async_set_ace_auto_feed(self, ace_id: int, on: bool) -> dict[str, Any]:
data = await self._post_json("/api/ace/auto_feed", {"ace_id": ace_id, "on": on})
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/auto_feed")
return data
async def async_set_ace_dry(
self,
action: str,
target_temp: int | None = None,
duration: int | None = None,
ace_id: int | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {"action": action}
if target_temp is not None:
payload["target_temp"] = int(target_temp)
if duration is not None:
payload["duration"] = int(duration)
if ace_id is not None:
payload["ace_id"] = int(ace_id)
data = await self._post_json("/api/ace/dry", payload)
result = data.get("result")
if result is None:
raise KobraXApiError("Unexpected response for /api/ace/dry")
return data
async def async_pause_print(self) -> None:
await self._post_json("/printer/print/pause", {})
@@ -79,9 +135,24 @@ class KobraXApiClient:
async def async_disconnect(self) -> None:
await self._post_json("/api/disconnect", {})
async def async_restart_bridge(self) -> None:
await self._post_json("/api/restart", {})
async def async_start_camera(self) -> None:
await self._post_json("/api/camera/start", {})
async def async_stop_camera(self) -> None:
await self._post_json("/api/camera/stop", {})
async def async_check_updates(self) -> dict[str, Any]:
return await self._get_json("/api/update/check")
async def async_apply_update(self, tag: str, download_url: str) -> dict[str, Any]:
return await self._post_json(
"/api/update/apply",
{"tag": tag, "download_url": download_url},
)
async def async_get_camera_url(self) -> str | None:
data = await self._get_json("/api/camera")
url = data.get("url")

View File

@@ -7,6 +7,7 @@ from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.helpers.entity import EntityCategory
from .const import DOMAIN
from .entity import KobraXEntity
@@ -23,6 +24,7 @@ BINARY_SENSORS: tuple[KobraXBinaryDescription, ...] = (
name="Online",
value_key="kobra_state",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
entity_category=EntityCategory.DIAGNOSTIC,
),
KobraXBinaryDescription(
key="printing",

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

View File

@@ -4,6 +4,7 @@ from dataclasses import dataclass
from homeassistant.components.button import ButtonEntity, ButtonEntityDescription
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import EntityCategory
from .api import KobraXApiError
from .const import DOMAIN
@@ -13,6 +14,7 @@ from .entity import KobraXEntity
@dataclass(frozen=True, kw_only=True)
class KobraXButtonDescription(ButtonEntityDescription):
action: str
ace_id: int | None = None # None for non-ACE buttons, 0-3 for ACE buttons
BUTTONS: tuple[KobraXButtonDescription, ...] = (
@@ -39,6 +41,7 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
name="Connect Bridge",
icon="mdi:lan-connect",
action="connect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
KobraXButtonDescription(
@@ -46,11 +49,37 @@ BUTTONS: tuple[KobraXButtonDescription, ...] = (
name="Disconnect Bridge",
icon="mdi:lan-disconnect",
action="disconnect",
entity_category=EntityCategory.CONFIG,
entity_registry_enabled_default=False,
),
# ENABLE ME WHEN API ENDPOINT IS ADDED
# KobraXButtonDescription(
# key="restart_bridge",
# name="Restart (KX-Bridge)",
# icon="mdi:restart",
# action="restart",
# entity_category=EntityCategory.CONFIG,
# ),
KobraXButtonDescription(
key="refresh_skip_state",
name="Refresh Skip State",
icon="mdi:refresh",
action="skip_query",
entity_registry_enabled_default=False,
),
KobraXButtonDescription(
key="apply_update",
name="Apply Update (KX-Bridge)",
icon="mdi:download-circle-outline",
action="apply_update",
entity_category=EntityCategory.CONFIG,
),
)
# ACE dryer buttons are now dynamically created in async_setup_entry
class KobraXActionButton(KobraXEntity, ButtonEntity):
entity_description: KobraXButtonDescription
@@ -58,6 +87,25 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
super().__init__(coordinator, entry, description.key, description.name)
self.entity_description = description
@property
def available(self) -> bool:
if self.entity_description.action != "apply_update":
return super().available
update_info = self.state_data.get("update_info")
if not isinstance(update_info, dict):
return False
current = str(update_info.get("current") or "").strip()
latest = str(update_info.get("latest") or "").strip()
if current and latest and current == latest:
return False
if update_info.get("update_available") is False:
return False
return super().available
async def async_press(self) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
@@ -71,6 +119,55 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
await api.async_connect()
elif self.entity_description.action == "disconnect":
await api.async_disconnect()
elif self.entity_description.action == "restart":
await api.async_restart_bridge()
elif self.entity_description.action == "skip_query":
await api.async_skip_query()
elif self.entity_description.action == "apply_update":
await self.coordinator.async_apply_update()
await self.coordinator.async_request_refresh()
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
except Exception as err:
raise ServiceValidationError(str(err)) from err
class KobraXAceDryButton(KobraXEntity, ButtonEntity):
def __init__(
self,
coordinator,
entry,
ace_id: int,
action: str, # "dry_start" or "dry_stop"
) -> None:
if action == "dry_start":
unique_key = f"ace_{ace_id}_dry_start"
name = f"ACE {ace_id + 1} Dryer Start"
icon = "mdi:tumble-dryer"
else: # dry_stop
unique_key = f"ace_{ace_id}_dry_stop"
name = f"ACE {ace_id + 1} Dryer Stop"
icon = "mdi:tumble-dryer-off"
super().__init__(coordinator, entry, unique_key, name)
self._ace_id = ace_id
self._action = action
self._attr_icon = icon
async def async_press(self) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
if self._action == "dry_start":
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
await api.async_set_ace_dry(
"start",
target_temp=int(ace_cfg.get("target_temp", 45)),
duration=int(ace_cfg.get("duration", 240)),
ace_id=self._ace_id,
)
elif self._action == "dry_stop":
await api.async_set_ace_dry("stop", ace_id=self._ace_id)
await self.coordinator.async_request_refresh()
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
@@ -78,9 +175,15 @@ class KobraXActionButton(KobraXEntity, ButtonEntity):
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
async_add_entities(
[
KobraXActionButton(coordinator, entry, description)
for description in BUTTONS
]
)
entities = [
KobraXActionButton(coordinator, entry, description)
for description in BUTTONS
]
# Pre-create all 8 ACE dryer buttons (start + stop for each of 4 ACE units)
for ace_id in range(4):
entities.append(KobraXAceDryButton(coordinator, entry, ace_id, "dry_start"))
entities.append(KobraXAceDryButton(coordinator, entry, ace_id, "dry_stop"))
async_add_entities(entities)

View File

@@ -11,6 +11,7 @@ DEFAULT_HOST = "localhost:7125"
DEFAULT_PRINTER_NAME = "Anycubic Kobra X"
UPDATE_INTERVAL = timedelta(seconds=5)
UPDATE_CHECK_INTERVAL = timedelta(hours=1)
PLATFORMS = [
"sensor",
@@ -18,6 +19,8 @@ PLATFORMS = [
"light",
"select",
"button",
"switch",
"number",
"camera",
"image",
]

View File

@@ -1,12 +1,13 @@
from __future__ import annotations
import time
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .api import KobraXApiClient, KobraXApiError
from .const import DOMAIN, UPDATE_INTERVAL
from .const import DOMAIN, UPDATE_CHECK_INTERVAL, UPDATE_INTERVAL
class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
@@ -18,9 +19,73 @@ class KobraXCoordinator(DataUpdateCoordinator[dict[str, Any]]):
update_interval=UPDATE_INTERVAL,
)
self.api = api
self._update_info: dict[str, Any] = {}
self._restart_supported: bool | None = None
self._next_update_check_monotonic = 0.0
async def _async_update_data(self) -> dict[str, Any]:
try:
return await self.api.async_get_state()
# Probe restart endpoint once if not checked
if self._restart_supported is None:
try:
await self.api.async_restart_bridge()
self._restart_supported = True
except Exception as err:
# Only disable if 404/501, otherwise treat as available
msg = str(err)
if "404" in msg or "501" in msg:
self._restart_supported = False
else:
self._restart_supported = True
state = await self.api.async_get_state()
now = time.monotonic()
if now >= self._next_update_check_monotonic:
try:
self._update_info = await self.api.async_check_updates()
except KobraXApiError:
# Keep integration polling resilient if update service is temporarily unavailable.
pass
self._next_update_check_monotonic = now + UPDATE_CHECK_INTERVAL.total_seconds()
if self._update_info:
state["update_info"] = self._update_info
try:
skip_state = await self.api.async_get_skip_state()
state["skip_state"] = skip_state
except KobraXApiError:
# Skip endpoints are only available on newer bridge versions.
pass
return state
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
async def async_check_updates(self) -> dict[str, Any]:
try:
update_info = await self.api.async_check_updates()
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
self._update_info = update_info
merged = dict(self.data or {})
merged["update_info"] = update_info
self.async_set_updated_data(merged)
return update_info
async def async_apply_update(self) -> dict[str, Any]:
update_info = self._update_info or await self.async_check_updates()
tag = str(update_info.get("tag") or "").strip()
download_url = str(update_info.get("download_url") or "").strip()
if not tag or not download_url:
raise UpdateFailed("Missing tag or download URL from update check")
try:
result = await self.api.async_apply_update(tag=tag, download_url=download_url)
except KobraXApiError as err:
raise UpdateFailed(str(err)) from err
self._update_info = {**update_info, "last_apply_result": result}
merged = dict(self.data or {})
merged["update_info"] = self._update_info
self.async_set_updated_data(merged)
return result

View File

@@ -5,8 +5,9 @@
"@Gangoke"
],
"config_flow": true,
"homeassistant": "2026.3.0",
"documentation": "https://github.com/gangoke/kobrax-lan-hass-component",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/gangoke/kobrax-lan-hass-component/issues",
"version": "0.1.0"
"version": "0.2.0"
}

View File

@@ -0,0 +1,93 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.number import NumberEntity
from .const import DOMAIN
from .entity import KobraXEntity
def _minutes_to_hhmmss(minutes: int | float) -> str:
"""Convert minutes to HH:mm:ss format."""
total_seconds = int(minutes * 60)
hours = total_seconds // 3600
remaining_seconds = total_seconds % 3600
mins = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{mins:02d}:{secs:02d}"
class KobraXAceDryConfigNumber(KobraXEntity, NumberEntity):
def __init__(
self,
coordinator,
entry,
ace_id: int,
config_type: str, # "target_temp" or "duration"
) -> None:
if config_type == "target_temp":
unique_key = f"ace_{ace_id}_dry_target_temp"
name = f"ACE {ace_id + 1} Dryer Target Temperature"
min_val, max_val, step_val = 30, 80, 1
unit = "°C"
icon = "mdi:thermometer"
else: # duration
unique_key = f"ace_{ace_id}_dry_duration"
name = f"ACE {ace_id + 1} Dryer Duration"
min_val, max_val, step_val = 10, 1440, 1
unit = "min"
icon = "mdi:timer-cog-outline"
super().__init__(coordinator, entry, unique_key, name)
self._ace_id = ace_id
self._config_type = config_type
self._attr_native_min_value = min_val
self._attr_native_max_value = max_val
self._attr_native_step = step_val
self._attr_native_unit_of_measurement = unit
self._attr_mode = "box"
self._attr_icon = icon
@property
def native_value(self) -> float:
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
key = "target_temp" if self._config_type == "target_temp" else "duration"
if self._config_type == "target_temp":
return float(ace_cfg.get(key, 45))
else:
return float(ace_cfg.get(key, 240))
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
if self._config_type != "duration":
return None
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
ace_cfg = cfg.get(self._ace_id) or {}
duration_minutes = ace_cfg.get("duration", 240)
return {"formatted_duration": _minutes_to_hhmmss(duration_minutes)}
async def async_set_native_value(self, value: float) -> None:
cfg = self.hass.data[DOMAIN][self._entry.entry_id]["ace_dry_config"]
if self._ace_id not in cfg:
cfg[self._ace_id] = {}
key = "target_temp" if self._config_type == "target_temp" else "duration"
cfg[self._ace_id][key] = int(round(value))
self.async_write_ha_state()
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
# Initialize ace_dry_config structure if not present
if "ace_dry_config" not in hass.data[DOMAIN][entry.entry_id]:
hass.data[DOMAIN][entry.entry_id]["ace_dry_config"] = {}
# Pre-create all 8 numbers (target_temp + duration for each of 4 ACE units)
entities = []
for ace_id in range(4):
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "target_temp"))
entities.append(KobraXAceDryConfigNumber(coordinator, entry, ace_id, "duration"))
async_add_entities(entities)

View File

@@ -2,14 +2,45 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity, SensorEntityDescription
from homeassistant.const import PERCENTAGE, UnitOfTemperature
from homeassistant.core import callback
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_registry import RegistryEntryDisabler
from .const import DOMAIN
from .entity import KobraXEntity
MAX_FILAMENT_SLOTS = 19
TOOLHEAD_SLOT_LIMIT = 4
ACE_DIRECT_SLOT_LIMIT = 4
MAX_ACE_UNITS = 4
def _detected_slot_limit(state_data: dict[str, Any]) -> int:
mode = str(state_data.get("filament_mode") or "toolhead").lower()
if mode == "toolhead":
return TOOLHEAD_SLOT_LIMIT
if mode == "ace_direct":
return ACE_DIRECT_SLOT_LIMIT
slots = state_data.get("ams_slots") or []
if isinstance(slots, list) and slots:
return min(len(slots), MAX_FILAMENT_SLOTS)
return MAX_FILAMENT_SLOTS
def _detected_ace_unit_count(state_data: dict[str, Any]) -> int:
units = state_data.get("ace_units") or []
if isinstance(units, list):
return min(len(units), MAX_ACE_UNITS)
return 0
@dataclass(frozen=True, kw_only=True)
class KobraXSensorDescription(SensorEntityDescription):
value_key: str
@@ -97,6 +128,42 @@ SENSORS: tuple[KobraXSensorDescription, ...] = (
value_key="print_duration",
icon="mdi:timer-outline",
),
KobraXSensorDescription(
key="skip_object_count",
name="Skip Object Count",
value_key="skip_object_count",
icon="mdi:vector-polygon",
),
KobraXSensorDescription(
key="skipped_object_count",
name="Skipped Object Count",
value_key="skipped_object_count",
icon="mdi:content-cut",
),
KobraXSensorDescription(
key="filament_mode",
name="Filament Mode",
value_key="filament_mode",
icon="mdi:shape-outline",
),
KobraXSensorDescription(
key="ace_unit_count",
name="ACE Unit Count",
value_key="ace_unit_count",
icon="mdi:package-variant",
),
KobraXSensorDescription(
key="bridge_version",
name="Bridge Version",
value_key="version",
icon="mdi:source-branch",
),
KobraXSensorDescription(
key="latest_available_version",
name="Latest Available Version",
value_key="latest_available_version",
icon="mdi:cloud-download-outline",
),
)
@@ -106,31 +173,183 @@ class KobraXSensor(KobraXEntity, SensorEntity):
def __init__(self, coordinator, entry, description: KobraXSensorDescription) -> None:
super().__init__(coordinator, entry, description.key, description.name)
self.entity_description = description
if description.value_key in ("version", "latest_available_version"):
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@staticmethod
def _seconds_to_hhmmss(seconds: int) -> str:
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
@property
def native_value(self) -> Any:
if self.entity_description.value_key == "filament_mode":
return self.state_data.get("filament_mode")
if self.entity_description.value_key == "ace_unit_count":
units = self.state_data.get("ace_units")
return len(units) if isinstance(units, list) else 0
if self.entity_description.value_key == "latest_available_version":
update_info = self.state_data.get("update_info") or {}
if isinstance(update_info, dict):
return update_info.get("latest")
return None
if self.entity_description.value_key == "skip_object_count":
skip_state = self.state_data.get("skip_state") or {}
objects = skip_state.get("objects") if isinstance(skip_state, dict) else []
return len(objects) if isinstance(objects, list) else 0
if self.entity_description.value_key == "skipped_object_count":
skip_state = self.state_data.get("skip_state") or {}
skipped = skip_state.get("skipped") if isinstance(skip_state, dict) else []
return len(skipped) if isinstance(skipped, list) else 0
value = self.state_data.get(self.entity_description.value_key)
if self.entity_description.value_key == "progress" and value is not None:
return round(float(value) * 100, 1)
if self.entity_description.value_key in ("remain_time", "print_duration") and value is not None:
try:
return self._seconds_to_hhmmss(int(float(value)))
except (TypeError, ValueError):
return None
return value
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
if self.entity_description.value_key == "latest_available_version":
update_info = self.state_data.get("update_info")
if isinstance(update_info, dict):
return {
"current": update_info.get("current"),
"tag": update_info.get("tag"),
"update_available": update_info.get("update_available"),
"download_url": update_info.get("download_url"),
}
return None
if self.entity_description.value_key not in ("skip_object_count", "skipped_object_count"):
return None
skip_state = self.state_data.get("skip_state")
if not isinstance(skip_state, dict):
return None
objects = skip_state.get("objects")
skipped = skip_state.get("skipped")
return {
"objects": objects if isinstance(objects, list) else [],
"skipped": skipped if isinstance(skipped, list) else [],
"filename": skip_state.get("filename"),
"ts": skip_state.get("ts"),
}
class KobraXAceDryerSensor(KobraXEntity, SensorEntity):
"""Per-ACE-unit dryer sensor."""
def __init__(
self,
coordinator,
entry,
ace_id: int,
sensor_type: str, # "status", "humidity", "current_temp", "target_temp", "remaining_time"
) -> None:
if sensor_type == "status":
unique_key = f"ace_{ace_id}_dryer_status"
name = f"ACE {ace_id + 1} Dryer Status"
icon = "mdi:tumble-dryer"
elif sensor_type == "humidity":
unique_key = f"ace_{ace_id}_dryer_humidity"
name = f"ACE {ace_id + 1} Dryer Humidity"
icon = "mdi:water-percent"
elif sensor_type == "current_temp":
unique_key = f"ace_{ace_id}_dryer_current_temp"
name = f"ACE {ace_id + 1} Dryer Current Temperature"
icon = None
elif sensor_type == "target_temp":
unique_key = f"ace_{ace_id}_dryer_target_temp"
name = f"ACE {ace_id + 1} Dryer Target Temperature"
icon = None
else: # remaining_time
unique_key = f"ace_{ace_id}_dryer_remaining_time"
name = f"ACE {ace_id + 1} Dryer Remaining Time"
icon = "mdi:timer-sand"
super().__init__(coordinator, entry, unique_key, name)
self._ace_id = ace_id
self._sensor_type = sensor_type
if icon:
self._attr_icon = icon
if sensor_type == "humidity":
self._attr_native_unit_of_measurement = PERCENTAGE
elif sensor_type in ("current_temp", "target_temp"):
self._attr_device_class = SensorDeviceClass.TEMPERATURE
self._attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
self._attr_suggested_unit_of_measurement = UnitOfTemperature.CELSIUS
elif sensor_type == "remaining_time":
pass # HH:mm:ss format, no unit needed
@staticmethod
def _minutes_to_hhmmss(minutes: int) -> str:
"""Convert minutes to HH:mm:ss format."""
total_seconds = minutes * 60
hours = total_seconds // 3600
remaining_seconds = total_seconds % 3600
mins = remaining_seconds // 60
secs = remaining_seconds % 60
return f"{hours:02d}:{mins:02d}:{secs:02d}"
@property
def native_value(self) -> Any:
drying = self.state_data.get("ace_drying") or {}
# Handle per-unit structure: ace_drying[ace_id] or global structure
if isinstance(drying, dict):
# Try per-unit key first
unit_data = drying.get(self._ace_id)
if isinstance(unit_data, dict):
drying = unit_data
elif self._ace_id == 0 and not unit_data:
# Fall back to global structure for unit 0
pass
else:
# No data for this unit
return None
else:
return None
if self._sensor_type == "status":
status = int(drying.get("status", 0)) if drying.get("status") is not None else 0
return "running" if status else "idle"
elif self._sensor_type == "humidity":
humidity = drying.get("humidity")
return round(float(humidity), 1) if humidity is not None else None
elif self._sensor_type == "current_temp":
current_temp = drying.get("current_temp")
return round(float(current_temp), 1) if current_temp is not None else None
elif self._sensor_type == "target_temp":
target_temp = drying.get("target_temp")
return float(target_temp) if target_temp is not None else None
elif self._sensor_type == "remaining_time":
remain = drying.get("remain_time")
if remain is not None:
try:
minutes = int(remain)
return self._minutes_to_hhmmss(minutes)
except (TypeError, ValueError):
return None
return None
return None
class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
def __init__(self, coordinator, entry, slot_index: int, field: str) -> None:
name_suffix = {
"color": f"Filament Slot {slot_index + 1} Color",
"type": f"Filament Slot {slot_index + 1} Type",
}[field]
super().__init__(coordinator, entry, f"filament_slot_{slot_index + 1}_{field}", name_suffix)
def __init__(self, coordinator, entry, slot_index: int) -> None:
super().__init__(coordinator, entry, f"slot_{slot_index + 1}", f"Slot {slot_index + 1}")
self._slot_index = slot_index
self._field = field
if field == "color":
self._attr_icon = "mdi:palette"
elif field == "type":
self._attr_icon = "mdi:label"
else:
self._attr_icon = "mdi:numeric"
self._attr_icon = "mdi:circle"
def _slot(self) -> dict[str, Any]:
slots = self.state_data.get("ams_slots") or []
@@ -139,6 +358,9 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
slot = slots[self._slot_index]
return slot if isinstance(slot, dict) else {}
def _slot_limit_for_mode(self) -> int:
return _detected_slot_limit(self.state_data)
@staticmethod
def _to_color_hex(color: Any) -> str | None:
if isinstance(color, list) and len(color) >= 3:
@@ -154,30 +376,132 @@ class KobraXFilamentSlotSensor(KobraXEntity, SensorEntity):
@property
def available(self) -> bool:
return bool(self._slot()) and super().available
return self._slot_index < self._slot_limit_for_mode() and bool(self._slot()) and super().available
@property
def native_value(self) -> Any:
slot = self._slot()
if not slot:
return "EMPTY"
status = slot.get("status")
if status is not None:
try:
if int(status) != 5:
return "EMPTY"
except (TypeError, ValueError):
return "EMPTY"
material = slot.get("type")
material_str = str(material).upper() if material else "EMPTY"
color_hex = self._to_color_hex(slot.get("color"))
if color_hex and material_str != "EMPTY":
return f"{material_str} ({color_hex})"
return material_str
@property
def icon_color(self) -> str | None:
slot = self._slot()
if not slot:
return None
if self._field == "color":
return self._to_color_hex(slot.get("color"))
if self._field == "type":
material = slot.get("type")
return str(material).upper() if material else None
return None
return self._to_color_hex(slot.get("color"))
@property
def entity_picture(self) -> str | None:
"""Return a colored circle picture as a frontend fallback when icon tinting is ignored."""
slot = self._slot()
empty_svg = (
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
"<circle cx='32' cy='32' r='28' fill='none' stroke='#666' stroke-width='4'/></svg>"
)
if not slot:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
status = slot.get("status")
if status is not None:
try:
if int(status) != 5:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
except (TypeError, ValueError):
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
color_hex = self._to_color_hex(slot.get("color"))
if not color_hex:
return f"data:image/svg+xml;utf8,{quote(empty_svg)}"
svg = (
"<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'>"
"<circle cx='32' cy='32' r='28' fill='"
f"{color_hex}"
"' stroke='#222' stroke-width='4'/></svg>"
)
return f"data:image/svg+xml;utf8,{quote(svg)}"
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
slot = self._slot()
if not slot:
return None
return {
"color_hex": self._to_color_hex(slot.get("color")),
"status": slot.get("status"),
"box_id": slot.get("box_id"),
"global_index": slot.get("global_index"),
"activity": slot.get("activity"),
}
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
slots = coordinator.data.get("ams_slots") if coordinator.data else []
slot_count = len(slots) if isinstance(slots, list) and len(slots) > 0 else 4
filament_entities: list[SensorEntity] = []
for slot_index in range(slot_count):
for field in ("color", "type"):
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index, field))
for slot_index in range(MAX_FILAMENT_SLOTS):
filament_entities.append(KobraXFilamentSlotSensor(coordinator, entry, slot_index))
ace_dryer_entities: list[SensorEntity] = []
for ace_id in range(MAX_ACE_UNITS):
for sensor_type in ("status", "humidity", "current_temp", "target_temp", "remaining_time"):
ace_dryer_entities.append(KobraXAceDryerSensor(coordinator, entry, ace_id, sensor_type))
entity_registry = er.async_get(hass)
# Remove legacy single-unit ACE dryer sensors that were replaced by per-unit entities.
legacy_ace_sensor_keys = (
"ace_dryer_status",
"ace_dryer_humidity",
"ace_dryer_current_temp",
"ace_dryer_target_temp",
"ace_dryer_remaining_time",
)
for key in legacy_ace_sensor_keys:
legacy_unique_id = f"{entry.entry_id}_{key}"
legacy_entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, legacy_unique_id)
if legacy_entity_id:
entity_registry.async_remove(legacy_entity_id)
@callback
def _sync_slot_registry_state() -> None:
state_data = coordinator.data or {}
enabled_slots = _detected_slot_limit(state_data)
for slot_index in range(MAX_FILAMENT_SLOTS):
unique_id = f"{entry.entry_id}_slot_{slot_index + 1}"
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
if not entity_id:
continue
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
should_enable = slot_index < enabled_slots
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
async_add_entities(
[
@@ -185,4 +509,67 @@ async def async_setup_entry(hass, entry, async_add_entities):
for description in SENSORS
]
+ filament_entities
+ ace_dryer_entities
)
@callback
def _sync_ace_registry_state() -> None:
state_data = coordinator.data or {}
enabled_ace_units = _detected_ace_unit_count(state_data)
# Define all ACE entity patterns: (platform, unique_id_pattern_parts)
ace_entities: list[tuple[str, str]] = []
for ace_index in range(MAX_ACE_UNITS):
# Switch entities
ace_entities.append(("switch", f"{entry.entry_id}_ace_{ace_index}_auto_feed"))
# Number entities (temp + duration)
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_target_temp"))
ace_entities.append(("number", f"{entry.entry_id}_ace_{ace_index}_dry_duration"))
# Button entities (start + stop)
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_start"))
ace_entities.append(("button", f"{entry.entry_id}_ace_{ace_index}_dry_stop"))
# Sensor entities (status, humidity, current_temp, target_temp, remaining_time)
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_status"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_humidity"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_current_temp"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_target_temp"))
ace_entities.append(("sensor", f"{entry.entry_id}_ace_{ace_index}_dryer_remaining_time"))
for platform, unique_id in ace_entities:
entity_id = entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
if not entity_id:
continue
reg_entry = entity_registry.async_get(entity_id)
if reg_entry is None:
continue
# Extract ace_index from unique_id
parts = unique_id.split("_")
if len(parts) >= 3:
try:
ace_index = int(parts[2])
except (ValueError, IndexError):
continue
else:
continue
should_enable = ace_index < enabled_ace_units
if should_enable:
if reg_entry.disabled_by == RegistryEntryDisabler.INTEGRATION:
entity_registry.async_update_entity(entity_id, disabled_by=None)
else:
if reg_entry.disabled_by is None:
entity_registry.async_update_entity(
entity_id,
disabled_by=RegistryEntryDisabler.INTEGRATION,
)
entry.async_on_unload(coordinator.async_add_listener(_sync_slot_registry_state))
entry.async_on_unload(coordinator.async_add_listener(_sync_ace_registry_state))
_sync_slot_registry_state()
_sync_ace_registry_state()

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from homeassistant.components.switch import SwitchEntity
from homeassistant.exceptions import ServiceValidationError
from .api import KobraXApiError
from .const import DOMAIN
from .entity import KobraXEntity
class KobraXAceAutoFeedSwitch(KobraXEntity, SwitchEntity):
def __init__(self, coordinator, entry, ace_id: int) -> None:
super().__init__(coordinator, entry, f"ace_{ace_id}_auto_feed", f"ACE {ace_id + 1} Auto Fill")
self._ace_id = ace_id
self._attr_icon = "mdi:autorenew"
@property
def is_on(self) -> bool:
auto_feed = self.state_data.get("ace_auto_feed") or {}
if not isinstance(auto_feed, dict):
return False
value = auto_feed.get(self._ace_id)
if value is None:
value = auto_feed.get(str(self._ace_id))
return bool(value)
async def async_turn_on(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, True)
await self.coordinator.async_request_refresh()
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_turn_off(self, **kwargs) -> None:
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
try:
await api.async_set_ace_auto_feed(self._ace_id, False)
await self.coordinator.async_request_refresh()
except KobraXApiError as err:
raise ServiceValidationError(str(err)) from err
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
# Pre-create switches for all 4 possible ACE units
async_add_entities(
[KobraXAceAutoFeedSwitch(coordinator, entry, ace_id) for ace_id in range(4)]
)

View File

@@ -4,7 +4,7 @@
"domains": [
"kobrax_lan"
],
"homeassistant": "2024.6.0",
"homeassistant": "2026.3.0",
"iot_class": "local_polling",
"render_readme": true
}