- User-Modell (username, password_hash, role admin/user, is_active) - Standard-Admin-Benutzer wird beim ersten Start automatisch angelegt - JWT-Tokens (HS256) für Benutzer-Sessions, konfigurierbare Ablaufzeit - API-Key bleibt für service-to-service-Calls (backward-compatible) - POST /api/v1/auth/login → JWT-Token - GET /api/v1/auth/me → aktueller Benutzer - CRUD /api/v1/users/ → Benutzerverwaltung (nur Admin) - TUI zeigt Login-Screen beim Start; nach Erfolg → MainScreen - Passwort-Hashing mit bcrypt (python-jose für JWT)
77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import bcrypt
|
|
from fastapi import Depends, HTTPException, Security, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
|
|
from config import settings
|
|
|
|
_bearer = HTTPBearer(auto_error=False)
|
|
|
|
ALGORITHM = "HS256"
|
|
|
|
|
|
def hash_password(plain: str) -> str:
|
|
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
|
|
try:
|
|
return bcrypt.checkpw(plain.encode(), hashed.encode())
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def create_access_token(data: dict[str, Any]) -> str:
|
|
payload = data.copy()
|
|
expire = datetime.now(timezone.utc) + timedelta(hours=settings.token_expire_hours)
|
|
payload["exp"] = expire
|
|
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
|
|
|
|
|
|
def require_auth(
|
|
credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
|
|
) -> dict[str, Any]:
|
|
"""Akzeptiert API-Key (service-to-service) ODER JWT-Token (Benutzer-Login)."""
|
|
if not credentials:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentifizierung erforderlich",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
token = credentials.credentials
|
|
|
|
# Service-API-Key-Prüfung (backward-compatible)
|
|
if token == settings.api_key:
|
|
return {"type": "api_key", "username": "system", "role": "admin"}
|
|
|
|
# JWT-Prüfung
|
|
try:
|
|
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
|
|
return {
|
|
"type": "user",
|
|
"user_id": payload["sub"],
|
|
"username": payload["username"],
|
|
"role": payload["role"],
|
|
}
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Ungültiger oder abgelaufener Token",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
|
|
def require_admin(principal: dict = Depends(require_auth)) -> dict:
|
|
if principal.get("role") != "admin":
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin-Rechte erforderlich")
|
|
return principal
|
|
|
|
|
|
# Alias für Abwärtskompatibilität
|
|
require_api_key = require_auth
|