fix: WhatsApp-Polling als eigene Schleife statt APScheduler (kein Overlap mehr)
This commit is contained in:
@@ -21,6 +21,7 @@ class WhatsAppChannel(BaseChannel):
|
|||||||
)
|
)
|
||||||
self._token = settings.whatsapp_api_token
|
self._token = settings.whatsapp_api_token
|
||||||
self._session: aiohttp.ClientSession | None = None
|
self._session: aiohttp.ClientSession | None = None
|
||||||
|
self._polling_task: asyncio.Task | None = None
|
||||||
self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None
|
self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None
|
||||||
|
|
||||||
def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None:
|
def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None:
|
||||||
@@ -72,37 +73,45 @@ class WhatsAppChannel(BaseChannel):
|
|||||||
return
|
return
|
||||||
self._session = aiohttp.ClientSession()
|
self._session = aiohttp.ClientSession()
|
||||||
await self._ensure_incoming_enabled()
|
await self._ensure_incoming_enabled()
|
||||||
|
self._polling_task = asyncio.create_task(self._polling_loop(), name="whatsapp-polling")
|
||||||
logger.info("WhatsApp channel started")
|
logger.info("WhatsApp channel started")
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
|
if self._polling_task and not self._polling_task.done():
|
||||||
|
self._polling_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._polling_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
self._polling_task = None
|
||||||
if self._session and not self._session.closed:
|
if self._session and not self._session.closed:
|
||||||
await self._session.close()
|
await self._session.close()
|
||||||
logger.info("WhatsApp channel stopped")
|
logger.info("WhatsApp channel stopped")
|
||||||
|
|
||||||
# ── Polling (wird vom Scheduler aufgerufen) ────────────────────────────────
|
# ── Polling-Schleife ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
async def poll_incoming(self) -> None:
|
async def _polling_loop(self) -> None:
|
||||||
"""Eingehende Nachrichten per Polling abrufen (Green API Notification Queue)."""
|
"""Endlos-Polling: direkt nach Antwort wieder starten, kein Overlap möglich."""
|
||||||
if not settings.whatsapp_enabled:
|
|
||||||
return
|
|
||||||
url = f"{self._base_url}/receiveNotification/{self._token}"
|
url = f"{self._base_url}/receiveNotification/{self._token}"
|
||||||
try:
|
timeout = aiohttp.ClientTimeout(total=15)
|
||||||
timeout = aiohttp.ClientTimeout(total=8)
|
while True:
|
||||||
async with self._get_session().get(url, params={"receiveTimeout": 3}, timeout=timeout) as resp:
|
try:
|
||||||
if resp.status != 200:
|
async with self._get_session().get(
|
||||||
return
|
url, params={"receiveTimeout": 5}, timeout=timeout
|
||||||
data = await resp.json()
|
) as resp:
|
||||||
if not data:
|
if resp.status == 200:
|
||||||
return
|
data = await resp.json()
|
||||||
receipt_id = data.get("receiptId")
|
if data:
|
||||||
body = data.get("body", {})
|
receipt_id = data.get("receiptId")
|
||||||
await self._process_notification(body)
|
body = data.get("body", {})
|
||||||
if receipt_id:
|
await self._process_notification(body)
|
||||||
await self._delete_notification(receipt_id)
|
if receipt_id:
|
||||||
except asyncio.CancelledError:
|
await self._delete_notification(receipt_id)
|
||||||
raise
|
except asyncio.CancelledError:
|
||||||
except Exception as exc:
|
break
|
||||||
logger.error("WhatsApp poll error: %s", exc)
|
except Exception as exc:
|
||||||
|
logger.error("WhatsApp poll error: %s", exc)
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
async def _delete_notification(self, receipt_id: int) -> None:
|
async def _delete_notification(self, receipt_id: int) -> None:
|
||||||
url = f"{self._base_url}/deleteNotification/{self._token}/{receipt_id}"
|
url = f"{self._base_url}/deleteNotification/{self._token}/{receipt_id}"
|
||||||
|
|||||||
8
main.py
8
main.py
@@ -20,7 +20,6 @@ from channels.whatsapp_channel import WhatsAppChannel
|
|||||||
from config import settings
|
from config import settings
|
||||||
from db.database import init_db
|
from db.database import init_db
|
||||||
from services import message_service
|
from services import message_service
|
||||||
from tasks.receiver import build_scheduler
|
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
@@ -67,11 +66,7 @@ async def main(with_tui: bool = True) -> None:
|
|||||||
await whatsapp.start()
|
await whatsapp.start()
|
||||||
await sms.start()
|
await sms.start()
|
||||||
|
|
||||||
# 6. Hintergrund-Tasks starten (WhatsApp-Polling etc.)
|
# 6. Uvicorn als Hintergrund-Task starten
|
||||||
scheduler = build_scheduler(whatsapp)
|
|
||||||
scheduler.start()
|
|
||||||
|
|
||||||
# 7. Uvicorn als Hintergrund-Task starten
|
|
||||||
api_task = asyncio.create_task(_run_api(), name="mcm-api")
|
api_task = asyncio.create_task(_run_api(), name="mcm-api")
|
||||||
logger.info("API running on http://%s:%d", settings.host, settings.port)
|
logger.info("API running on http://%s:%d", settings.host, settings.port)
|
||||||
|
|
||||||
@@ -91,7 +86,6 @@ async def main(with_tui: bool = True) -> None:
|
|||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
logger.info("MCM shutting down…")
|
logger.info("MCM shutting down…")
|
||||||
scheduler.shutdown(wait=False)
|
|
||||||
api_task.cancel()
|
api_task.cancel()
|
||||||
try:
|
try:
|
||||||
await api_task
|
await api_task
|
||||||
|
|||||||
Reference in New Issue
Block a user