Files
MCM/services/message_service.py
itdrui.de 18ad0735ef feat: Telegram QR-Code Invite-Link + WhatsApp Empfang-Fix
Telegram:
- /start <contact_id> Deep-Link Handler: verknüpft Kontakt automatisch mit chat_id
- QR-Code Endpunkt GET /api/v1/contacts/{id}/telegram-qr (PNG)
- TUI: Taste T öffnet QR-Code im Browser (HTML mit eingebettetem PNG)
- config.py + .env.example: TELEGRAM_BOT_USERNAME=mcm_bot
- qrcode[pil] zu requirements.txt hinzugefügt

WhatsApp:
- receiveTimeout 5→3s, HTTP-Timeout 8s → verhindert Polling-Overlap
2026-03-13 14:45:06 +01:00

201 lines
6.9 KiB
Python

from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any
from sqlalchemy.orm import Session
from db.database import SessionLocal
from db.models import Message
from schemas import SendMessageRequest
from services import contact_service, conversation_service
if TYPE_CHECKING:
from channels.telegram_channel import TelegramChannel
from channels.whatsapp_channel import WhatsAppChannel
from channels.sms_channel import SmsChannel
logger = logging.getLogger(__name__)
# Wird in main.py nach Initialisierung gesetzt
_telegram: "TelegramChannel | None" = None
_whatsapp: "WhatsAppChannel | None" = None
_sms: "SmsChannel | None" = None
# Callback für TUI-Benachrichtigungen bei neuen Nachrichten
_new_message_callbacks: list[Any] = []
def register_channels(telegram: Any, whatsapp: Any, sms: Any) -> None:
global _telegram, _whatsapp, _sms
_telegram = telegram
_whatsapp = whatsapp
_sms = sms
def add_new_message_callback(cb: Any) -> None:
_new_message_callbacks.append(cb)
# ── Senden ─────────────────────────────────────────────────────────────────────
async def send(db: Session, req: SendMessageRequest) -> Message:
# Empfänger & Konversation ermitteln
if req.channel == "telegram":
recipient_id = req.recipient_telegram_id
contact = contact_service.get_or_create_by_telegram(
db, recipient_id, name=recipient_id
)
channel_conv_id = recipient_id
else:
recipient_id = req.recipient_phone
contact = contact_service.get_or_create_by_phone(db, recipient_id)
channel_conv_id = recipient_id
conv = conversation_service.get_or_create(
db, req.channel, channel_conv_id, contact
)
# Nachricht in DB anlegen (pending)
msg = Message(
conversation_id=conv.id,
channel=req.channel,
direction="outbound",
text=req.text,
status="pending",
reply_to_id=req.reply_to_id,
)
db.add(msg)
db.commit()
db.refresh(msg)
# Kanal-spezifisches Senden
result = await _dispatch(req.channel, recipient_id, req.text, req.reply_to_id)
# Status aktualisieren
if result["success"]:
msg.status = "sent"
msg.sent_at = datetime.utcnow()
msg.channel_message_id = result.get("channel_message_id")
else:
msg.status = "failed"
msg.error_message = result.get("error")
conv.last_message_at = datetime.utcnow()
db.commit()
db.refresh(msg)
await _notify_callbacks(msg)
return msg
async def _dispatch(channel: str, recipient: str, text: str, reply_to: str | None) -> dict:
if channel == "telegram" and _telegram:
return await _telegram.send_message(recipient, text, reply_to)
if channel == "whatsapp" and _whatsapp:
return await _whatsapp.send_message(recipient, text, reply_to)
if channel == "sms" and _sms:
return await _sms.send_message(recipient, text, reply_to)
return {"success": False, "channel_message_id": None, "error": "Channel not available"}
# ── Empfangen (Inbound) ────────────────────────────────────────────────────────
async def handle_inbound(payload: dict[str, Any]) -> None:
"""Wird von den Kanal-Callbacks aufgerufen."""
db = SessionLocal()
try:
channel = payload["channel"]
if channel == "telegram":
link_contact_id = payload.get("link_contact_id")
if link_contact_id:
# Kontakt aus QR-Code: telegram_id verknüpfen
existing = contact_service.get_by_id(db, link_contact_id)
if existing:
from schemas import ContactUpdate
contact_service.update(
db, existing,
ContactUpdate(telegram_id=payload["sender_telegram_id"])
)
contact = existing
else:
contact = contact_service.get_or_create_by_telegram(
db,
payload["sender_telegram_id"],
name=payload.get("sender_name", "Unknown"),
)
else:
contact = contact_service.get_or_create_by_telegram(
db,
payload["sender_telegram_id"],
name=payload.get("sender_name", "Unknown"),
)
channel_conv_id = payload["chat_id"]
else:
contact = contact_service.get_or_create_by_phone(
db,
payload.get("sender_phone", "unknown"),
name=payload.get("sender_name"),
)
channel_conv_id = payload.get("chat_id", payload.get("sender_phone", ""))
conv = conversation_service.get_or_create(
db, channel, channel_conv_id, contact, title=contact.name
)
# Duplikat-Prüfung via channel_message_id
channel_msg_id = payload.get("channel_message_id")
if channel_msg_id:
existing = (
db.query(Message)
.filter(
Message.channel == channel,
Message.channel_message_id == channel_msg_id,
)
.first()
)
if existing:
return
msg = Message(
conversation_id=conv.id,
sender_id=contact.id,
channel=channel,
channel_message_id=channel_msg_id,
direction="inbound",
text=payload.get("text", ""),
status="delivered",
delivered_at=datetime.utcnow(),
reply_to_id=payload.get("reply_to_id"),
)
db.add(msg)
conv.last_message_at = datetime.utcnow()
db.commit()
db.refresh(msg)
logger.info("[%s] Inbound from %s: %s", channel, contact.name, msg.text[:50])
await _notify_callbacks(msg)
except Exception as exc:
logger.error("handle_inbound error: %s", exc)
db.rollback()
finally:
db.close()
async def _notify_callbacks(msg: Message) -> None:
for cb in _new_message_callbacks:
try:
await cb(msg)
except Exception as exc:
logger.warning("Message callback error: %s", exc)
# ── Hilfsfunktionen ────────────────────────────────────────────────────────────
def get_messages(
db: Session, conversation_id: str, limit: int = 50, offset: int = 0
) -> list[Message]:
return conversation_service.get_messages(db, conversation_id, limit, offset)