|
|
""" |
|
|
Authentication and Authorization System |
|
|
Implements JWT-based authentication for production deployments |
|
|
""" |
|
|
|
|
|
import os |
|
|
import secrets |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional, Dict, Any |
|
|
import hashlib |
|
|
import logging |
|
|
from functools import wraps |
|
|
|
|
|
# Module logger. Created before the optional import below so the fallback
# branch can report through it.
logger = logging.getLogger(__name__)

# PyJWT is an optional dependency; JWT_AVAILABLE records whether it imported.
try:
    import jwt
    JWT_AVAILABLE = True
except ImportError:
    JWT_AVAILABLE = False
    # Deliberately logger.warning() rather than logging.warning(): the
    # module-level logging functions call basicConfig() on the root logger
    # when it has no handlers, which would configure logging for the whole
    # application as a side effect of importing this module.
    logger.warning("PyJWT not installed. Authentication disabled. Install with: pip install PyJWT")

# ---------------------------------------------------------------------------
# Configuration (read once from the environment at import time)
# ---------------------------------------------------------------------------

# Signing key for JWTs. `or` (instead of a getenv default) makes an empty
# value such as `SECRET_KEY=` fall back to a random key as well, so tokens are
# never signed with an empty secret.
SECRET_KEY = os.getenv('SECRET_KEY') or secrets.token_urlsafe(32)

# JWT signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Default token lifetime in minutes; a non-integer value raises ValueError
# at import time.
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', '60'))

# Authentication is enabled only when ENABLE_AUTH is exactly "true"
# (case-insensitive).
ENABLE_AUTH = os.getenv('ENABLE_AUTH', 'false').lower() == 'true'

if ENABLE_AUTH and not os.getenv('SECRET_KEY'):
    # The random fallback key differs for every process that imports this
    # module, so tokens signed by one process cannot be verified by another.
    logger.warning(
        "SECRET_KEY is not set; using a random per-process key. "
        "Issued tokens will not validate after a restart or in other processes."
    )
