from datetime import datetime, timedelta, timezone
from typing import Any, Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/login")
# Same scheme, but with auto_error=False the dependency yields None instead
# of raising 401 when no bearer token is sent. Required for the optional
# dependency below: with the default scheme, its "no token" branch could
# never be reached for anonymous requests.
oauth2_scheme_optional = OAuth2PasswordBearer(
    tokenUrl="/api/users/login", auto_error=False
)


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(password: str, hashed: str) -> bool:
    """Return whether *password* matches *hashed* according to ``pwd_context``."""
    return pwd_context.verify(password, hashed)


def create_access_token(
    data: dict, secret: str, algorithm: str, expires_minutes: int
) -> str:
    """Encode *data* as a JWT that expires *expires_minutes* from now.

    The input mapping is copied, so the caller's dict is not mutated. An
    ``exp`` claim (timezone-aware UTC) is added to the copy, overriding any
    ``exp`` already present in *data*.

    Args:
        data: Claims to embed in the token.
        secret: Signing key passed to ``jwt.encode``.
        algorithm: Signing algorithm passed to ``jwt.encode``.
        expires_minutes: Token lifetime in minutes, counted from now.

    Returns:
        The value returned by ``jwt.encode``.
    """
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, secret, algorithm=algorithm)


def decode_token(token: str, secret: str, algorithms: list[str]) -> dict:
    """Decode *token* with ``jwt.decode`` and return its payload.

    Args:
        token: Encoded JWT.
        secret: Key used to verify the signature.
        algorithms: Algorithms accepted for verification.

    Raises:
        Whatever ``jwt.decode`` raises for a rejected token (for example
        ``jwt.ExpiredSignatureError``); errors are not handled here.
    """
    return jwt.decode(token, secret, algorithms=algorithms)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the bearer-auth challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user_optional(
    token: Optional[str] = Depends(oauth2_scheme_optional),
) -> Optional[Any]:
    """Return the token's ``sub`` claim, or ``None`` for anonymous requests.

    This dependency never raises: a missing token, a token that fails to
    decode, or a failure to load the settings all yield ``None``.
    """
    if not token:
        return None
    try:
        # Imported at call time, as in the rest of this module; kept inside
        # the ``try`` so this best-effort dependency stays non-raising.
        from app.utils.config import settings

        payload = decode_token(token, settings.JWT_SECRET, [settings.JWT_ALGORITHM])
    except Exception:
        # Deliberate best-effort: any failure is treated as "not logged in".
        return None
    # NOTE: Supabase auth uses a UUID subject, should this later switch to
    # Supabase JWTs.
    return payload.get("sub")


async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict[str, Any]:
    """Return a minimal user dict for the bearer token, or raise 401.

    Returns:
        ``{"id": <sub claim>}``, a placeholder user until Supabase
        integration. The subject may be a string UUID or an int.

    Raises:
        HTTPException: 401 with detail ``"Token expiré"`` when the token has
            expired, or ``"Token invalide"`` when it cannot be decoded or
            carries no ``sub`` claim.
    """
    from app.utils.config import settings

    try:
        payload = decode_token(token, settings.JWT_SECRET, [settings.JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise _unauthorized("Token expiré") from None
    except Exception as exc:
        # Any other decode failure is reported to the client as an invalid
        # token; the cause is chained for server-side tracebacks.
        raise _unauthorized("Token invalide") from exc

    # Checked outside the ``try`` so this HTTPException is not caught and
    # re-wrapped by the handlers above.
    user_id = payload.get("sub")  # string UUID or int
    if user_id is None:
        raise _unauthorized("Token invalide")
    return {"id": user_id}