|
|
""" |
|
|
Authentication and Security for API Endpoints |
|
|
""" |
|
|
|
|
|
from fastapi import Security, HTTPException, status, Request |
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
|
from config import config |
|
|
|
|
|
security = HTTPBearer(auto_error=False) |
|
|
|
|
|
|
|
|
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): |
|
|
"""Verify API token""" |
|
|
|
|
|
if not config.API_TOKENS: |
|
|
return None |
|
|
|
|
|
|
|
|
if not credentials: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Authentication required" |
|
|
) |
|
|
|
|
|
if credentials.credentials not in config.API_TOKENS: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid authentication token" |
|
|
) |
|
|
|
|
|
return credentials.credentials |
|
|
|
|
|
|
|
|
async def verify_ip(request: Request): |
|
|
"""Verify IP whitelist""" |
|
|
if not config.ALLOWED_IPS: |
|
|
|
|
|
return True |
|
|
|
|
|
client_ip = request.client.host |
|
|
if client_ip not in config.ALLOWED_IPS: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_403_FORBIDDEN, |
|
|
detail="IP not whitelisted" |
|
|
) |
|
|
|
|
|
return True |
|
|
|