feat: Multi-User-Unterstützung mit JWT-Authentifizierung

- User-Modell (username, password_hash, role admin/user, is_active)
- Standard-Admin-Benutzer wird beim ersten Start automatisch angelegt
- JWT-Tokens (HS256) für Benutzer-Sessions, konfigurierbare Ablaufzeit
- API-Key bleibt für service-to-service-Calls (backward-compatible)
- POST /api/v1/auth/login → JWT-Token
- GET  /api/v1/auth/me   → aktueller Benutzer
- CRUD /api/v1/users/    → Benutzerverwaltung (nur Admin)
- TUI zeigt Login-Screen beim Start; nach Erfolg → MainScreen
- Passwort-Hashing mit bcrypt (python-jose für JWT)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 20:55:13 +01:00
parent 6742242fe0
commit cd26b80d00
14 changed files with 421 additions and 10 deletions

View File

@@ -26,3 +26,11 @@ DATABASE_URL=sqlite:///./mcm.db
# ── TUI Web-Modus (textual serve) ──────────────────────────
# Starten: .venv/bin/python -m textual serve --host 0.0.0.0 --port $WEB_PORT tui_standalone.py
WEB_PORT=8001
# ── Benutzer-Authentifizierung ──────────────────────────────────────────────
# secret_key: mindestens 32 Zeichen, zufällig generieren (z.B. openssl rand -hex 32)
SECRET_KEY=change-this-secret-key-min-32-chars!!
TOKEN_EXPIRE_HOURS=24
# Standard-Admin-Benutzer (wird beim ersten Start angelegt)
DEFAULT_ADMIN_USER=admin
DEFAULT_ADMIN_PASSWORD=admin

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.routes import channels, contacts, conversations, messages
from api.routes import auth, channels, contacts, conversations, messages, users
app = FastAPI(
title="MCM MultiCustomerMessenger API",
@@ -22,6 +22,8 @@ app.add_middleware(
)
# Routen
app.include_router(auth.router, prefix="/api/v1")
app.include_router(users.router, prefix="/api/v1")
app.include_router(messages.router, prefix="/api/v1")
app.include_router(conversations.router, prefix="/api/v1")
app.include_router(contacts.router, prefix="/api/v1")

View File

@@ -1,18 +1,76 @@
from fastapi import HTTPException, Security, status
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from config import settings
_bearer = HTTPBearer(auto_error=False)
ALGORITHM = "HS256"
def require_api_key(
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode(), hashed.encode())
except Exception:
return False
def create_access_token(data: dict[str, Any]) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=settings.token_expire_hours)
payload["exp"] = expire
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def require_auth(
credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
) -> str:
if not credentials or credentials.credentials != settings.api_key:
) -> dict[str, Any]:
"""Akzeptiert API-Key (service-to-service) ODER JWT-Token (Benutzer-Login)."""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing API key",
detail="Authentifizierung erforderlich",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
token = credentials.credentials
# Service-API-Key-Prüfung (backward-compatible)
if token == settings.api_key:
return {"type": "api_key", "username": "system", "role": "admin"}
# JWT-Prüfung
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
return {
"type": "user",
"user_id": payload["sub"],
"username": payload["username"],
"role": payload["role"],
}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Ungültiger oder abgelaufener Token",
headers={"WWW-Authenticate": "Bearer"},
)
def require_admin(principal: dict = Depends(require_auth)) -> dict:
if principal.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin-Rechte erforderlich")
return principal
# Alias für Abwärtskompatibilität
require_api_key = require_auth

32
api/routes/auth.py Normal file
View File

@@ -0,0 +1,32 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from api.auth import create_access_token, require_auth, verify_password
from db.database import get_db
from db.models import User
from schemas import LoginRequest, TokenResponse
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=TokenResponse)
def login(req: LoginRequest, db: Session = Depends(get_db)):
user = (
db.query(User)
.filter(User.username == req.username, User.is_active == True) # noqa: E712
.first()
)
if not user or not verify_password(req.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Ungültiger Benutzername oder Passwort",
)
token = create_access_token(
{"sub": user.id, "username": user.username, "role": user.role}
)
return TokenResponse(access_token=token, username=user.username, role=user.role)
@router.get("/me", response_model=dict)
def me(principal: dict = Depends(require_auth)):
return {"username": principal["username"], "role": principal["role"]}

82
api/routes/users.py Normal file
View File

