""" Data Persistence Module Saves collected data from all collectors into the database """ from datetime import datetime from typing import Dict, List, Any, Optional from database.db_manager import db_manager from utils.logger import setup_logger logger = setup_logger("data_persistence") class DataPersistence: """ Handles saving collected data to the database """ def __init__(self): """Initialize data persistence""" self.stats = { 'market_prices_saved': 0, 'news_saved': 0, 'sentiment_saved': 0, 'whale_txs_saved': 0, 'gas_prices_saved': 0, 'blockchain_stats_saved': 0 } def reset_stats(self): """Reset persistence statistics""" for key in self.stats: self.stats[key] = 0 def get_stats(self) -> Dict[str, int]: """Get persistence statistics""" return self.stats.copy() def save_market_data(self, results: List[Dict[str, Any]]) -> int: """ Save market data to database Args: results: List of market data results from collectors Returns: Number of prices saved """ saved_count = 0 for result in results: if not result.get('success', False): continue provider = result.get('provider', 'Unknown') data = result.get('data') if not data: continue try: # CoinGecko format if provider == "CoinGecko" and isinstance(data, dict): # Map CoinGecko coin IDs to symbols symbol_map = { 'bitcoin': 'BTC', 'ethereum': 'ETH', 'binancecoin': 'BNB' } for coin_id, coin_data in data.items(): if isinstance(coin_data, dict) and 'usd' in coin_data: symbol = symbol_map.get(coin_id, coin_id.upper()) db_manager.save_market_price( symbol=symbol, price_usd=coin_data.get('usd', 0), market_cap=coin_data.get('usd_market_cap'), volume_24h=coin_data.get('usd_24h_vol'), price_change_24h=coin_data.get('usd_24h_change'), source=provider ) saved_count += 1 # Binance format elif provider == "Binance" and isinstance(data, dict): # Binance returns symbol -> price mapping for symbol, price in data.items(): if isinstance(price, (int, float)): # Remove "USDT" suffix if present clean_symbol = symbol.replace('USDT', '') db_manager.save_market_price( symbol=clean_symbol, price_usd=float(price), source=provider ) saved_count += 1 # CoinMarketCap format elif provider == "CoinMarketCap" and isinstance(data, dict): if 'data' in data: for coin_id, coin_data in data['data'].items(): if isinstance(coin_data, dict): symbol = coin_data.get('symbol', '').upper() quote_usd = coin_data.get('quote', {}).get('USD', {}) if symbol and quote_usd: db_manager.save_market_price( symbol=symbol, price_usd=quote_usd.get('price', 0), market_cap=quote_usd.get('market_cap'), volume_24h=quote_usd.get('volume_24h'), price_change_24h=quote_usd.get('percent_change_24h'), source=provider ) saved_count += 1 except Exception as e: logger.error(f"Error saving market data from {provider}: {e}", exc_info=True) self.stats['market_prices_saved'] += saved_count if saved_count > 0: logger.info(f"Saved {saved_count} market prices to database") return saved_count def save_news_data(self, results: List[Dict[str, Any]]) -> int: """ Save news data to database Args: results: List of news results from collectors Returns: Number of articles saved """ saved_count = 0 for result in results: if not result.get('success', False): continue provider = result.get('provider', 'Unknown') data = result.get('data') if not data: continue try: # CryptoPanic format if provider == "CryptoPanic" and isinstance(data, dict): results_list = data.get('results', []) for article in results_list: if not isinstance(article, dict): continue # Parse published_at published_at = None if 'created_at' in article: try: pub_str = article['created_at'] if pub_str.endswith('Z'): pub_str = pub_str.replace('Z', '+00:00') published_at = datetime.fromisoformat(pub_str) except: published_at = datetime.utcnow() if not published_at: published_at = datetime.utcnow() # Extract currencies as tags currencies = article.get('currencies', []) tags = ','.join([c.get('code', '') for c in currencies if isinstance(c, dict)]) db_manager.save_news_article( title=article.get('title', ''), content=article.get('body', ''), source=provider, url=article.get('url', ''), published_at=published_at, sentiment=article.get('sentiment'), tags=tags ) saved_count += 1 # NewsAPI format (newsdata.io) elif provider == "NewsAPI" and isinstance(data, dict): results_list = data.get('results', []) for article in results_list: if not isinstance(article, dict): continue # Parse published_at published_at = None if 'pubDate' in article: try: pub_str = article['pubDate'] if pub_str.endswith('Z'): pub_str = pub_str.replace('Z', '+00:00') published_at = datetime.fromisoformat(pub_str) except: published_at = datetime.utcnow() if not published_at: published_at = datetime.utcnow() # Extract keywords as tags keywords = article.get('keywords', []) tags = ','.join(keywords) if isinstance(keywords, list) else '' db_manager.save_news_article( title=article.get('title', ''), content=article.get('description', ''), source=provider, url=article.get('link', ''), published_at=published_at, tags=tags ) saved_count += 1 except Exception as e: logger.error(f"Error saving news data from {provider}: {e}", exc_info=True) self.stats['news_saved'] += saved_count if saved_count > 0: logger.info(f"Saved {saved_count} news articles to database") return saved_count def save_sentiment_data(self, results: List[Dict[str, Any]]) -> int: """ Save sentiment data to database Args: results: List of sentiment results from collectors Returns: Number of sentiment metrics saved """ saved_count = 0 for result in results: if not result.get('success', False): continue provider = result.get('provider', 'Unknown') data = result.get('data') if not data: continue try: # Fear & Greed Index format if provider == "AlternativeMe" and isinstance(data, dict): data_list = data.get('data', []) if data_list and isinstance(data_list, list): index_data = data_list[0] if isinstance(index_data, dict): value = float(index_data.get('value', 50)) value_classification = index_data.get('value_classification', 'neutral') # Map classification to standard format classification_map = { 'Extreme Fear': 'extreme_fear', 'Fear': 'fear', 'Neutral': 'neutral', 'Greed': 'greed', 'Extreme Greed': 'extreme_greed' } classification = classification_map.get( value_classification, value_classification.lower().replace(' ', '_') ) # Parse timestamp timestamp = None if 'timestamp' in index_data: try: timestamp = datetime.fromtimestamp(int(index_data['timestamp'])) except: pass db_manager.save_sentiment_metric( metric_name='fear_greed_index', value=value, classification=classification, source=provider, timestamp=timestamp ) saved_count += 1 except Exception as e: logger.error(f"Error saving sentiment data from {provider}: {e}", exc_info=True) self.stats['sentiment_saved'] += saved_count if saved_count > 0: logger.info(f"Saved {saved_count} sentiment metrics to database") return saved_count def save_whale_data(self, results: List[Dict[str, Any]]) -> int: """ Save whale transaction data to database Args: results: List of whale tracking results from collectors Returns: Number of whale transactions saved """ saved_count = 0 for result in results: if not result.get('success', False): continue provider = result.get('provider', 'Unknown') data = result.get('data') if not data: continue try: # WhaleAlert format if provider == "WhaleAlert" and isinstance(data, dict): transactions = data.get('transactions', []) for tx in transactions: if not isinstance(tx, dict): continue # Parse timestamp timestamp = None if 'timestamp' in tx: try: timestamp = datetime.fromtimestamp(tx['timestamp']) except: timestamp = datetime.utcnow() if not timestamp: timestamp = datetime.utcnow() # Extract addresses from_address = tx.get('from', {}).get('address', '') if isinstance(tx.get('from'), dict) else '' to_address = tx.get('to', {}).get('address', '') if isinstance(tx.get('to'), dict) else '' db_manager.save_whale_transaction( blockchain=tx.get('blockchain', 'unknown'), transaction_hash=tx.get('hash', ''), from_address=from_address, to_address=to_address, amount=float(tx.get('amount', 0)), amount_usd=float(tx.get('amount_usd', 0)), source=provider, timestamp=timestamp ) saved_count += 1 except Exception as e: logger.error(f"Error saving whale data from {provider}: {e}", exc_info=True) self.stats['whale_txs_saved'] += saved_count if saved_count > 0: logger.info(f"Saved {saved_count} whale transactions to database") return saved_count def save_blockchain_data(self, results: List[Dict[str, Any]]) -> int: """ Save blockchain data (gas prices, stats) to database Args: results: List of blockchain results from collectors Returns: Number of records saved """ saved_count = 0 for result in results: if not result.get('success', False): continue provider = result.get('provider', 'Unknown') data = result.get('data') if not data: continue try: # Etherscan gas price format if provider == "Etherscan" and isinstance(data, dict): if 'result' in data: gas_data = data['result'] if isinstance(gas_data, dict): db_manager.save_gas_price( blockchain='ethereum', gas_price_gwei=float(gas_data.get('ProposeGasPrice', 0)), fast_gas_price=float(gas_data.get('FastGasPrice', 0)), standard_gas_price=float(gas_data.get('ProposeGasPrice', 0)), slow_gas_price=float(gas_data.get('SafeGasPrice', 0)), source=provider ) saved_count += 1 self.stats['gas_prices_saved'] += 1 # Other blockchain explorers elif provider in ["BSCScan", "PolygonScan"]: blockchain_map = { "BSCScan": "bsc", "PolygonScan": "polygon" } blockchain = blockchain_map.get(provider, provider.lower()) if 'result' in data and isinstance(data['result'], dict): gas_data = data['result'] db_manager.save_gas_price( blockchain=blockchain, gas_price_gwei=float(gas_data.get('ProposeGasPrice', 0)), fast_gas_price=float(gas_data.get('FastGasPrice', 0)), standard_gas_price=float(gas_data.get('ProposeGasPrice', 0)), slow_gas_price=float(gas_data.get('SafeGasPrice', 0)), source=provider ) saved_count += 1 self.stats['gas_prices_saved'] += 1 except Exception as e: logger.error(f"Error saving blockchain data from {provider}: {e}", exc_info=True) if saved_count > 0: logger.info(f"Saved {saved_count} blockchain records to database") return saved_count def save_all_data(self, results: Dict[str, Any]) -> Dict[str, int]: """ Save all collected data to database Args: results: Results dictionary from master collector Returns: Dictionary with save statistics """ logger.info("=" * 60) logger.info("Saving collected data to database...") logger.info("=" * 60) self.reset_stats() data = results.get('data', {}) # Save market data if 'market_data' in data: self.save_market_data(data['market_data']) # Save news data if 'news' in data: self.save_news_data(data['news']) # Save sentiment data if 'sentiment' in data: self.save_sentiment_data(data['sentiment']) # Save whale tracking data if 'whale_tracking' in data: self.save_whale_data(data['whale_tracking']) # Save blockchain data if 'blockchain' in data: self.save_blockchain_data(data['blockchain']) stats = self.get_stats() total_saved = sum(stats.values()) logger.info("=" * 60) logger.info("Data Persistence Complete") logger.info(f"Total records saved: {total_saved}") logger.info(f" Market prices: {stats['market_prices_saved']}") logger.info(f" News articles: {stats['news_saved']}") logger.info(f" Sentiment metrics: {stats['sentiment_saved']}") logger.info(f" Whale transactions: {stats['whale_txs_saved']}") logger.info(f" Gas prices: {stats['gas_prices_saved']}") logger.info(f" Blockchain stats: {stats['blockchain_stats_saved']}") logger.info("=" * 60) return stats # Global instance data_persistence = DataPersistence()