|
|
|
|
|
|
|
|
class AuthManager:
    """
    Authentication manager for API endpoints and dashboard access.

    Supports JWT tokens and basic API key authentication. All state is held
    in memory in two dictionaries:

    * ``users_db``    -- username -> SHA-256 hex digest of the password
    * ``api_keys_db`` -- API key -> metadata dict with the keys
      ``created_at``, ``name``, ``active`` and ``usage_count``
    """

    def __init__(self):
        """Create empty stores and populate them from environment variables."""
        self.users_db: Dict[str, str] = {}
        self.api_keys_db: Dict[str, Dict[str, Any]] = {}
        self._load_credentials()

    def _load_credentials(self):
        """
        Load credentials from environment variables.

        Reads:
            ADMIN_USERNAME: admin user name (defaults to ``'admin'``)
            ADMIN_PASSWORD: admin password; the user is only registered
                when this is set and non-empty
            API_KEYS: comma-separated list of API keys; surrounding
                whitespace is stripped and empty entries are skipped
        """
        admin_user = os.getenv('ADMIN_USERNAME', 'admin')
        admin_pass = os.getenv('ADMIN_PASSWORD')

        if admin_pass:
            self.users_db[admin_user] = self._hash_password(admin_pass)
            logger.info("Loaded admin user: %s", admin_user)

        api_keys_str = os.getenv('API_KEYS', '')
        if api_keys_str:
            for raw_key in api_keys_str.split(','):
                key = raw_key.strip()
                if key:
                    self.api_keys_db[key] = {
                        'created_at': datetime.utcnow(),
                        'name': 'env_key',
                        'active': True,
                        # Same record shape as keys made by create_api_key().
                        'usage_count': 0,
                    }
            logger.info("Loaded %d API keys", len(self.api_keys_db))

    @staticmethod
    def _hash_password(password: str) -> str:
        """
        Hash password using SHA-256.

        Args:
            password: Plain text password

        Returns:
            Lowercase hex digest of the UTF-8 encoded password
        """
        # NOTE(review): unsalted SHA-256 is a fast hash and therefore weak for
        # password storage. It is kept so digests already placed in users_db
        # keep verifying; consider hashlib.scrypt / hashlib.pbkdf2_hmac.
        return hashlib.sha256(password.encode()).hexdigest()

    def verify_password(self, username: str, password: str) -> bool:
        """
        Verify username and password.

        Args:
            username: Username
            password: Plain text password

        Returns:
            True if valid, False otherwise
        """
        stored_digest = self.users_db.get(username)
        if stored_digest is None:
            return False

        # Constant-time comparison of the two hex digests.
        return secrets.compare_digest(stored_digest, self._hash_password(password))

    def create_access_token(
        self,
        username: str,
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """
        Create JWT access token.

        Args:
            username: Username, stored in the ``sub`` claim
            expires_delta: Token lifetime; defaults to
                ACCESS_TOKEN_EXPIRE_MINUTES minutes

        Returns:
            JWT token string

        Raises:
            RuntimeError: If PyJWT is not installed
        """
        if not JWT_AVAILABLE:
            raise RuntimeError("PyJWT not installed")

        if expires_delta is None:
            expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

        # Read the clock once so that exp - iat equals expires_delta exactly.
        issued_at = datetime.utcnow()
        payload = {
            'sub': username,
            'exp': issued_at + expires_delta,
            'iat': issued_at,
        }

        token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
        # PyJWT 1.x returns bytes, PyJWT 2.x returns str; always hand back str.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    def verify_token(self, token: str) -> Optional[str]:
        """
        Verify JWT token and extract username.

        Args:
            token: JWT token string

        Returns:
            Username (the ``sub`` claim) if valid, None otherwise. None is
            also returned when PyJWT is not installed.
        """
        if not JWT_AVAILABLE:
            return None

        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # InvalidTokenError is PyJWT's base class for decode/validation
            # failures. (PyJWT has no `JWTError` attribute; naming it in an
            # except clause raises AttributeError on the first bad token.)
            logger.warning("Invalid token: %s", e)
            return None

        return payload.get('sub')

    def verify_api_key(self, api_key: str) -> bool:
        """
        Verify API key.

        Args:
            api_key: API key string

        Returns:
            True if valid and active, False otherwise
        """
        # Header values may be missing or malformed; anything that is not a
        # string cannot be a known key.
        if not isinstance(api_key, str):
            return False

        key_data = self.api_keys_db.get(api_key)
        if key_data is None:
            return False
        return bool(key_data.get('active', False))

    def create_api_key(self, name: str) -> str:
        """
        Create new API key.

        Args:
            name: Descriptive name for the key

        Returns:
            Generated API key
        """
        api_key = secrets.token_urlsafe(32)
        self.api_keys_db[api_key] = {
            'created_at': datetime.utcnow(),
            'name': name,
            'active': True,
            'usage_count': 0,
        }
        logger.info("Created API key: %s", name)
        return api_key

    def revoke_api_key(self, api_key: str) -> bool:
        """
        Revoke API key.

        The entry is kept in ``api_keys_db`` and only marked inactive.

        Args:
            api_key: API key to revoke

        Returns:
            True if revoked, False if not found
        """
        key_data = self.api_keys_db.get(api_key)
        if key_data is None:
            return False

        key_data['active'] = False
        # .get(): tolerate records that were stored without a 'name' field.
        logger.info("Revoked API key: %s", key_data.get('name'))
        return True

    def track_usage(self, api_key: str):
        """
        Track API key usage.

        Increments the key's ``usage_count``; unknown keys are ignored.

        Args:
            api_key: API key that was used
        """
        key_data = self.api_keys_db.get(api_key)
        if key_data is not None:
            key_data['usage_count'] = key_data.get('usage_count', 0) + 1
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level instance used by the helper functions in this module.
# Constructing it reads credentials from the environment at import time.
auth_manager: AuthManager = AuthManager()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def require_auth(func):
    """
    Decorator to require authentication for async endpoints.

    With ENABLE_AUTH off, the wrapped coroutine function is awaited and its
    result returned unchanged. With ENABLE_AUTH on, the wrapper raises
    NotImplementedError: reading the Authorization / X-API-Key headers from
    a request has to be wired up for the web framework in use.

    Args:
        func: Coroutine function to protect

    Returns:
        Async wrapper carrying ``func``'s metadata (via functools.wraps)
    """
    @wraps(func)
    async def guarded(*args, **kwargs):
        if ENABLE_AUTH:
            # No framework-specific credential extraction is implemented.
            raise NotImplementedError("Integrate with your web framework")

        # Authentication is switched off: call straight through.
        return await func(*args, **kwargs)

    return guarded
|
|
|
|
|
|
|
|
def require_api_key(func):
    """
    Decorator to require API key authentication for async endpoints.

    With ENABLE_AUTH off, the wrapped coroutine function is awaited and its
    result returned unchanged. With ENABLE_AUTH on, the wrapper raises
    NotImplementedError until API key extraction is integrated with the web
    framework in use.

    Args:
        func: Coroutine function to protect

    Returns:
        Async wrapper carrying ``func``'s metadata (via functools.wraps)
    """
    @wraps(func)
    async def guarded(*args, **kwargs):
        if ENABLE_AUTH:
            # No framework-specific API key extraction is implemented.
            raise NotImplementedError("Integrate with your web framework")

        # Authentication is switched off: call straight through.
        return await func(*args, **kwargs)

    return guarded
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def authenticate_user(username: str, password: str) -> Optional[str]:
    """
    Check a username/password pair and issue a JWT on success.

    Args:
        username: Username
        password: Plain text password

    Returns:
        A JWT token string when the credentials are accepted; None when
        they are rejected or when ENABLE_AUTH is off (a warning is logged
        in the latter case).
    """
    if ENABLE_AUTH:
        credentials_ok = auth_manager.verify_password(username, password)
        if not credentials_ok:
            return None
        return auth_manager.create_access_token(username)

    # Authentication is switched off, so no token is ever issued.
    logger.warning("Authentication disabled")
    return None
|
|
|
|
|
|
|
|
def verify_request_auth(
    authorization: Optional[str] = None,
    api_key: Optional[str] = None
) -> bool:
    """
    Verify request authentication.

    The API key is checked first; if it is missing or rejected, the
    Authorization header is tried as a Bearer token.

    Args:
        authorization: Authorization header (Bearer token)
        api_key: X-API-Key header

    Returns:
        True if authenticated (or if ENABLE_AUTH is off), False otherwise
    """
    if not ENABLE_AUTH:
        return True

    # API key path; a successful check is also counted as one use of the key.
    if api_key and auth_manager.verify_api_key(api_key):
        auth_manager.track_usage(api_key)
        return True

    # Bearer token path. Splitting on any whitespace tolerates repeated
    # spaces between scheme and token, and the scheme is compared
    # case-insensitively because HTTP authentication scheme names are
    # case-insensitive.
    if authorization:
        parts = authorization.split()
        if len(parts) >= 2 and parts[0].lower() == 'bearer':
            if auth_manager.verify_token(parts[1]):
                return True

    return False
|
|
|