@@ -0,0 +1,82 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from api.auth import hash_password, require_admin, require_auth
from db.database import get_db
from db.models import User
from schemas import UserCreate, UserResponse, UserUpdate
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/", response_model=list[UserResponse])
def list_users(
db: Session = Depends(get_db),
_: dict = Depends(require_admin),
):
return db.query(User).order_by(User.created_at).all()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
data: UserCreate,
db: Session = Depends(get_db),
_: dict = Depends(require_admin),
):
if db.query(User).filter(User.username == data.username).first():
raise HTTPException(status_code=409, detail="Benutzername bereits vergeben")
user = User(
username=data.username,
password_hash=hash_password(data.password),
role=data.role,
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.put("/{user_id}", response_model=UserResponse)
def update_user(
user_id: str,
data: UserUpdate,
db: Session = Depends(get_db),
principal: dict = Depends(require_auth),
):
# Admins dürfen alle ändern; Benutzer nur sich selbst (nur Passwort)
if principal["role"] != "admin" and principal.get("user_id") != user_id:
raise HTTPException(status_code=403, detail="Keine Berechtigung")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if data.password is not None:
user.password_hash = hash_password(data.password)
if data.role is not None and principal["role"] == "admin":
user.role = data.role
if data.is_active is not None and principal["role"] == "admin":
user.is_active = data.is_active
db.commit()
db.refresh(user)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
user_id: str,
db: Session = Depends(get_db),
principal: dict = Depends(require_admin),
):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
# Letzten Admin nicht löschen
if user.role == "admin":
admin_count = db.query(User).filter(User.role == "admin", User.is_active == True).count() # noqa
if admin_count <= 1:
raise HTTPException(status_code=400, detail="Letzten Admin kann nicht gelöscht werden")
db.delete(user)
db.commit()

View File

@@ -29,6 +29,12 @@ class Settings(BaseSettings):
# TUI Web-Modus
web_port: int = 8001
# Benutzer-Authentifizierung
secret_key: str = "change-this-secret-key-min-32-chars!!"
token_expire_hours: int = 24
default_admin_user: str = "admin"
default_admin_password: str = "admin"
@property
def telegram_enabled(self) -> bool:
return bool(self.telegram_token)

View File

@@ -36,3 +36,27 @@ def get_db():
def init_db() -> None:
from db import models # noqa: F401 Modelle müssen importiert sein
Base.metadata.create_all(bind=engine)
_create_default_admin()
def _create_default_admin() -> None:
"""Legt beim ersten Start einen Admin-Benutzer an, falls keine User existieren."""
import bcrypt
from db.models import User
db = SessionLocal()
try:
if not db.query(User).first():
pw_hash = bcrypt.hashpw(
settings.default_admin_password.encode(), bcrypt.gensalt()
).decode()
admin = User(
username=settings.default_admin_user,
password_hash=pw_hash,
role="admin",
is_active=True,
)
db.add(admin)
db.commit()
finally:
db.close()

View File

