Files
MCM/channels/whatsapp_channel.py

166 lines
7.0 KiB
Python

from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Awaitable
import aiohttp
from channels.base import BaseChannel
from config import settings
logger = logging.getLogger(__name__)
class WhatsAppChannel(BaseChannel):
"""WhatsApp-Kanal via Green API (https://green-api.com)."""
def __init__(self) -> None:
self._base_url = (
f"https://api.green-api.com/waInstance{settings.whatsapp_id_instance}"
)
self._token = settings.whatsapp_api_token
self._session: aiohttp.ClientSession | None = None
self._polling_task: asyncio.Task | None = None
self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None
def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None:
self._inbound_callback = cb
# ── BaseChannel interface ──────────────────────────────────────────────────
async def send_message(
self,
recipient: str,
text: str,
reply_to_id: str | None = None,
) -> dict[str, Any]:
if not settings.whatsapp_enabled:
return {"success": False, "channel_message_id": None, "error": "WhatsApp not configured"}
# chatId: Telefonnummer ohne + gefolgt von @c.us, z.B. "4917612345678@c.us"
chat_id = recipient.replace("@c.us", "").lstrip("+") + "@c.us"
url = f"{self._base_url}/sendMessage/{self._token}"
body: dict[str, Any] = {"chatId": chat_id, "message": text}
if reply_to_id:
body["quotedMessageId"] = reply_to_id
try:
async with self._get_session().post(url, json=body) as resp:
data = await resp.json()
if resp.status == 200 and "idMessage" in data:
return {"success": True, "channel_message_id": data["idMessage"], "error": None}
return {"success": False, "channel_message_id": None, "error": str(data)}
except Exception as exc:
logger.error("WhatsApp send error: %s", exc)
return {"success": False, "channel_message_id": None, "error": str(exc)}
async def check_connection(self) -> tuple[bool, str]:
if not settings.whatsapp_enabled:
return False, "Not configured"
url = f"{self._base_url}/getStateInstance/{self._token}"
try:
async with self._get_session().get(url) as resp:
data = await resp.json()
state = data.get("stateInstance", "unknown")
return state == "authorized", state
except Exception as exc:
return False, str(exc)
async def start(self) -> None:
if not settings.whatsapp_enabled:
logger.info("WhatsApp disabled (no credentials configured)")
return
self._session = aiohttp.ClientSession()
await self._ensure_incoming_enabled()
self._polling_task = asyncio.create_task(self._polling_loop(), name="whatsapp-polling")
logger.info("WhatsApp channel started")
async def stop(self) -> None:
if self._polling_task and not self._polling_task.done():
self._polling_task.cancel()
try:
await self._polling_task
except asyncio.CancelledError:
pass
self._polling_task = None
if self._session and not self._session.closed:
await self._session.close()
logger.info("WhatsApp channel stopped")
# ── Polling-Schleife ───────────────────────────────────────────────────────
async def _polling_loop(self) -> None:
"""Endlos-Polling: direkt nach Antwort wieder starten, kein Overlap möglich."""
url = f"{self._base_url}/receiveNotification/{self._token}"
timeout = aiohttp.ClientTimeout(total=15)
while True:
try:
async with self._get_session().get(
url, params={"receiveTimeout": 5}, timeout=timeout
) as resp:
if resp.status == 200:
data = await resp.json()
if data:
receipt_id = data.get("receiptId")
body = data.get("body", {})
await self._process_notification(body)
if receipt_id:
await self._delete_notification(receipt_id)
except asyncio.CancelledError:
break
except Exception as exc:
logger.error("WhatsApp poll error: %s", exc)
await asyncio.sleep(5)
async def _delete_notification(self, receipt_id: int) -> None:
url = f"{self._base_url}/deleteNotification/{self._token}/{receipt_id}"
try:
async with self._get_session().delete(url) as resp:
await resp.read()
except Exception as exc:
logger.warning("WhatsApp delete notification error: %s", exc)
async def _process_notification(self, body: dict[str, Any]) -> None:
if not self._inbound_callback:
return
msg_type = body.get("typeWebhook")
if msg_type != "incomingMessageReceived":
return
sender_data = body.get("senderData", {})
message_data = body.get("messageData", {})
text_data = message_data.get("textMessageData", {})
payload: dict[str, Any] = {
"channel": "whatsapp",
"channel_message_id": body.get("idMessage"),
"sender_phone": sender_data.get("sender", "").replace("@c.us", ""),
"sender_name": sender_data.get("senderName", ""),
"chat_id": sender_data.get("chatId", ""),
"text": text_data.get("textMessage", ""),
"reply_to_id": None,
}
await self._inbound_callback(payload)
async def _ensure_incoming_enabled(self) -> None:
"""Stellt sicher dass incomingWebhook in der Green-API-Instanz aktiviert ist."""
url = f"{self._base_url}/setSettings/{self._token}"
try:
async with self._get_session().post(
url,
json={"incomingWebhook": "yes", "outgoingWebhook": "yes"},
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
data = await resp.json()
if data.get("saveSettings"):
logger.info("WhatsApp: incomingWebhook aktiviert")
else:
logger.warning("WhatsApp: setSettings Antwort: %s", data)
except Exception as exc:
logger.warning("WhatsApp: incomingWebhook konnte nicht gesetzt werden: %s", exc)
def _get_session(self) -> aiohttp.ClientSession:
if not self._session or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session