Really-amin's picture
Upload 301 files
e4e4574 verified
"""
Authentication and Authorization System
Implements JWT-based authentication for production deployments
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import hashlib
import logging
from functools import wraps
try:
import jwt
JWT_AVAILABLE = True
except ImportError:
JWT_AVAILABLE = False
logging.warning("PyJWT not installed. Authentication disabled. Install with: pip install PyJWT")
logger = logging.getLogger(__name__)
# Configuration
SECRET_KEY = os.getenv('SECRET_KEY', secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', '60'))
ENABLE_AUTH = os.getenv('ENABLE_AUTH', 'false').lower() == 'true'
class AuthManager:
"""
Authentication manager for API endpoints and dashboard access
Supports JWT tokens and basic API key authentication
"""
def __init__(self):
self.users_db: Dict[str, str] = {} # username -> hashed_password
self.api_keys_db: Dict[str, Dict[str, Any]] = {} # api_key -> metadata
self._load_credentials()
def _load_credentials(self):
"""Load credentials from environment variables"""
# Load default admin user
admin_user = os.getenv('ADMIN_USERNAME', 'admin')
admin_pass = os.getenv('ADMIN_PASSWORD')
if admin_pass:
self.users_db[admin_user] = self._hash_password(admin_pass)
logger.info(f"Loaded admin user: {admin_user}")
# Load API keys from environment
api_keys_str = os.getenv('API_KEYS', '')
if api_keys_str:
for key in api_keys_str.split(','):
key = key.strip()
if key:
self.api_keys_db[key] = {
'created_at': datetime.utcnow(),
'name': 'env_key',
'active': True
}
logger.info(f"Loaded {len(self.api_keys_db)} API keys")
@staticmethod
def _hash_password(password: str) -> str:
"""Hash password using SHA-256"""
return hashlib.sha256(password.encode()).hexdigest()
def verify_password(self, username: str, password: str) -> bool:
"""
Verify username and password
Args:
username: Username
password: Plain text password
Returns:
True if valid, False otherwise
"""
if username not in self.users_db:
return False
hashed = self._hash_password(password)
return secrets.compare_digest(self.users_db[username], hashed)
def create_access_token(
self,
username: str,
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create JWT access token
Args:
username: Username
expires_delta: Token expiration time
Returns:
JWT token string
"""
if not JWT_AVAILABLE:
raise RuntimeError("PyJWT not installed")
if expires_delta is None:
expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.utcnow() + expires_delta
payload = {
'sub': username,
'exp': expire,
'iat': datetime.utcnow()
}
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return token
def verify_token(self, token: str) -> Optional[str]:
"""
Verify JWT token and extract username
Args:
token: JWT token string
Returns:
Username if valid, None otherwise
"""
if not JWT_AVAILABLE:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
return username
except jwt.ExpiredSignatureError:
logger.warning("Token expired")
return None
except jwt.JWTError as e:
logger.warning(f"Invalid token: {e}")
return None
def verify_api_key(self, api_key: str) -> bool:
"""
Verify API key
Args:
api_key: API key string
Returns:
True if valid and active, False otherwise
"""
if api_key not in self.api_keys_db:
return False
key_data = self.api_keys_db[api_key]
return key_data.get('active', False)
def create_api_key(self, name: str) -> str:
"""
Create new API key
Args:
name: Descriptive name for the key
Returns:
Generated API key
"""
api_key = secrets.token_urlsafe(32)
self.api_keys_db[api_key] = {
'created_at': datetime.utcnow(),
'name': name,
'active': True,
'usage_count': 0
}
logger.info(f"Created API key: {name}")
return api_key
def revoke_api_key(self, api_key: str) -> bool:
"""
Revoke API key
Args:
api_key: API key to revoke
Returns:
True if revoked, False if not found
"""
if api_key in self.api_keys_db:
self.api_keys_db[api_key]['active'] = False
logger.info(f"Revoked API key: {self.api_keys_db[api_key]['name']}")
return True
return False
def track_usage(self, api_key: str):
"""Track API key usage"""
if api_key in self.api_keys_db:
self.api_keys_db[api_key]['usage_count'] = \
self.api_keys_db[api_key].get('usage_count', 0) + 1
# Global auth manager instance
auth_manager = AuthManager()
# ==================== DECORATORS ====================
def require_auth(func):
"""
Decorator to require authentication for endpoints
Checks for JWT token in Authorization header or API key in X-API-Key header
"""
@wraps(func)
async def wrapper(*args, **kwargs):
if not ENABLE_AUTH:
# Authentication disabled, allow all requests
return await func(*args, **kwargs)
# Try to get token from request
# This is a placeholder - actual implementation depends on framework (FastAPI, Flask, etc.)
# For FastAPI:
# from fastapi import Header, HTTPException
# authorization: Optional[str] = Header(None)
# api_key: Optional[str] = Header(None, alias="X-API-Key")
# For now, this is a template
raise NotImplementedError("Integrate with your web framework")
return wrapper
def require_api_key(func):
"""Decorator to require API key authentication"""
@wraps(func)
async def wrapper(*args, **kwargs):
if not ENABLE_AUTH:
return await func(*args, **kwargs)
# Template for API key verification
raise NotImplementedError("Integrate with your web framework")
return wrapper
# ==================== HELPER FUNCTIONS ====================
def authenticate_user(username: str, password: str) -> Optional[str]:
"""
Authenticate user and return JWT token
Args:
username: Username
password: Password
Returns:
JWT token if successful, None otherwise
"""
if not ENABLE_AUTH:
logger.warning("Authentication disabled")
return None
if auth_manager.verify_password(username, password):
return auth_manager.create_access_token(username)
return None
def verify_request_auth(
authorization: Optional[str] = None,
api_key: Optional[str] = None
) -> bool:
"""
Verify request authentication
Args:
authorization: Authorization header (Bearer token)
api_key: X-API-Key header
Returns:
True if authenticated, False otherwise
"""
if not ENABLE_AUTH:
return True
# Check API key first
if api_key and auth_manager.verify_api_key(api_key):
auth_manager.track_usage(api_key)
return True
# Check JWT token
if authorization and authorization.startswith('Bearer '):
token = authorization.split(' ')[1]
username = auth_manager.verify_token(token)
if username:
return True
return False