from __future__ import annotations

import asyncio
import logging
from typing import Any, Callable, Awaitable

import aiohttp

from channels.base import BaseChannel
from config import settings

logger = logging.getLogger(__name__)


class WhatsAppChannel(BaseChannel):
    """WhatsApp channel backed by Green API (https://green-api.com).

    Outbound messages are sent with ``sendMessage``; inbound messages are
    pulled from the instance's notification queue by a background task that
    long-polls ``receiveNotification`` and acknowledges each entry with
    ``deleteNotification``.
    """

    def __init__(self) -> None:
        self._base_url = (
            f"https://api.green-api.com/waInstance{settings.whatsapp_id_instance}"
        )
        self._token = settings.whatsapp_api_token
        # Created lazily by _get_session(); closed and reset in stop().
        self._session: aiohttp.ClientSession | None = None
        # Background task running _polling_loop(); None while not polling.
        self._polling_task: asyncio.Task | None = None
        # Coroutine function invoked with one payload dict per inbound message.
        self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None

    def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None:
        """Register the coroutine function that receives inbound message payloads."""
        self._inbound_callback = cb

    # ── BaseChannel interface ──────────────────────────────────────────────────

    async def send_message(
        self,
        recipient: str,
        text: str,
        reply_to_id: str | None = None,
    ) -> dict[str, Any]:
        """Send a text message to *recipient*.

        Args:
            recipient: Phone number (with or without leading ``+`` and/or
                ``@c.us`` suffix) or a group chat id ending in ``@g.us``.
            text: Message text.
            reply_to_id: Optional id of the message to quote.

        Returns:
            A dict with the keys ``success``, ``channel_message_id`` and
            ``error``. Errors are reported in the dict, never raised.
        """
        if not settings.whatsapp_enabled:
            return {"success": False, "channel_message_id": None, "error": "WhatsApp not configured"}

        chat_id = self._to_chat_id(recipient)
        url = f"{self._base_url}/sendMessage/{self._token}"
        body: dict[str, Any] = {"chatId": chat_id, "message": text}
        if reply_to_id:
            body["quotedMessageId"] = reply_to_id

        try:
            async with self._get_session().post(url, json=body) as resp:
                data = await resp.json()
                if resp.status == 200 and "idMessage" in data:
                    return {"success": True, "channel_message_id": data["idMessage"], "error": None}
                return {"success": False, "channel_message_id": None, "error": str(data)}
        except Exception as exc:
            logger.error("WhatsApp send error: %s", exc)
            return {"success": False, "channel_message_id": None, "error": str(exc)}

    async def check_connection(self) -> tuple[bool, str]:
        """Return ``(is_authorized, state_or_error_text)`` for the instance."""
        if not settings.whatsapp_enabled:
            return False, "Not configured"
        url = f"{self._base_url}/getStateInstance/{self._token}"
        try:
            async with self._get_session().get(url) as resp:
                data = await resp.json()
                state = data.get("stateInstance", "unknown")
                return state == "authorized", state
        except Exception as exc:
            return False, str(exc)

    async def start(self) -> None:
        """Enable incoming notifications and start the polling task.

        Does nothing when WhatsApp is not configured or when polling is
        already running.
        """
        if not settings.whatsapp_enabled:
            logger.info("WhatsApp disabled (no credentials configured)")
            return
        if self._polling_task and not self._polling_task.done():
            # A second start() must not spawn a competing poller.
            return
        # Reuse a session that may already exist (e.g. from an earlier
        # check_connection()) instead of overwriting and leaking it.
        self._get_session()
        await self._ensure_incoming_enabled()
        self._polling_task = asyncio.create_task(self._polling_loop(), name="whatsapp-polling")
        logger.info("WhatsApp channel started")

    async def stop(self) -> None:
        """Cancel the polling task and close the HTTP session."""
        if self._polling_task and not self._polling_task.done():
            self._polling_task.cancel()
            try:
                await self._polling_task
            except asyncio.CancelledError:
                pass
        self._polling_task = None
        if self._session and not self._session.closed:
            await self._session.close()
        self._session = None
        logger.info("WhatsApp channel stopped")

    # ── Polling loop ───────────────────────────────────────────────────────────

    async def _polling_loop(self) -> None:
        """Poll for notifications until cancelled.

        Each request is issued only after the previous one has been fully
        handled, so requests never overlap.
        """
        url = f"{self._base_url}/receiveNotification/{self._token}"
        timeout = aiohttp.ClientTimeout(total=15)
        while True:
            try:
                async with self._get_session().get(
                    url, params={"receiveTimeout": 5}, timeout=timeout
                ) as resp:
                    status = resp.status
                    data = await resp.json() if status == 200 else None
                if status != 200:
                    # Back off instead of hammering the API on 401/429/5xx.
                    logger.warning("WhatsApp poll returned HTTP %s", status)
                    await asyncio.sleep(5)
                    continue
                if data:
                    await self._handle_notification(data)
            except asyncio.CancelledError:
                break
            except Exception as exc:
                logger.error("WhatsApp poll error: %s", exc)
                await asyncio.sleep(5)

    async def _handle_notification(self, data: dict[str, Any]) -> None:
        """Process one queue entry and acknowledge it."""
        receipt_id = data.get("receiptId")
        body = data.get("body") or {}
        try:
            await self._process_notification(body)
        except Exception:
            # Deliberately at-most-once: the entry is still deleted below,
            # because an undeleted notification is returned again on the next
            # poll and a permanently failing one would block the queue.
            logger.exception(
                "WhatsApp inbound processing failed (receiptId=%s)", receipt_id
            )
        if receipt_id:
            await self._delete_notification(receipt_id)

    async def _delete_notification(self, receipt_id: int) -> None:
        """Acknowledge a notification; failures are logged, not raised."""
        url = f"{self._base_url}/deleteNotification/{self._token}/{receipt_id}"
        try:
            async with self._get_session().delete(url) as resp:
                await resp.read()
        except Exception as exc:
            logger.warning("WhatsApp delete notification error: %s", exc)

    async def _process_notification(self, body: dict[str, Any]) -> None:
        """Forward an ``incomingMessageReceived`` notification to the callback.

        Notifications of any other type, and all notifications while no
        callback is registered, are ignored.
        """
        if not self._inbound_callback:
            return

        if body.get("typeWebhook") != "incomingMessageReceived":
            return

        # ``or {}`` also covers keys that are present but null.
        sender_data = body.get("senderData") or {}
        message_data = body.get("messageData") or {}
        text_data = message_data.get("textMessageData") or {}
        # NOTE(review): extended text messages (links, quoted replies) are
        # expected to carry their text in extendedTextMessageData.text —
        # confirm against the Green API webhook format.
        extended_data = message_data.get("extendedTextMessageData") or {}
        text = text_data.get("textMessage") or extended_data.get("text") or ""

        payload: dict[str, Any] = {
            "channel": "whatsapp",
            "channel_message_id": body.get("idMessage"),
            "sender_phone": (sender_data.get("sender") or "").replace("@c.us", ""),
            "sender_name": sender_data.get("senderName") or "",
            "chat_id": sender_data.get("chatId") or "",
            "text": text,
            "reply_to_id": None,
        }

        await self._inbound_callback(payload)

    async def _ensure_incoming_enabled(self) -> None:
        """Ask Green API to enable incoming and outgoing webhooks.

        Best effort: any failure is logged as a warning and start-up continues.
        """
        url = f"{self._base_url}/setSettings/{self._token}"
        try:
            async with self._get_session().post(
                url,
                json={"incomingWebhook": "yes", "outgoingWebhook": "yes"},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                data = await resp.json()
                if data.get("saveSettings"):
                    logger.info("WhatsApp: incomingWebhook aktiviert")
                else:
                    logger.warning("WhatsApp: setSettings Antwort: %s", data)
        except Exception as exc:
            logger.warning("WhatsApp: incomingWebhook konnte nicht gesetzt werden: %s", exc)

    @staticmethod
    def _to_chat_id(recipient: str) -> str:
        """Normalize *recipient* to a chat id.

        Group ids (``...@g.us``) are returned unchanged. Anything else is
        treated as a phone number: an existing ``@c.us`` and leading ``+``
        are removed, then ``@c.us`` is appended, e.g. ``"4917612345678@c.us"``.
        """
        if recipient.endswith("@g.us"):
            return recipient
        return recipient.replace("@c.us", "").lstrip("+") + "@c.us"

    def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating one if missing or closed."""
        if not self._session or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session