353 lines
14 KiB
Python
353 lines
14 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from typing import TYPE_CHECKING, Any
|
||
|
||
from textual.app import ComposeResult
|
||
from textual.binding import Binding
|
||
from textual.containers import Horizontal, Vertical
|
||
from textual.screen import ModalScreen, Screen
|
||
from textual.widgets import Button, DataTable, Footer, Header, Input, RichLog, Static
|
||
from textual import work
|
||
|
||
if TYPE_CHECKING:
|
||
from tui.api_client import MCMApiClient
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
CHANNEL_ICON = {"telegram": "✈", "whatsapp": "📱", "sms": "✉"}
|
||
CHANNEL_COLOR = {"telegram": "cyan", "whatsapp": "green", "sms": "yellow"}
|
||
|
||
|
||
class MainScreen(Screen):
|
||
BINDINGS = [
|
||
Binding("n", "new_message", "Neu"),
|
||
Binding("r", "refresh", "Aktualisieren"),
|
||
Binding("t", "telegram_qr", "Telegram QR"),
|
||
Binding("d", "delete_conv", "Chat löschen"),
|
||
Binding("q", "quit_app", "Beenden"),
|
||
]
|
||
|
||
def __init__(self) -> None:
|
||
super().__init__()
|
||
self._current_conv_id: str | None = None
|
||
self._current_conv: dict[str, Any] | None = None
|
||
self._conv_id_map: dict[int, str] = {}
|
||
|
||
@property
|
||
def _api(self) -> "MCMApiClient":
|
||
return self.app._api_client # type: ignore[attr-defined]
|
||
|
||
# ── Layout ─────────────────────────────────────────────────────────────────
|
||
|
||
def compose(self) -> ComposeResult:
|
||
yield Header(show_clock=True)
|
||
with Horizontal(id="main-container"):
|
||
with Vertical(id="sidebar"):
|
||
yield Static("Konversationen", id="sidebar-title")
|
||
yield DataTable(id="conv-table", cursor_type="row", show_header=True)
|
||
with Vertical(id="chat-area"):
|
||
yield Static("Kein Chat geöffnet", id="chat-header")
|
||
yield RichLog(id="message-log", highlight=True, markup=True, wrap=True)
|
||
with Horizontal(id="input-bar"):
|
||
yield Input(
|
||
placeholder="Nachricht eingeben… (Enter = Senden)",
|
||
id="msg-input",
|
||
)
|
||
yield Button("Senden", variant="primary", id="send-btn")
|
||
yield Footer()
|
||
|
||
async def on_mount(self) -> None:
|
||
self._setup_table()
|
||
await self._load_conversations()
|
||
self._poll_loop()
|
||
|
||
# ── Konversationsliste ─────────────────────────────────────────────────────
|
||
|
||
def _setup_table(self) -> None:
|
||
table = self.query_one("#conv-table", DataTable)
|
||
table.add_column("K", width=2)
|
||
table.add_column("Name", width=16)
|
||
table.add_column("Letzte Nachricht", width=22)
|
||
table.add_column("🔔", width=3)
|
||
|
||
async def _load_conversations(self) -> None:
|
||
try:
|
||
convs: list[dict] = await self._api.get_conversations()
|
||
except Exception as exc:
|
||
logger.warning("Konversationen laden fehlgeschlagen: %s", exc)
|
||
return
|
||
|
||
table = self.query_one("#conv-table", DataTable)
|
||
table.clear()
|
||
self._conv_id_map.clear()
|
||
|
||
for idx, conv in enumerate(convs):
|
||
last_msg = conv.get("last_message") or {}
|
||
last_text = last_msg.get("text", "") or ""
|
||
if len(last_text) > 20:
|
||
last_text = last_text[:20] + "…"
|
||
icon = CHANNEL_ICON.get(conv.get("channel", ""), "?")
|
||
unread = conv.get("unread_count", 0)
|
||
badge = str(unread) if unread else ""
|
||
table.add_row(icon, conv.get("title") or conv["id"][:8], last_text, badge)
|
||
self._conv_id_map[idx] = conv["id"]
|
||
|
||
# ── Nachrichten ────────────────────────────────────────────────────────────
|
||
|
||
async def _load_messages(self, conv_id: str) -> None:
|
||
try:
|
||
conv = await self._api.get_conversation(conv_id)
|
||
msgs: list[dict] = await self._api.get_messages(conv_id, limit=100)
|
||
except Exception as exc:
|
||
logger.warning("Nachrichten laden fehlgeschlagen: %s", exc)
|
||
return
|
||
|
||
if not conv:
|
||
return
|
||
|
||
channel = conv.get("channel", "")
|
||
icon = CHANNEL_ICON.get(channel, "?")
|
||
color = CHANNEL_COLOR.get(channel, "white")
|
||
self.query_one("#chat-header", Static).update(
|
||
f"{icon} [{color}]{conv.get('title') or conv_id[:8]}[/{color}] [{channel}]"
|
||
)
|
||
|
||
log = self.query_one("#message-log", RichLog)
|
||
log.clear()
|
||
for msg in msgs:
|
||
self._render_message(log, msg, color)
|
||
|
||
for row_idx, cid in self._conv_id_map.items():
|
||
if cid == conv_id:
|
||
self.query_one("#conv-table", DataTable).update_cell_at((row_idx, 3), "")
|
||
break
|
||
|
||
async def _load_messages_silently(self, conv_id: str) -> None:
|
||
"""Nachrichten neu laden ohne Header-Update (fuer Poll-Refresh)."""
|
||
try:
|
||
msgs: list[dict] = await self._api.get_messages(conv_id, limit=100)
|
||
except Exception:
|
||
return
|
||
conv = self._current_conv or {}
|
||
channel = conv.get("channel", "")
|
||
color = CHANNEL_COLOR.get(channel, "white")
|
||
log = self.query_one("#message-log", RichLog)
|
||
log.clear()
|
||
for msg in msgs:
|
||
self._render_message(log, msg, color)
|
||
|
||
def _render_message(self, log: RichLog, msg: dict, channel_color: str) -> None:
|
||
created = msg.get("created_at", "")
|
||
ts = created[11:16] if len(created) >= 16 else "??"
|
||
direction = msg.get("direction", "outbound")
|
||
direction_prefix = "▶ " if direction == "outbound" else "◀ "
|
||
status_suffix = " ✗" if msg.get("status") == "failed" else ""
|
||
text = msg.get("text", "").replace("[", "\\[")
|
||
if direction == "outbound":
|
||
log.write(f"[dim]{ts}[/dim] [dim]{direction_prefix}{text}{status_suffix}[/dim]")
|
||
else:
|
||
log.write(f"[dim]{ts}[/dim] {direction_prefix}{text}{status_suffix}")
|
||
|
||
# ── Background-Polling ─────────────────────────────────────────────────────
|
||
|
||
@work(exclusive=True)
|
||
async def _poll_loop(self) -> None:
|
||
"""Alle 5s Konversationen und ggf. aktiven Chat aktualisieren."""
|
||
while True:
|
||
await asyncio.sleep(5)
|
||
await self._load_conversations()
|
||
if self._current_conv_id:
|
||
await self._load_messages_silently(self._current_conv_id)
|
||
|
||
# ── Events ─────────────────────────────────────────────────────────────────
|
||
|
||
async def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
|
||
row_idx = event.cursor_row
|
||
conv_id = self._conv_id_map.get(row_idx)
|
||
if conv_id:
|
||
self._current_conv_id = conv_id
|
||
try:
|
||
self._current_conv = await self._api.get_conversation(conv_id)
|
||
except Exception:
|
||
self._current_conv = None
|
||
await self._load_messages(conv_id)
|
||
self.query_one("#msg-input", Input).focus()
|
||
|
||
def on_input_submitted(self, event: Input.Submitted) -> None:
|
||
if event.input.id == "msg-input":
|
||
self._send_current()
|
||
|
||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
if event.button.id == "send-btn":
|
||
self._send_current()
|
||
|
||
def _send_current(self) -> None:
|
||
inp = self.query_one("#msg-input", Input)
|
||
text = inp.value.strip()
|
||
if not text or not self._current_conv_id:
|
||
return
|
||
inp.clear()
|
||
asyncio.create_task(self._send_async(text))
|
||
|
||
async def _send_async(self, text: str) -> None:
|
||
conv = self._current_conv
|
||
if not conv:
|
||
return
|
||
channel = conv.get("channel", "")
|
||
chat_id = conv.get("channel_conversation_id", "")
|
||
try:
|
||
await self._api.send_message(
|
||
channel=channel,
|
||
text=text,
|
||
recipient_phone=chat_id if channel != "telegram" else None,
|
||
recipient_telegram_id=chat_id if channel == "telegram" else None,
|
||
)
|
||
await self._load_messages(self._current_conv_id)
|
||
await self._load_conversations()
|
||
except Exception as exc:
|
||
logger.error("Senden fehlgeschlagen: %s", exc)
|
||
|
||
# ── Actions ────────────────────────────────────────────────────────────────
|
||
|
||
def action_new_message(self) -> None:
|
||
from tui.screens.compose_screen import ComposeScreen
|
||
self.app.push_screen(ComposeScreen(), self._compose_callback)
|
||
|
||
def _compose_callback(self, result: dict | None) -> None:
|
||
if not result:
|
||
return
|
||
asyncio.create_task(self._send_compose(result))
|
||
|
||
async def _send_compose(self, result: dict) -> None:
|
||
channel = result["channel"]
|
||
recipient = result["recipient"]
|
||
try:
|
||
await self._api.send_message(
|
||
channel=channel,
|
||
text=result["text"],
|
||
recipient_phone=recipient if channel != "telegram" else None,
|
||
recipient_telegram_id=recipient if channel == "telegram" else None,
|
||
)
|
||
await self._load_conversations()
|
||
except Exception as exc:
|
||
logger.error("Senden fehlgeschlagen: %s", exc)
|
||
|
||
def action_refresh(self) -> None:
|
||
asyncio.create_task(self._do_refresh())
|
||
|
||
async def _do_refresh(self) -> None:
|
||
await self._load_conversations()
|
||
if self._current_conv_id:
|
||
await self._load_messages(self._current_conv_id)
|
||
|
||
def action_delete_conv(self) -> None:
|
||
if not self._current_conv_id:
|
||
self.notify("Kein Chat ausgewählt.", severity="warning")
|
||
return
|
||
title = (self._current_conv or {}).get("title") or self._current_conv_id[:8]
|
||
self.app.push_screen(
|
||
_ConfirmScreen(f"Chat löschen?\n\n{title}\n\nAlle Nachrichten werden entfernt."),
|
||
self._delete_conv_callback,
|
||
)
|
||
|
||
def _delete_conv_callback(self, confirmed: bool) -> None:
|
||
if confirmed:
|
||
asyncio.create_task(self._do_delete_conv())
|
||
|
||
async def _do_delete_conv(self) -> None:
|
||
conv_id = self._current_conv_id
|
||
if not conv_id:
|
||
return
|
||
try:
|
||
await self._api.delete_conversation(conv_id)
|
||
self._current_conv_id = None
|
||
self._current_conv = None
|
||
self.query_one("#chat-header", Static).update("Kein Chat geöffnet")
|
||
self.query_one("#message-log", RichLog).clear()
|
||
await self._load_conversations()
|
||
self.notify("Chat gelöscht.")
|
||
except Exception as exc:
|
||
self.notify(f"Löschen fehlgeschlagen: {exc}", severity="error")
|
||
|
||
def action_telegram_qr(self) -> None:
|
||
asyncio.create_task(self._generate_telegram_qr())
|
||
|
||
async def _generate_telegram_qr(self) -> None:
|
||
if not self._current_conv:
|
||
self.notify("Bitte erst eine Konversation auswählen.", severity="warning")
|
||
return
|
||
conv = self._current_conv
|
||
if conv.get("channel") != "telegram":
|
||
self.notify("QR-Code nur für Telegram-Konversationen.", severity="warning")
|
||
return
|
||
contact_id = conv.get("contact_id")
|
||
if not contact_id:
|
||
try:
|
||
details = await self._api.get_conversation(conv["id"])
|
||
contact_id = (details or {}).get("contact_id")
|
||
except Exception:
|
||
pass
|
||
if not contact_id:
|
||
self.notify("Kein Kontakt zur Konversation gefunden.", severity="error")
|
||
return
|
||
try:
|
||
import base64
|
||
import tempfile
|
||
import webbrowser
|
||
from config import settings
|
||
png_bytes = await self._api.get_telegram_qr(contact_id)
|
||
bot = settings.telegram_bot_username.lstrip("@")
|
||
invite_url = f"https://t.me/{bot}?start={contact_id}"
|
||
b64 = base64.b64encode(png_bytes).decode()
|
||
contact_name = conv.get("title") or contact_id
|
||
html = f"""<!DOCTYPE html>
|
||
<html lang="de">
|
||
<head><meta charset="utf-8"><title>Telegram QR – {contact_name}</title>
|
||
<style>
|
||
body {{font-family:sans-serif;text-align:center;padding:2rem;background:#1a1a2e;color:#eee}}
|
||
img {{width:300px;height:300px;border:8px solid #fff;border-radius:12px;margin:1rem}}
|
||
a {{color:#64b5f6;word-break:break-all}}
|
||
h2 {{margin-bottom:0}}
|
||
p {{margin:.5rem 0}}
|
||
</style></head>
|
||
<body>
|
||
<h2>Telegram Invite</h2>
|
||
<p>{contact_name}</p>
|
||
<img src="data:image/png;base64,{b64}" alt="QR-Code">
|
||
<p><a href="{invite_url}">{invite_url}</a></p>
|
||
<p style="font-size:.85rem;color:#aaa">Kunde scannt QR-Code → Telegram öffnet Bot → Verbindung hergestellt</p>
|
||
</body></html>"""
|
||
with tempfile.NamedTemporaryFile(
|
||
delete=False, suffix=".html", mode="w", encoding="utf-8"
|
||
) as tmp:
|
||
tmp.write(html)
|
||
tmp_path = tmp.name
|
||
webbrowser.open(f"file://{tmp_path}")
|
||
self.notify(f"QR-Code im Browser geöffnet | {invite_url}", timeout=8)
|
||
except Exception as exc:
|
||
self.notify(f"QR-Code Fehler: {exc}", severity="error")
|
||
|
||
def action_quit_app(self) -> None:
|
||
self.app.exit()
|
||
|
||
|
||
class _ConfirmScreen(ModalScreen[bool]):
|
||
"""Einfacher Ja/Nein-Dialog."""
|
||
|
||
def __init__(self, message: str) -> None:
|
||
super().__init__()
|
||
self._message = message
|
||
|
||
def compose(self) -> ComposeResult:
|
||
from textual.widgets import Label
|
||
with Vertical(id="compose-dialog"):
|
||
yield Label(self._message, id="confirm-msg")
|
||
with Horizontal(id="compose-buttons"):
|
||
yield Button("Abbrechen", variant="default", id="btn-no")
|
||
yield Button("Löschen", variant="error", id="btn-yes")
|
||
|
||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
self.dismiss(event.button.id == "btn-yes")
|