#!/usr/bin/env python3 """ هماهنگ‌کننده جمع‌آوری داده Data Collection Orchestrator - Manages all collectors """ import asyncio import sys import os from pathlib import Path from typing import Dict, List, Any, Optional from datetime import datetime, timedelta import logging # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from crypto_data_bank.database import get_db from crypto_data_bank.collectors.free_price_collector import FreePriceCollector from crypto_data_bank.collectors.rss_news_collector import RSSNewsCollector from crypto_data_bank.collectors.sentiment_collector import SentimentCollector from crypto_data_bank.ai.huggingface_models import get_analyzer logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class DataCollectionOrchestrator: """ هماهنگ‌کننده اصلی جمع‌آوری داده Main orchestrator for data collection from all FREE sources """ def __init__(self): self.db = get_db() self.price_collector = FreePriceCollector() self.news_collector = RSSNewsCollector() self.sentiment_collector = SentimentCollector() self.ai_analyzer = get_analyzer() self.collection_tasks = [] self.is_running = False # Collection intervals (in seconds) self.intervals = { 'prices': 60, # Every 1 minute 'news': 300, # Every 5 minutes 'sentiment': 180, # Every 3 minutes } self.last_collection = { 'prices': None, 'news': None, 'sentiment': None, } async def collect_and_store_prices(self): """جمع‌آوری و ذخیره قیمت‌ها""" try: logger.info("💰 Collecting prices from FREE sources...") # Collect from all free sources all_prices = await self.price_collector.collect_all_free_sources() # Aggregate prices aggregated = self.price_collector.aggregate_prices(all_prices) # Save to database saved_count = 0 for price_data in aggregated: try: self.db.save_price( symbol=price_data['symbol'], price_data=price_data, source='free_aggregated' ) saved_count += 1 except Exception as e: logger.error(f"Error saving price for {price_data.get('symbol')}: {e}") self.last_collection['prices'] = datetime.now() logger.info(f"✅ Saved {saved_count}/{len(aggregated)} prices to database") return { "success": True, "prices_collected": len(aggregated), "prices_saved": saved_count, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Error collecting prices: {e}") return { "success": False, "error": str(e), "timestamp": datetime.now().isoformat() } async def collect_and_store_news(self): """جمع‌آوری و ذخیره اخبار""" try: logger.info("📰 Collecting news from FREE RSS feeds...") # Collect from all RSS feeds all_news = await self.news_collector.collect_all_rss_feeds() # Deduplicate unique_news = self.news_collector.deduplicate_news(all_news) # Analyze with AI (if available) if hasattr(self.ai_analyzer, 'analyze_news_batch'): logger.info("🤖 Analyzing news with AI...") analyzed_news = await self.ai_analyzer.analyze_news_batch(unique_news[:50]) else: analyzed_news = unique_news # Save to database saved_count = 0 for news_item in analyzed_news: try: # Add AI sentiment if available if 'ai_sentiment' in news_item: news_item['sentiment'] = news_item['ai_confidence'] self.db.save_news(news_item) saved_count += 1 except Exception as e: logger.error(f"Error saving news: {e}") self.last_collection['news'] = datetime.now() logger.info(f"✅ Saved {saved_count}/{len(analyzed_news)} news items to database") # Store AI analysis if available if analyzed_news and 'ai_sentiment' in analyzed_news[0]: try: # Get trending coins from news trending = self.news_collector.get_trending_coins(analyzed_news) # Save AI analysis for trending coins for trend in trending[:10]: symbol = trend['coin'] symbol_news = [n for n in analyzed_news if symbol in n.get('coins', [])] if symbol_news: agg_sentiment = await self.ai_analyzer.calculate_aggregated_sentiment( symbol_news, symbol ) self.db.save_ai_analysis({ 'symbol': symbol, 'analysis_type': 'news_sentiment', 'model_used': 'finbert', 'input_data': { 'news_count': len(symbol_news), 'mentions': trend['mentions'] }, 'output_data': agg_sentiment, 'confidence': agg_sentiment.get('confidence', 0.0) }) logger.info(f"✅ Saved AI analysis for {len(trending[:10])} trending coins") except Exception as e: logger.error(f"Error saving AI analysis: {e}") return { "success": True, "news_collected": len(unique_news), "news_saved": saved_count, "ai_analyzed": 'ai_sentiment' in analyzed_news[0] if analyzed_news else False, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Error collecting news: {e}") return { "success": False, "error": str(e), "timestamp": datetime.now().isoformat() } async def collect_and_store_sentiment(self): """جمع‌آوری و ذخیره احساسات بازار""" try: logger.info("😊 Collecting market sentiment from FREE sources...") # Collect all sentiment data sentiment_data = await self.sentiment_collector.collect_all_sentiment_data() # Save overall sentiment if sentiment_data.get('overall_sentiment'): self.db.save_sentiment( sentiment_data['overall_sentiment'], source='free_aggregated' ) self.last_collection['sentiment'] = datetime.now() logger.info(f"✅ Saved market sentiment: {sentiment_data['overall_sentiment']['overall_sentiment']}") return { "success": True, "sentiment": sentiment_data['overall_sentiment'], "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Error collecting sentiment: {e}") return { "success": False, "error": str(e), "timestamp": datetime.now().isoformat() } async def collect_all_data_once(self) -> Dict[str, Any]: """ جمع‌آوری همه داده‌ها یک بار Collect all data once (prices, news, sentiment) """ logger.info("🚀 Starting full data collection cycle...") results = await asyncio.gather( self.collect_and_store_prices(), self.collect_and_store_news(), self.collect_and_store_sentiment(), return_exceptions=True ) return { "prices": results[0] if not isinstance(results[0], Exception) else {"error": str(results[0])}, "news": results[1] if not isinstance(results[1], Exception) else {"error": str(results[1])}, "sentiment": results[2] if not isinstance(results[2], Exception) else {"error": str(results[2])}, "timestamp": datetime.now().isoformat() } async def price_collection_loop(self): """حلقه جمع‌آوری مستمر قیمت‌ها""" while self.is_running: try: await self.collect_and_store_prices() await asyncio.sleep(self.intervals['prices']) except Exception as e: logger.error(f"Error in price collection loop: {e}") await asyncio.sleep(60) # Wait 1 minute on error async def news_collection_loop(self): """حلقه جمع‌آوری مستمر اخبار""" while self.is_running: try: await self.collect_and_store_news() await asyncio.sleep(self.intervals['news']) except Exception as e: logger.error(f"Error in news collection loop: {e}") await asyncio.sleep(300) # Wait 5 minutes on error async def sentiment_collection_loop(self): """حلقه جمع‌آوری مستمر احساسات""" while self.is_running: try: await self.collect_and_store_sentiment() await asyncio.sleep(self.intervals['sentiment']) except Exception as e: logger.error(f"Error in sentiment collection loop: {e}") await asyncio.sleep(180) # Wait 3 minutes on error async def start_background_collection(self): """ شروع جمع‌آوری پس‌زمینه Start continuous background data collection """ logger.info("🚀 Starting background data collection...") self.is_running = True # Start all collection loops self.collection_tasks = [ asyncio.create_task(self.price_collection_loop()), asyncio.create_task(self.news_collection_loop()), asyncio.create_task(self.sentiment_collection_loop()), ] logger.info("✅ Background collection started!") logger.info(f" Prices: every {self.intervals['prices']}s") logger.info(f" News: every {self.intervals['news']}s") logger.info(f" Sentiment: every {self.intervals['sentiment']}s") async def stop_background_collection(self): """توقف جمع‌آوری پس‌زمینه""" logger.info("🛑 Stopping background data collection...") self.is_running = False # Cancel all tasks for task in self.collection_tasks: task.cancel() # Wait for tasks to complete await asyncio.gather(*self.collection_tasks, return_exceptions=True) logger.info("✅ Background collection stopped!") def get_collection_status(self) -> Dict[str, Any]: """دریافت وضعیت جمع‌آوری""" return { "is_running": self.is_running, "last_collection": { k: v.isoformat() if v else None for k, v in self.last_collection.items() }, "intervals": self.intervals, "database_stats": self.db.get_statistics(), "timestamp": datetime.now().isoformat() } # Singleton instance _orchestrator = None def get_orchestrator() -> DataCollectionOrchestrator: """دریافت instance هماهنگ‌کننده""" global _orchestrator if _orchestrator is None: _orchestrator = DataCollectionOrchestrator() return _orchestrator async def main(): """Test the orchestrator""" print("\n" + "="*70) print("🧪 Testing Data Collection Orchestrator") print("="*70) orchestrator = get_orchestrator() # Test single collection cycle print("\n1️⃣ Testing Single Collection Cycle...") results = await orchestrator.collect_all_data_once() print("\n📊 Results:") print(f" Prices: {results['prices'].get('prices_saved', 0)} saved") print(f" News: {results['news'].get('news_saved', 0)} saved") print(f" Sentiment: {results['sentiment'].get('success', False)}") # Show database stats print("\n2️⃣ Database Statistics:") stats = orchestrator.get_collection_status() print(f" Database size: {stats['database_stats'].get('database_size', 0):,} bytes") print(f" Prices: {stats['database_stats'].get('prices_count', 0)}") print(f" News: {stats['database_stats'].get('news_count', 0)}") print(f" AI Analysis: {stats['database_stats'].get('ai_analysis_count', 0)}") print("\n✅ Orchestrator test complete!") if __name__ == "__main__": asyncio.run(main())