"""
Persistence Service

Handles data persistence with multiple export formats (JSON, CSV, database)
"""

import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict

import pandas as pd

logger = logging.getLogger(__name__)


class PersistenceService:
    """Service for persisting data in multiple formats"""

    def __init__(self, db_manager=None, data_dir: str = 'data'):
        self.db_manager = db_manager
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Latest record per API, plus a bounded in-memory history per API
        self.cache: Dict[str, Any] = {}
        self.history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        self.max_history_per_api = 1000

    async def save_api_data(
        self,
        api_id: str,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Save API data with metadata

        Args:
            api_id: API identifier
            data: Data to save
            metadata: Additional metadata (category, source, etc.)

        Returns:
            Success status
        """
        try:
            timestamp = datetime.now()

            record = {
                'api_id': api_id,
                'timestamp': timestamp.isoformat(),
                'data': data,
                'metadata': metadata or {}
            }

            # Update the latest-value cache and append to history
            self.cache[api_id] = record
            self.history[api_id].append(record)

            # Trim history to the configured per-API bound
            if len(self.history[api_id]) > self.max_history_per_api:
                self.history[api_id] = self.history[api_id][-self.max_history_per_api:]

            # Database persistence is best-effort: _save_to_database logs and
            # swallows its own errors, so a DB failure does not fail this call
            if self.db_manager:
                await self._save_to_database(api_id, data, metadata, timestamp)

            logger.debug(f"Saved data for {api_id}")
            return True

        except Exception as e:
            logger.error(f"Error saving data for {api_id}: {e}")
            return False

    async def _save_to_database(
        self,
        api_id: str,
        data: Dict[str, Any],
        metadata: Dict[str, Any],
        timestamp: datetime
    ):
        """Save data to database"""
        if not self.db_manager:
            return

        try:
            category = metadata.get('category', 'unknown')

            with self.db_manager.get_session() as session:
                # Deferred import: the models are only needed when a
                # db_manager is configured
                from database.models import Provider, DataCollection

                provider = session.query(Provider).filter_by(name=api_id).first()

                if not provider:
                    provider = Provider(
                        name=api_id,
                        category=category,
                        endpoint_url=metadata.get('url', ''),
                        requires_key=metadata.get('requires_key', False),
                        priority_tier=metadata.get('priority', 3)
                    )
                    session.add(provider)
                    session.flush()

                collection = DataCollection(
                    provider_id=provider.id,
                    category=category,
                    scheduled_time=timestamp,
                    actual_fetch_time=timestamp,
                    data_timestamp=timestamp,
                    staleness_minutes=0,
                    record_count=len(data) if isinstance(data, (list, dict)) else 1,
                    # Encode before measuring so the size is in bytes, not characters
                    payload_size_bytes=len(json.dumps(data).encode('utf-8')),
                    on_schedule=True
                )
                session.add(collection)

        except Exception as e:
            logger.error(f"Error saving to database: {e}")

    def get_cached_data(self, api_id: str) -> Optional[Dict[str, Any]]:
        """Get cached data for an API"""
        return self.cache.get(api_id)

    def get_all_cached_data(self) -> Dict[str, Any]:
        """Get all cached data"""
        return self.cache.copy()

    def get_history(self, api_id: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Get historical data for an API"""
        history = self.history.get(api_id, [])
        return history[-limit:] if limit else history

    def get_all_history(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get all historical data"""
        return dict(self.history)

    async def export_to_json(
        self,
        filepath: str,
        api_ids: Optional[List[str]] = None,
        include_history: bool = False
    ) -> bool:
        """
        Export data to JSON file

        Args:
            filepath: Output file path
            api_ids: Specific APIs to export (None = all)
            include_history: Include historical data

        Returns:
            Success status
        """
        try:
            filepath = Path(filepath)
            filepath.parent.mkdir(parents=True, exist_ok=True)

            data: Dict[str, Any] = {
                'cache': self.cache,
                'exported_at': datetime.now().isoformat()
            }
            if include_history:
                data['history'] = dict(self.history)

            # Optionally restrict the export to a subset of APIs
            if api_ids:
                data['cache'] = {k: v for k, v in data['cache'].items() if k in api_ids}
                if 'history' in data:
                    data['history'] = {k: v for k, v in data['history'].items() if k in api_ids}

            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=str)

            logger.info(f"Exported data to JSON: {filepath}")
            return True

        except Exception as e:
            logger.error(f"Error exporting to JSON: {e}")
            return False
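
    # Shape of the exported JSON document (illustrative):
    #
    #     {
    #         "cache": {"example_api": {...latest record...}},
    #         "history": {"example_api": [...records...]},   # only if include_history
    #         "exported_at": "2024-01-01T00:00:00"
    #     }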

    async def export_to_csv(
        self,
        filepath: str,
        api_ids: Optional[List[str]] = None,
        flatten: bool = True
    ) -> bool:
        """
        Export data to CSV file

        Args:
            filepath: Output file path
            api_ids: Specific APIs to export (None = all)
            flatten: Flatten nested data structures

        Returns:
            Success status
        """
        try:
            filepath = Path(filepath)
            filepath.parent.mkdir(parents=True, exist_ok=True)

            rows = []

            cache_items = self.cache.items()
            if api_ids:
                cache_items = [(k, v) for k, v in cache_items if k in api_ids]

            for api_id, record in cache_items:
                row = {
                    'api_id': api_id,
                    'timestamp': record.get('timestamp'),
                    'category': record.get('metadata', {}).get('category', ''),
                }

                if flatten:
                    data = record.get('data', {})
                    if isinstance(data, dict):
                        for key, value in data.items():
                            if isinstance(value, (str, int, float, bool)):
                                row[f'data_{key}'] = value
                            else:
                                row[f'data_{key}'] = json.dumps(value)
                else:
                    row['data'] = json.dumps(record.get('data'))

                rows.append(row)

            if rows:
                df = pd.DataFrame(rows)
                df.to_csv(filepath, index=False)
                logger.info(f"Exported data to CSV: {filepath}")
                return True
            else:
                logger.warning("No data to export to CSV")
                return False

        except Exception as e:
            logger.error(f"Error exporting to CSV: {e}")
            return False
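
    # Flattening example (illustrative values): with flatten=True, a cached
    # record whose data is {'price': 42.0, 'tags': ['a', 'b']} produces a row
    #
    #     api_id,timestamp,category,data_price,data_tags
    #     example_api,2024-01-01T00:00:00,market_data,42.0,"[""a"", ""b""]"
    #
    # Scalar values are kept as-is; nested values become JSON-encoded strings.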

    async def export_history_to_csv(
        self,
        filepath: str,
        api_id: str
    ) -> bool:
        """
        Export historical data for a specific API to CSV

        Args:
            filepath: Output file path
            api_id: API identifier

        Returns:
            Success status
        """
        try:
            filepath = Path(filepath)
            filepath.parent.mkdir(parents=True, exist_ok=True)

            history = self.history.get(api_id, [])

            if not history:
                logger.warning(f"No history data for {api_id}")
                return False

            rows = [
                {
                    'timestamp': record.get('timestamp'),
                    'api_id': record.get('api_id'),
                    'data': json.dumps(record.get('data'))
                }
                for record in history
            ]

            df = pd.DataFrame(rows)
            df.to_csv(filepath, index=False)

            logger.info(f"Exported history for {api_id} to CSV: {filepath}")
            return True

        except Exception as e:
            logger.error(f"Error exporting history to CSV: {e}")
            return False

    async def import_from_json(self, filepath: str) -> bool:
        """
        Import data from JSON file

        Args:
            filepath: Input file path

        Returns:
            Success status
        """
        try:
            filepath = Path(filepath)

            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if 'cache' in data:
                self.cache.update(data['cache'])

            if 'history' in data:
                for api_id, records in data['history'].items():
                    self.history[api_id].extend(records)

                    # Keep imported history within the per-API bound
                    if len(self.history[api_id]) > self.max_history_per_api:
                        self.history[api_id] = self.history[api_id][-self.max_history_per_api:]

            logger.info(f"Imported data from JSON: {filepath}")
            return True

        except Exception as e:
            logger.error(f"Error importing from JSON: {e}")
            return False

    async def backup_all_data(self, backup_dir: Optional[str] = None) -> str:
        """
        Create a backup of all data

        Args:
            backup_dir: Backup directory (uses default if None)

        Returns:
            Path to backup file
        """
        try:
            if backup_dir:
                backup_path = Path(backup_dir)
            else:
                backup_path = self.data_dir / 'backups'

            backup_path.mkdir(parents=True, exist_ok=True)

            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_file = backup_path / f'backup_{timestamp}.json'

            # export_to_json returns False instead of raising, so check the
            # result to avoid reporting a backup that was never written
            success = await self.export_to_json(
                str(backup_file),
                include_history=True
            )
            if not success:
                raise IOError(f"Export failed for backup: {backup_file}")

            logger.info(f"Created backup: {backup_file}")
            return str(backup_file)

        except Exception as e:
            logger.error(f"Error creating backup: {e}")
            raise

    async def restore_from_backup(self, backup_file: str) -> bool:
        """
        Restore data from a backup file

        Args:
            backup_file: Path to backup file

        Returns:
            Success status
        """
        try:
            logger.info(f"Restoring from backup: {backup_file}")
            success = await self.import_from_json(backup_file)

            if success:
                logger.info("Backup restored successfully")

            return success

        except Exception as e:
            logger.error(f"Error restoring from backup: {e}")
            return False

    def clear_cache(self):
        """Clear all cached data"""
        self.cache.clear()
        logger.info("Cache cleared")

    def clear_history(self, api_id: Optional[str] = None):
        """Clear history for a specific API, or all history if api_id is None"""
        if api_id:
            if api_id in self.history:
                del self.history[api_id]
                logger.info(f"Cleared history for {api_id}")
        else:
            self.history.clear()
            logger.info("Cleared all history")

    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about stored data"""
        total_cached = len(self.cache)
        total_history_records = sum(len(records) for records in self.history.values())

        api_stats = {}
        for api_id, records in self.history.items():
            if records:
                timestamps = [
                    datetime.fromisoformat(r['timestamp'])
                    for r in records
                    if 'timestamp' in r
                ]

                if timestamps:
                    api_stats[api_id] = {
                        'record_count': len(records),
                        'oldest': min(timestamps).isoformat(),
                        'newest': max(timestamps).isoformat()
                    }

        return {
            'cached_apis': total_cached,
            'total_history_records': total_history_records,
            'apis_with_history': len(self.history),
            'api_statistics': api_stats
        }
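
    # Example of the returned shape (illustrative values only):
    #
    #     {
    #         'cached_apis': 2,
    #         'total_history_records': 150,
    #         'apis_with_history': 2,
    #         'api_statistics': {
    #             'example_api': {
    #                 'record_count': 100,
    #                 'oldest': '2024-01-01T00:00:00',
    #                 'newest': '2024-01-02T00:00:00'
    #             }
    #         }
    #     }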

    async def cleanup_old_data(self, days: int = 7) -> int:
        """
        Remove history records older than the specified number of days
        (the latest-value cache is left untouched)

        Args:
            days: Number of days to keep

        Returns:
            Number of records removed
        """
        try:
            cutoff = datetime.now() - timedelta(days=days)
            removed_count = 0

            for api_id, records in list(self.history.items()):
                original_count = len(records)

                self.history[api_id] = [
                    r for r in records
                    if datetime.fromisoformat(r['timestamp']) > cutoff
                ]

                removed_count += original_count - len(self.history[api_id])

                # Drop APIs whose history is now empty
                if not self.history[api_id]:
                    del self.history[api_id]

            logger.info(f"Cleaned up {removed_count} old records (older than {days} days)")
            return removed_count

        except Exception as e:
            logger.error(f"Error during cleanup: {e}")
            return 0

    async def save_collection_data(
        self,
        api_id: str,
        category: str,
        data: Dict[str, Any],
        timestamp: datetime
    ):
        """
        Save data collection (compatibility method for scheduler)

        Args:
            api_id: API identifier
            category: Data category
            data: Collected data
            timestamp: Collection timestamp
        """
        metadata = {
            'category': category,
            'collection_time': timestamp.isoformat()
        }

        await self.save_api_data(api_id, data, metadata)
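

# Minimal usage sketch (illustrative, not part of the service API): exercises
# the in-memory cache, JSON export, and backup paths without a database. The
# api_id, payload, and file paths below are hypothetical placeholders.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        service = PersistenceService(data_dir='data')

        await service.save_api_data(
            'example_api',                    # hypothetical API identifier
            {'price': 42.0, 'volume': 1000},  # illustrative payload
            metadata={'category': 'market_data'}
        )

        await service.export_to_json('data/exports/example.json',
                                     include_history=True)
        backup_file = await service.backup_all_data()
        print(f"Backup written to: {backup_file}")
        print(service.get_statistics())

    asyncio.run(_demo())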