mirror of
https://github.com/gangoke/kobrax-lan-hass-component.git
synced 2026-06-10 05:02:12 +02:00
Compare commits
1 Commits
filament-a
...
camera-rew
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45591f41a4 |
@@ -13,7 +13,8 @@ Architecture:
|
||||
- Light control
|
||||
- Print speed mode selection
|
||||
- Printer action buttons (pause, resume, cancel)
|
||||
- Camera snapshot entity using `/api/camera/snapshot`
|
||||
- Camera stream entity using the printer RTSP URL from KX-Bridge, with bridge MJPEG proxy fallback
|
||||
- Camera snapshot fallback using `/api/camera/snapshot`
|
||||
|
||||
## Prerequisites
|
||||
|
||||
@@ -38,3 +39,4 @@ The config flow asks for:
|
||||
|
||||
- This integration talks to KX-Bridge HTTP endpoints and does not connect directly to the printer.
|
||||
- Keep KX-Bridge and Home Assistant on the same trusted network.
|
||||
- Native WebRTC is not implemented in this integration. If you want WebRTC in Home Assistant, point `go2rtc` or a WebRTC-capable HA add-on at the camera entity's RTSP source.
|
||||
|
||||
@@ -17,6 +17,9 @@ class KobraXApiClient:
|
||||
def _url(self, path: str) -> str:
|
||||
return f"{self._base_url}{path}"
|
||||
|
||||
def camera_stream_proxy_url(self) -> str:
|
||||
return self._url("/api/camera/stream")
|
||||
|
||||
async def _get_json(self, path: str) -> dict[str, Any]:
|
||||
try:
|
||||
async with self._session.get(self._url(path)) as response:
|
||||
@@ -76,6 +79,16 @@ class KobraXApiClient:
|
||||
async def async_disconnect(self) -> None:
|
||||
await self._post_json("/api/disconnect", {})
|
||||
|
||||
async def async_start_camera(self) -> None:
|
||||
await self._post_json("/api/camera/start", {})
|
||||
|
||||
async def async_get_camera_url(self) -> str | None:
|
||||
data = await self._get_json("/api/camera")
|
||||
url = data.get("url")
|
||||
if url is None or isinstance(url, str):
|
||||
return url
|
||||
raise KobraXApiError("Unexpected response for /api/camera")
|
||||
|
||||
async def async_get_camera_snapshot(self) -> bytes:
|
||||
try:
|
||||
async with self._session.get(self._url("/api/camera/snapshot")) as response:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from homeassistant.components.camera import Camera
|
||||
from homeassistant.components.camera import Camera, CameraEntityFeature
|
||||
|
||||
from .api import KobraXApiError
|
||||
from .const import DOMAIN
|
||||
@@ -8,6 +8,9 @@ from .entity import KobraXEntity
|
||||
|
||||
|
||||
class KobraXCamera(KobraXEntity, Camera):
|
||||
_attr_supported_features = CameraEntityFeature.STREAM
|
||||
_attr_use_stream_for_stills = True
|
||||
|
||||
def __init__(self, coordinator, entry, key: str, name: str) -> None:
|
||||
KobraXEntity.__init__(self, coordinator, entry, key, name)
|
||||
Camera.__init__(self)
|
||||
@@ -16,6 +19,34 @@ class KobraXCamera(KobraXEntity, Camera):
|
||||
def is_streaming(self) -> bool:
|
||||
return bool(self.state_data.get("camera_url"))
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> dict[str, str]:
|
||||
camera_url = self.state_data.get("camera_url")
|
||||
attrs = {
|
||||
"camera_mjpeg_proxy_url": self.hass.data[DOMAIN][self._entry.entry_id]["api"].camera_stream_proxy_url(),
|
||||
}
|
||||
if isinstance(camera_url, str) and camera_url:
|
||||
attrs["camera_rtsp_url"] = camera_url
|
||||
return attrs
|
||||
|
||||
async def stream_source(self) -> str | None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
await api.async_start_camera()
|
||||
except KobraXApiError:
|
||||
pass
|
||||
|
||||
camera_url = self.state_data.get("camera_url")
|
||||
if isinstance(camera_url, str) and camera_url:
|
||||
return camera_url
|
||||
|
||||
try:
|
||||
camera_url = await api.async_get_camera_url()
|
||||
except KobraXApiError:
|
||||
return api.camera_stream_proxy_url()
|
||||
|
||||
return camera_url or api.camera_stream_proxy_url()
|
||||
|
||||
async def async_camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None:
|
||||
api = self.hass.data[DOMAIN][self._entry.entry_id]["api"]
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user