From 0dc718f54120458c33e33418b924f80284de8481 Mon Sep 17 00:00:00 2001 From: "itdrui.de" Date: Thu, 2 Apr 2026 13:11:52 +0200 Subject: [PATCH] fix: WhatsApp-Polling als eigene Schleife statt APScheduler (kein Overlap mehr) --- channels/whatsapp_channel.py | 53 +++++++++++++++++++++--------------- main.py | 8 +----- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/channels/whatsapp_channel.py b/channels/whatsapp_channel.py index 7a829fe..bf8f732 100644 --- a/channels/whatsapp_channel.py +++ b/channels/whatsapp_channel.py @@ -21,6 +21,7 @@ class WhatsAppChannel(BaseChannel): ) self._token = settings.whatsapp_api_token self._session: aiohttp.ClientSession | None = None + self._polling_task: asyncio.Task | None = None self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None: @@ -72,37 +73,45 @@ class WhatsAppChannel(BaseChannel): return self._session = aiohttp.ClientSession() await self._ensure_incoming_enabled() + self._polling_task = asyncio.create_task(self._polling_loop(), name="whatsapp-polling") logger.info("WhatsApp channel started") async def stop(self) -> None: + if self._polling_task and not self._polling_task.done(): + self._polling_task.cancel() + try: + await self._polling_task + except asyncio.CancelledError: + pass + self._polling_task = None if self._session and not self._session.closed: await self._session.close() logger.info("WhatsApp channel stopped") - # ── Polling (wird vom Scheduler aufgerufen) ──────────────────────────────── + # ── Polling-Schleife ─────────────────────────────────────────────────────── - async def poll_incoming(self) -> None: - """Eingehende Nachrichten per Polling abrufen (Green API Notification Queue).""" - if not settings.whatsapp_enabled: - return + async def _polling_loop(self) -> None: + """Endlos-Polling: direkt nach Antwort wieder starten, kein Overlap möglich.""" url = f"{self._base_url}/receiveNotification/{self._token}" - try: - timeout = aiohttp.ClientTimeout(total=8) - async with self._get_session().get(url, params={"receiveTimeout": 3}, timeout=timeout) as resp: - if resp.status != 200: - return - data = await resp.json() - if not data: - return - receipt_id = data.get("receiptId") - body = data.get("body", {}) - await self._process_notification(body) - if receipt_id: - await self._delete_notification(receipt_id) - except asyncio.CancelledError: - raise - except Exception as exc: - logger.error("WhatsApp poll error: %s", exc) + timeout = aiohttp.ClientTimeout(total=15) + while True: + try: + async with self._get_session().get( + url, params={"receiveTimeout": 5}, timeout=timeout + ) as resp: + if resp.status == 200: + data = await resp.json() + if data: + receipt_id = data.get("receiptId") + body = data.get("body", {}) + await self._process_notification(body) + if receipt_id: + await self._delete_notification(receipt_id) + except asyncio.CancelledError: + break + except Exception as exc: + logger.error("WhatsApp poll error: %s", exc) + await asyncio.sleep(5) async def _delete_notification(self, receipt_id: int) -> None: url = f"{self._base_url}/deleteNotification/{self._token}/{receipt_id}" diff --git a/main.py b/main.py index 4a50bda..86da946 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,6 @@ from channels.whatsapp_channel import WhatsAppChannel from config import settings from db.database import init_db from services import message_service -from tasks.receiver import build_scheduler logging.basicConfig( level=logging.INFO, @@ -67,11 +66,7 @@ async def main(with_tui: bool = True) -> None: await whatsapp.start() await sms.start() - # 6. Hintergrund-Tasks starten (WhatsApp-Polling etc.) - scheduler = build_scheduler(whatsapp) - scheduler.start() - - # 7. Uvicorn als Hintergrund-Task starten + # 6. Uvicorn als Hintergrund-Task starten api_task = asyncio.create_task(_run_api(), name="mcm-api") logger.info("API running on http://%s:%d", settings.host, settings.port) @@ -91,7 +86,6 @@ async def main(with_tui: bool = True) -> None: pass finally: logger.info("MCM shutting down…") - scheduler.shutdown(wait=False) api_task.cancel() try: await api_task