@@ -23,6 +23,23 @@ def _uuid() -> str:
return str(uuid.uuid4())
# ── User ───────────────────────────────────────────────────────────────────────
class User(Base):
__tablename__ = "users"
id = Column(String(36), primary_key=True, default=_uuid)
username = Column(String(64), nullable=False, unique=True, index=True)
password_hash = Column(String(255), nullable=False)
role = Column(
Enum("admin", "user", name="user_role"),
nullable=False,
default="user",
)
is_active = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime, default=func.now(), nullable=False)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# ── Many-to-many: Conversation ↔ Contact ──────────────────────────────────────
conversation_participants = Table(
"conversation_participants",

View File

@@ -12,3 +12,5 @@ apscheduler>=3.10.4
textual>=0.75.0
python-dotenv>=1.0.0
httpx>=0.27.0
python-jose[cryptography]>=3.3.0
bcrypt>=4.0.0

View File

@@ -126,3 +126,43 @@ class SystemStatusResponse(BaseModel):
channels: list[ChannelStatusResponse]
database: bool
timestamp: datetime
# ── Auth / User ─────────────────────────────────────────────────────────────
class UserRole(str, Enum):
admin = "admin"
user = "user"
class LoginRequest(BaseModel):
username: str = Field(..., min_length=1, max_length=64)
password: str = Field(..., min_length=1)
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
username: str
role: UserRole
class UserCreate(BaseModel):
username: str = Field(..., min_length=1, max_length=64)
password: str = Field(..., min_length=6)
role: UserRole = UserRole.user
class UserUpdate(BaseModel):
password: Optional[str] = Field(None, min_length=6)
role: Optional[UserRole] = None
is_active: Optional[bool] = None
class UserResponse(BaseModel):
model_config = {"from_attributes": True}
id: str
username: str
role: UserRole
is_active: bool
created_at: datetime

View File

@@ -22,7 +22,38 @@ class MCMApiClient:
def __init__(self) -> None:
self._base = f"http://127.0.0.1:{settings.port}/api/v1"
self._headers = {"Authorization": f"Bearer {settings.api_key}"}
self._token: str | None = None
@property
def _headers(self) -> dict[str, str]:
if self._token:
return {"Authorization": f"Bearer {self._token}"}
return {}
# ── Authentifizierung ──────────────────────────────────────────────────────
async def login(self, username: str, password: str) -> bool:
"""Meldet den Benutzer an und speichert den JWT-Token. Gibt True bei Erfolg zurück."""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.post(
self._base + "/auth/login",
json={"username": username, "password": password},
)
if resp.status_code == 200:
self._token = resp.json()["access_token"]
return True
return False
except Exception as exc:
logger.warning("Login fehlgeschlagen: %s", exc)
return False
def logout(self) -> None:
self._token = None
@property
def is_authenticated(self) -> bool:
return self._token is not None
# ── Konversationen ─────────────────────────────────────────────────────────

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from textual.app import App
from tui.api_client import MCMApiClient
from tui.screens.login_screen import LoginScreen
from tui.screens.main_screen import MainScreen
@@ -11,11 +12,11 @@ class MCMApp(App):
TITLE = "MCM MultiCustomerMessenger"
CSS_PATH = "styles.tcss"
SCREENS = {"main": MainScreen}
SCREENS = {"login": LoginScreen, "main": MainScreen}
def __init__(self) -> None:
super().__init__()
self._api_client = MCMApiClient()
def on_mount(self) -> None:
self.push_screen("main")
self.push_screen("login")

View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Input, Label, Static
class LoginScreen(Screen):
"""Anmeldebildschirm erscheint beim Start der TUI."""
BINDINGS = [Binding("ctrl+c", "quit_app", "Beenden")]
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Vertical(id="login-container"):
yield Static("Anmeldung", id="login-title")
yield Label("Benutzername:")
yield Input(placeholder="admin", id="username-input")
yield Label("Passwort:")
yield Input(placeholder="", password=True, id="password-input")
yield Static("", id="login-error")
yield Button("Anmelden", variant="primary", id="login-btn")
yield Footer()
def on_mount(self) -> None:
self.query_one("#username-input", Input).focus()
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "username-input":
self.query_one("#password-input", Input).focus()
elif event.input.id == "password-input":
self._do_login()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "login-btn":
self._do_login()
def _do_login(self) -> None:
import asyncio
asyncio.create_task(self._login_async())
async def _login_async(self) -> None:
username = self.query_one("#username-input", Input).value.strip()
password = self.query_one("#password-input", Input).value
if not username or not password:
self.query_one("#login-error", Static).update("Benutzername und Passwort eingeben.")
return
btn = self.query_one("#login-btn", Button)
btn.disabled = True
self.query_one("#login-error", Static).update("")
try:
ok = await self.app._api_client.login(username, password) # type: ignore[attr-defined]
except Exception:
ok = False
btn.disabled = False
if ok:
from tui.screens.main_screen import MainScreen
self.app.switch_screen(MainScreen())
else:
self.query_one("#login-error", Static).update("Ungültige Zugangsdaten oder Server nicht erreichbar.")
self.query_one("#password-input", Input).clear()
self.query_one("#password-input", Input).focus()
def action_quit_app(self) -> None:
self.app.exit()

View File

@@ -126,3 +126,40 @@ Screen {
#compose-buttons Button {
margin-left: 1;
}
/* ── Login-Screen ──────────────────────────────────────────────────────── */
#login-container {
align: center middle;
width: 100%;
height: 1fr;
}
#login-title {
text-style: bold;
text-align: center;
margin-bottom: 1;
color: $accent;
width: 40;
}
#login-container Label {
width: 40;
margin-top: 1;
}
#login-container Input {
width: 40;
}
#login-btn {
width: 40;
margin-top: 1;
}
#login-error {
width: 40;
color: $error;
text-align: center;
height: 1;
}