Files
MCM/api/auth.py
itdrui.de 6eb27a62b1 feat: Multi-User-Unterstützung mit JWT-Authentifizierung
- User-Modell (username, password_hash, role admin/user, is_active)
- Standard-Admin-Benutzer wird beim ersten Start automatisch angelegt
- JWT-Tokens (HS256) für Benutzer-Sessions, konfigurierbare Ablaufzeit
- API-Key bleibt für service-to-service-Calls (backward-compatible)
- POST /api/v1/auth/login → JWT-Token
- GET  /api/v1/auth/me   → aktueller Benutzer
- CRUD /api/v1/users/    → Benutzerverwaltung (nur Admin)
- TUI zeigt Login-Screen beim Start; nach Erfolg → MainScreen
- Passwort-Hashing mit bcrypt (python-jose für JWT)
2026-03-04 20:55:13 +01:00

77 lines
2.3 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from config import settings
_bearer = HTTPBearer(auto_error=False)
ALGORITHM = "HS256"
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode(), hashed.encode())
except Exception:
return False
def create_access_token(data: dict[str, Any]) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=settings.token_expire_hours)
payload["exp"] = expire
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def require_auth(
credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
) -> dict[str, Any]:
"""Akzeptiert API-Key (service-to-service) ODER JWT-Token (Benutzer-Login)."""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentifizierung erforderlich",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
# Service-API-Key-Prüfung (backward-compatible)
if token == settings.api_key:
return {"type": "api_key", "username": "system", "role": "admin"}
# JWT-Prüfung
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
return {
"type": "user",
"user_id": payload["sub"],
"username": payload["username"],
"role": payload["role"],
}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Ungültiger oder abgelaufener Token",
headers={"WWW-Authenticate": "Bearer"},
)
def require_admin(principal: dict = Depends(require_auth)) -> dict:
if principal.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin-Rechte erforderlich")
return principal
# Alias für Abwärtskompatibilität
require_api_key = require_auth