MultiCustomerMessenger supporting Telegram (python-telegram-bot), WhatsApp (Green API) and SMS (python-gsmmodem-new). REST API with Bearer-token auth, SQLAlchemy models for MariaDB, APScheduler for background polling, and Textual TUI running in same asyncio event-loop.
61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from api.auth import require_api_key
|
|
from db.database import get_db
|
|
from schemas import ContactCreate, ContactResponse, ContactUpdate
|
|
from services import contact_service
|
|
|
|
router = APIRouter(prefix="/contacts", tags=["contacts"])
|
|
|
|
|
|
@router.get("/", response_model=list[ContactResponse])
|
|
def list_contacts(db: Session = Depends(get_db), _: str = Depends(require_api_key)):
|
|
return contact_service.get_all(db)
|
|
|
|
|
|
@router.post("/", response_model=ContactResponse, status_code=status.HTTP_201_CREATED)
|
|
def create_contact(
|
|
data: ContactCreate,
|
|
db: Session = Depends(get_db),
|
|
_: str = Depends(require_api_key),
|
|
):
|
|
return contact_service.create(db, data)
|
|
|
|
|
|
@router.get("/{contact_id}", response_model=ContactResponse)
|
|
def get_contact(
|
|
contact_id: str,
|
|
db: Session = Depends(get_db),
|
|
_: str = Depends(require_api_key),
|
|
):
|
|
contact = contact_service.get_by_id(db, contact_id)
|
|
if not contact:
|
|
raise HTTPException(status_code=404, detail="Contact not found")
|
|
return contact
|
|
|
|
|
|
@router.put("/{contact_id}", response_model=ContactResponse)
|
|
def update_contact(
|
|
contact_id: str,
|
|
data: ContactUpdate,
|
|
db: Session = Depends(get_db),
|
|
_: str = Depends(require_api_key),
|
|
):
|
|
contact = contact_service.get_by_id(db, contact_id)
|
|
if not contact:
|
|
raise HTTPException(status_code=404, detail="Contact not found")
|
|
return contact_service.update(db, contact, data)
|
|
|
|
|
|
@router.delete("/{contact_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def delete_contact(
|
|
contact_id: str,
|
|
db: Session = Depends(get_db),
|
|
_: str = Depends(require_api_key),
|
|
):
|
|
contact = contact_service.get_by_id(db, contact_id)
|
|
if not contact:
|
|
raise HTTPException(status_code=404, detail="Contact not found")
|
|
contact_service.delete(db, contact)
|