from __future__ import annotations import asyncio import logging from typing import TYPE_CHECKING, Any, Callable, Awaitable from telegram import Update from telegram.ext import Application, ApplicationBuilder, MessageHandler, filters from channels.base import BaseChannel from config import settings if TYPE_CHECKING: pass logger = logging.getLogger(__name__) class TelegramChannel(BaseChannel): """Telegram-Kanal via python-telegram-bot (Long-Polling).""" def __init__(self) -> None: self._app: Application | None = None self._inbound_callback: Callable[[dict[str, Any]], Awaitable[None]] | None = None def set_inbound_callback(self, cb: Callable[[dict[str, Any]], Awaitable[None]]) -> None: """Callback, der bei eingehenden Nachrichten aufgerufen wird.""" self._inbound_callback = cb # ── BaseChannel interface ────────────────────────────────────────────────── async def send_message( self, recipient: str, text: str, reply_to_id: str | None = None, ) -> dict[str, Any]: if not self._app: return {"success": False, "channel_message_id": None, "error": "Telegram not initialized"} try: kwargs: dict[str, Any] = {"chat_id": recipient, "text": text} if reply_to_id: kwargs["reply_to_message_id"] = int(reply_to_id) msg = await self._app.bot.send_message(**kwargs) return {"success": True, "channel_message_id": str(msg.message_id), "error": None} except Exception as exc: logger.error("Telegram send error: %s", exc) return {"success": False, "channel_message_id": None, "error": str(exc)} async def check_connection(self) -> tuple[bool, str]: if not self._app: return False, "Not initialized" try: me = await self._app.bot.get_me() return True, f"@{me.username}" except Exception as exc: return False, str(exc) async def start(self) -> None: if not settings.telegram_enabled: logger.info("Telegram disabled (no token configured)") return self._app = ApplicationBuilder().token(settings.telegram_token).build() self._app.add_handler(MessageHandler(~filters.COMMAND, self._handle_message)) await self._app.initialize() await self._app.start() # Long-Polling als Background-Task asyncio.create_task(self._polling_loop(), name="telegram-polling") logger.info("Telegram channel started (long-polling)") async def stop(self) -> None: if self._app: # Korrekte PTB v20+ Shutdown-Reihenfolge: # Erst Updater stoppen, dann Application stoppen/shutdown try: if self._app.updater.running: await self._app.updater.stop() except Exception: pass await self._app.stop() await self._app.shutdown() logger.info("Telegram channel stopped") # ── Interner Handler ─────────────────────────────────────────────────────── async def _polling_loop(self) -> None: """Endlos-Polling im Hintergrund.""" try: await self._app.updater.start_polling(allowed_updates=["message"]) # In PTB v20+ läuft der Updater als eigener asyncio-Task weiter – # wir warten hier einfach, bis der Task abgebrochen wird. await asyncio.Event().wait() except asyncio.CancelledError: pass except Exception as exc: logger.error("Telegram polling error: %s", exc) async def _handle_message(self, update: Update, context: Any) -> None: if not update.message or not self._inbound_callback: return msg = update.message payload: dict[str, Any] = { "channel": "telegram", "channel_message_id": str(msg.message_id), "sender_telegram_id": str(msg.from_user.id) if msg.from_user else None, "sender_name": ( (msg.from_user.full_name or msg.from_user.username) if msg.from_user else "Unknown" ), "chat_id": str(msg.chat.id), "text": msg.text or msg.caption or "", "reply_to_id": ( str(msg.reply_to_message.message_id) if msg.reply_to_message else None ), } await self._inbound_callback(payload)