feat: Multi-User-Unterstützung mit JWT-Authentifizierung

- User-Modell (username, password_hash, role admin/user, is_active)
- Standard-Admin-Benutzer wird beim ersten Start automatisch angelegt
- JWT-Tokens (HS256) für Benutzer-Sessions, konfigurierbare Ablaufzeit
- API-Key bleibt für service-to-service-Calls (backward-compatible)
- POST /api/v1/auth/login → JWT-Token
- GET  /api/v1/auth/me   → aktueller Benutzer
- CRUD /api/v1/users/    → Benutzerverwaltung (nur Admin)
- TUI zeigt Login-Screen beim Start; nach Erfolg → MainScreen
- Passwort-Hashing mit bcrypt (python-jose für JWT)
This commit is contained in:
2026-03-04 20:55:13 +01:00
parent 0e2a8a6bc0
commit 6eb27a62b1
14 changed files with 421 additions and 10 deletions

View File

@@ -1,18 +1,76 @@
from fastapi import HTTPException, Security, status
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from config import settings
_bearer = HTTPBearer(auto_error=False)
ALGORITHM = "HS256"
def require_api_key(
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode(), hashed.encode())
except Exception:
return False
def create_access_token(data: dict[str, Any]) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=settings.token_expire_hours)
payload["exp"] = expire
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def require_auth(
credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
) -> str:
if not credentials or credentials.credentials != settings.api_key:
) -> dict[str, Any]:
"""Akzeptiert API-Key (service-to-service) ODER JWT-Token (Benutzer-Login)."""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing API key",
detail="Authentifizierung erforderlich",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
token = credentials.credentials
# Service-API-Key-Prüfung (backward-compatible)
if token == settings.api_key:
return {"type": "api_key", "username": "system", "role": "admin"}
# JWT-Prüfung
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
return {
"type": "user",
"user_id": payload["sub"],
"username": payload["username"],
"role": payload["role"],
}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Ungültiger oder abgelaufener Token",
headers={"WWW-Authenticate": "Bearer"},
)
def require_admin(principal: dict = Depends(require_auth)) -> dict:
if principal.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin-Rechte erforderlich")
return principal
# Alias für Abwärtskompatibilität
require_api_key = require_auth