|
|
|
|
|
""" |
|
|
Authentication module for the News Dashboard.

Handles user accounts, password verification, and login-session management.
|
|
""" |
|
|
|
|
|
import hashlib |
|
|
import secrets |
|
|
import json |
|
|
import os |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, Optional, Tuple |
|
|
import logging |
|
|
|
|
|
# Module-level logger used by AuthManager for audit and error messages.
logger = logging.getLogger(__name__)


class AuthManager:
    """Manage user accounts and login sessions persisted in two JSON files.

    In-memory state:

    * ``self.users``    -- ``{username: {'password_hash', 'is_admin',
      'created_at', 'last_login'}}``
    * ``self.sessions`` -- ``{token: {'username', 'created_at',
      'last_activity'}}``

    Timestamps are naive ISO-8601 strings from ``datetime.now().isoformat()``.
    Every mutation is written back to disk immediately. Persistence is
    best-effort: I/O errors are logged, never raised.

    None of the methods perform authorization themselves; methods described
    as "admin only" rely on the caller to check ``is_admin`` first.
    """

    # Work factor for newly created password hashes. It is stored inside each
    # hash, so it can be raised later without invalidating existing passwords.
    PBKDF2_ITERATIONS = 600_000

    # Tag that distinguishes the current hash format
    # (``pbkdf2_sha256$<iterations>$<salt>$<hex digest>``) from the legacy
    # ``<salt>:<sha256 hex digest>`` format.
    _PBKDF2_SCHEME = "pbkdf2_sha256"

    def __init__(self, users_file: str = "users.json", sessions_file: str = "sessions.json"):
        """Load persisted state and bootstrap a default admin if there are no users.

        Args:
            users_file: Path of the JSON file holding user records.
            sessions_file: Path of the JSON file holding session records.
        """
        self.users_file = users_file
        self.sessions_file = sessions_file
        self.users = self._load_users()
        self.sessions = self._load_sessions()
        self.session_timeout = timedelta(hours=24)

        # NOTE(review): an unreadable or corrupt users file also loads as {},
        # so this branch will then replace it with just the default admin.
        if not self.users:
            self._create_default_admin()

    # ------------------------------------------------------------------
    # Persistence helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_json(path: str, label: str) -> Dict[str, Dict]:
        """Return the JSON object stored at ``path``, or ``{}`` if unusable.

        A missing file is silently treated as empty. Any other failure
        (unreadable file, invalid JSON, non-object root) is logged using
        ``label`` and also yields ``{}``.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except Exception as e:  # best-effort load: log and start empty
            logger.error("Error loading %s: %s", label, e)
            return {}

        if not isinstance(data, dict):
            logger.error(
                "Error loading %s: expected a JSON object, got %s",
                label, type(data).__name__,
            )
            return {}
        return data

    @staticmethod
    def _save_json(path: str, data: Dict[str, Dict], label: str) -> None:
        """Write ``data`` to ``path`` as JSON; log (never raise) on failure.

        The data is written to ``<path>.tmp`` first and then moved into place
        with ``os.replace``, so an interrupted write cannot leave a truncated
        file behind.
        """
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, path)
        except Exception as e:  # best-effort save: log and carry on
            logger.error("Error saving %s: %s", label, e)
            try:
                os.remove(tmp_path)
            except OSError:
                # Nothing to clean up (the temp file was never created or is
                # already gone).
                pass

    def _load_users(self) -> Dict[str, Dict]:
        """Load users from JSON file"""
        return self._load_json(self.users_file, "users")

    def _save_users(self):
        """Save users to JSON file"""
        self._save_json(self.users_file, self.users, "users")

    def _load_sessions(self) -> Dict[str, Dict]:
        """Load sessions from JSON file"""
        return self._load_json(self.sessions_file, "sessions")

    def _save_sessions(self):
        """Save sessions to JSON file"""
        self._save_json(self.sessions_file, self.sessions, "sessions")

    # ------------------------------------------------------------------
    # Password handling
    # ------------------------------------------------------------------

    def _create_default_admin(self):
        """Create the bootstrap ``admin`` account and log a warning.

        NOTE(review): the password is a well-known hard-coded value and is
        written to the log. Kept for backward compatibility; deployments must
        change it immediately.
        """
        admin_password = "admin123"
        self.add_user("admin", admin_password, is_admin=True)
        logger.warning("Created default admin user with password 'admin123' - PLEASE CHANGE THIS!")

    def _hash_password(self, password: str) -> str:
        """Return a salted PBKDF2-HMAC-SHA256 hash of ``password``.

        Format: ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``.
        """
        salt = secrets.token_hex(16)
        iterations = self.PBKDF2_ITERATIONS
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), salt.encode(), iterations
        ).hex()
        return f"{self._PBKDF2_SCHEME}${iterations}${salt}${digest}"

    def _uses_current_scheme(self, stored_hash: str) -> bool:
        """Return True if ``stored_hash`` is in the PBKDF2 format."""
        return stored_hash.startswith(self._PBKDF2_SCHEME + "$")

    def _verify_password(self, password: str, stored_hash: str) -> bool:
        """Check ``password`` against ``stored_hash``.

        Accepts both the current PBKDF2 format and the legacy
        ``<salt>:<sha256(password + salt)>`` format. Malformed hashes and
        non-string inputs yield ``False`` rather than raising.
        """
        try:
            if self._uses_current_scheme(stored_hash):
                _scheme, iterations, salt, expected = stored_hash.split("$")
                actual = hashlib.pbkdf2_hmac(
                    "sha256", password.encode(), salt.encode(), int(iterations)
                ).hex()
            else:
                salt, expected = stored_hash.split(':')
                actual = hashlib.sha256((password + salt).encode()).hexdigest()
        except (AttributeError, TypeError, ValueError, OverflowError):
            return False
        # Constant-time comparison; bytes avoid the ASCII-only restriction
        # that compare_digest places on str arguments.
        return secrets.compare_digest(actual.encode(), expected.encode())

    # ------------------------------------------------------------------
    # User management
    # ------------------------------------------------------------------

    def add_user(self, username: str, password: str, is_admin: bool = False) -> bool:
        """Create a user and persist the user table.

        Returns:
            ``True`` on success; ``False`` if the username or password is
            empty or the username is already taken.
        """
        if not username or not password:
            return False
        if username in self.users:
            return False

        self.users[username] = {
            'password_hash': self._hash_password(password),
            'is_admin': is_admin,
            'created_at': datetime.now().isoformat(),
            'last_login': None,
        }
        self._save_users()
        logger.info("Added user: %s", username)
        return True

    def authenticate_user(self, username: str, password: str) -> Tuple[bool, str]:
        """Verify credentials and open a new session.

        Returns:
            ``(True, session_token)`` on success, ``(False, "")`` otherwise.
        """
        user = self.users.get(username)
        if user is None:
            return False, ""

        stored_hash = user.get('password_hash', '')
        if not self._verify_password(password, stored_hash):
            return False, ""

        # Transparently upgrade legacy SHA-256 hashes now that the plaintext
        # is known to be correct.
        if not self._uses_current_scheme(stored_hash):
            user['password_hash'] = self._hash_password(password)

        now = datetime.now().isoformat()
        user['last_login'] = now
        self._save_users()

        session_token = secrets.token_urlsafe(32)
        self.sessions[session_token] = {
            'username': username,
            'created_at': now,
            'last_activity': now,
        }
        self._save_sessions()

        logger.info("User %s authenticated successfully", username)
        return True, session_token

    def _is_session_expired(self, session: Dict, now: datetime) -> bool:
        """Return True if ``session`` is past the timeout or is malformed."""
        try:
            last_activity = datetime.fromisoformat(session['last_activity'])
            return now - last_activity > self.session_timeout
        except (KeyError, TypeError, ValueError):
            # Missing or unparsable timestamp: a record we cannot age is
            # treated as expired instead of crashing the caller.
            return True

    def validate_session(self, session_token: str) -> Tuple[bool, Optional[str]]:
        """Validate a session token and refresh its activity timestamp.

        Expired or malformed sessions are removed.

        Returns:
            ``(True, username)`` for a live session, else ``(False, None)``.
        """
        if not session_token or session_token not in self.sessions:
            return False, None

        session = self.sessions[session_token]
        now = datetime.now()
        if self._is_session_expired(session, now) or 'username' not in session:
            self.logout_user(session_token)
            return False, None

        # Sliding expiry: each successful validation extends the session.
        session['last_activity'] = now.isoformat()
        self._save_sessions()
        return True, session['username']

    def logout_user(self, session_token: str) -> bool:
        """Remove a session. Returns ``True`` if the token existed."""
        if session_token not in self.sessions:
            return False
        del self.sessions[session_token]
        self._save_sessions()
        return True

    def is_admin(self, username: str) -> bool:
        """Return True if ``username`` exists and is flagged as admin."""
        return bool(self.users.get(username, {}).get('is_admin', False))

    def change_password(self, username: str, old_password: str, new_password: str) -> bool:
        """Replace a user's password after verifying the old one.

        Returns:
            ``True`` on success; ``False`` if the user is unknown, the old
            password is wrong, or the new password is empty.
        """
        user = self.users.get(username)
        if user is None or not new_password:
            return False
        if not self._verify_password(old_password, user.get('password_hash', '')):
            return False

        user['password_hash'] = self._hash_password(new_password)
        self._save_users()
        logger.info("Password changed for user: %s", username)
        return True

    def get_user_info(self, username: str) -> Optional[Dict]:
        """Return a copy of the user's record without the password hash.

        Returns ``None`` for an unknown user.
        """
        user = self.users.get(username)
        if user is None:
            return None
        return {key: value for key, value in user.items() if key != 'password_hash'}

    def list_users(self) -> Dict[str, Dict]:
        """Return public fields for every user (intended for admins).

        No authorization check is performed here.
        """
        return {
            username: {
                'is_admin': user.get('is_admin', False),
                'created_at': user.get('created_at'),
                'last_login': user.get('last_login'),
            }
            for username, user in self.users.items()
        }

    def delete_user(self, username: str) -> bool:
        """Delete a user and all of their sessions (intended for admins).

        No authorization check is performed here.

        Returns:
            ``True`` if the user existed and was removed, else ``False``.
        """
        if username not in self.users:
            return False

        # Keep only the sessions that belong to other users.
        self.sessions = {
            token: session
            for token, session in self.sessions.items()
            if not (isinstance(session, dict) and session.get('username') == username)
        }

        del self.users[username]
        self._save_users()
        self._save_sessions()
        logger.info("Deleted user: %s", username)
        return True

    def cleanup_expired_sessions(self):
        """Remove every expired (or malformed) session and persist if any were removed."""
        current_time = datetime.now()
        expired_sessions = [
            token
            for token, session in self.sessions.items()
            if self._is_session_expired(session, current_time)
        ]

        for token in expired_sessions:
            del self.sessions[token]

        if expired_sessions:
            self._save_sessions()
            logger.info("Cleaned up %d expired sessions", len(expired_sessions))
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level instance. NOTE: this is constructed at import time with
# the default "users.json" / "sessions.json" paths, so importing this module
# reads those files and creates the default admin account when no users are
# loaded.
auth_manager = AuthManager()
|
|
|