Spaces:
Sleeping
Sleeping
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| import jwt | |
| from passlib.context import CryptContext | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordBearer | |
| from typing import Any | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/login") | |
| def hash_password(password: str) -> str: | |
| return pwd_context.hash(password) | |
| def verify_password(password: str, hashed: str) -> bool: | |
| return pwd_context.verify(password, hashed) | |
| def create_access_token(data: dict, secret: str, algorithm: str, expires_minutes: int) -> str: | |
| to_encode = data.copy() | |
| expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, secret, algorithm=algorithm) | |
| def decode_token(token: str, secret: str, algorithms: list[str]) -> dict: | |
| return jwt.decode(token, secret, algorithms=algorithms) | |
| async def get_current_user_optional(token: Optional[str] = Depends(oauth2_scheme)): | |
| if not token: | |
| return None | |
| try: | |
| from app.utils.config import settings | |
| payload = decode_token(token, settings.JWT_SECRET, [settings.JWT_ALGORITHM]) | |
| # supabase auth uses UUID subject if you later switch to Supabase JWTs | |
| return payload.get("sub") | |
| except Exception: | |
| return None | |
| async def get_current_user(token: str = Depends(oauth2_scheme)): | |
| from app.utils.config import settings | |
| try: | |
| payload = decode_token(token, settings.JWT_SECRET, [settings.JWT_ALGORITHM]) | |
| user_id = payload.get("sub") # string UUID or int | |
| if user_id is None: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide") | |
| return {"id": user_id} # Minimal user placeholder until Supabase integration | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expiré") | |
| except Exception: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide") | |