- User-Modell (username, password_hash, role admin/user, is_active)
- Standard-Admin-Benutzer wird beim ersten Start automatisch angelegt
- JWT-Tokens (HS256) für Benutzer-Sessions, konfigurierbare Ablaufzeit
- API-Key bleibt für service-to-service-Calls (backward-compatible)
- POST /api/v1/auth/login → JWT-Token
- GET /api/v1/auth/me → aktueller Benutzer
- CRUD /api/v1/users/ → Benutzerverwaltung (nur Admin)
- TUI zeigt Login-Screen beim Start; nach Erfolg → MainScreen
- Passwort-Hashing mit bcrypt (python-jose für JWT)
83 lines
2.7 KiB
Python
83 lines
2.7 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from api.auth import hash_password, require_admin, require_auth
|
|
from db.database import get_db
|
|
from db.models import User
|
|
from schemas import UserCreate, UserResponse, UserUpdate
|
|
|
|
# Router collecting the user-management endpoints defined in this module;
# every route registered on it is served below the "/users" prefix.
router = APIRouter(
    tags=["users"],
    prefix="/users",
)
|
|
|
|
|
|
@router.get("/", response_model=list[UserResponse])
def list_users(
    db: Session = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Return all users, sorted by their ``created_at`` column.

    Args:
        db: Database session injected through ``get_db``.
        _: Result of the ``require_admin`` dependency. The value is unused;
            the dependency is declared only so that it runs for this route
            (presumably rejecting non-admin callers - see ``api.auth``).

    Returns:
        A list of ``User`` rows, serialized by FastAPI as ``UserResponse``.
    """
    users_by_creation = db.query(User).order_by(User.created_at)
    return users_by_creation.all()
|
|
|
|
|
|
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    data: UserCreate,
    db: Session = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Create a new user account that is active from the start.

    Args:
        data: Payload carrying ``username``, ``password`` and ``role``.
        db: Database session injected through ``get_db``.
        _: Result of the ``require_admin`` dependency. The value is unused;
            the dependency is declared only so that it runs for this route.

    Returns:
        The freshly stored ``User``, serialized as ``UserResponse``.

    Raises:
        HTTPException: 409 if a user with the same username already exists.
    """
    name_taken = db.query(User).filter(User.username == data.username).first()
    if name_taken:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Benutzername bereits vergeben",
        )

    # Only the hash produced by hash_password() is stored, never the
    # plain-text password from the request.
    new_user = User(
        username=data.username,
        password_hash=hash_password(data.password),
        role=data.role,
        is_active=True,
    )
    db.add(new_user)
    db.commit()
    # Reload the row so that database-generated values are present on the
    # object that is returned.
    db.refresh(new_user)
    return new_user
|
|
|
|
|
|
@router.put("/{user_id}", response_model=UserResponse)
def update_user(
    user_id: str,
    data: UserUpdate,
    db: Session = Depends(get_db),
    principal: dict = Depends(require_auth),
):
    """Update a user's password, role and/or active flag.

    Admins may modify any user. Any other principal may only modify the user
    whose id equals its own ``user_id``, and for such callers only the
    password is applied; ``role`` and ``is_active`` in the payload are
    silently ignored. Fields that are ``None`` in ``data`` are left untouched.

    Like ``delete_user``, this endpoint refuses to remove the last active
    admin: a change that would demote or deactivate that account is rejected
    before anything is modified.

    Args:
        user_id: Id of the user to update (path parameter).
        data: Payload with optional ``password``, ``role`` and ``is_active``.
        db: Database session injected through ``get_db``.
        principal: Authenticated caller as returned by ``require_auth``; must
            contain ``"role"`` and may contain ``"user_id"``.

    Returns:
        The updated ``User``, serialized as ``UserResponse``.

    Raises:
        HTTPException: 403 if a non-admin targets another user; 404 if no
            user has this id; 400 if the change would leave the system
            without any active admin.
    """
    is_admin = principal["role"] == "admin"

    # Admins may modify anyone; regular users only themselves (password only).
    if not is_admin and principal.get("user_id") != user_id:
        raise HTTPException(status_code=403, detail="Keine Berechtigung")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")

    if is_admin:
        # Work out the state the account would have after the update, so the
        # last-admin check can run before any attribute is changed.
        target_role = user.role if data.role is None else data.role
        target_active = user.is_active if data.is_active is None else data.is_active
        loses_admin = (
            user.role == "admin"
            and user.is_active
            and not (target_role == "admin" and target_active)
        )
        if loses_admin:
            other_admins = (
                db.query(User)
                .filter(
                    User.role == "admin",
                    User.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
                    User.id != user.id,
                )
                .count()
            )
            if other_admins == 0:
                raise HTTPException(
                    status_code=400,
                    detail="Letzter Admin kann nicht herabgestuft oder deaktiviert werden",
                )

    if data.password is not None:
        user.password_hash = hash_password(data.password)
    # Role and active flag are privileged fields: only admins may change them.
    if is_admin and data.role is not None:
        user.role = data.role
    if is_admin and data.is_active is not None:
        user.is_active = data.is_active

    db.commit()
    db.refresh(user)
    return user
|
|
|
|
|
|
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: str,
    db: Session = Depends(get_db),
    principal: dict = Depends(require_admin),
):
    """Delete the user identified by ``user_id``.

    An admin account is only deleted if at least one *other* active admin
    remains afterwards. The account being deleted is excluded from that
    count, so an inactive admin can be removed while a single active admin
    still exists, and the sole active admin can never be removed.

    Args:
        user_id: Id of the user to delete (path parameter).
        db: Database session injected through ``get_db``.
        principal: Result of the ``require_admin`` dependency. The value is
            not read here; the dependency is declared so that it runs for
            this route.

    Raises:
        HTTPException: 404 if no user has this id; 400 if deleting the user
            would leave the system without any active admin.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")

    # Never delete the last admin.
    if user.role == "admin":
        # Count the active admins that would remain after this deletion.
        # Counting all active admins instead would wrongly block the removal
        # of an inactive admin whenever exactly one active admin exists.
        remaining_admins = (
            db.query(User)
            .filter(
                User.role == "admin",
                User.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
                User.id != user.id,
            )
            .count()
        )
        if remaining_admins == 0:
            raise HTTPException(status_code=400, detail="Letzten Admin kann nicht gelöscht werden")

    db.delete(user)
    db.commit()
|