"""
Data Collection Orchestrator - Manages all collectors
"""

import asyncio
import sys
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

# Make the project root importable when this file is run directly
sys.path.insert(0, str(Path(__file__).parent.parent))

from crypto_data_bank.database import get_db
from crypto_data_bank.collectors.free_price_collector import FreePriceCollector
from crypto_data_bank.collectors.rss_news_collector import RSSNewsCollector
from crypto_data_bank.collectors.sentiment_collector import SentimentCollector
from crypto_data_bank.ai.huggingface_models import get_analyzer

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DataCollectionOrchestrator:
    """
    Main orchestrator for data collection from all FREE sources
    """

    def __init__(self):
        self.db = get_db()
        self.price_collector = FreePriceCollector()
        self.news_collector = RSSNewsCollector()
        self.sentiment_collector = SentimentCollector()
        self.ai_analyzer = get_analyzer()

        self.collection_tasks = []
        self.is_running = False

        # Collection intervals in seconds
        self.intervals = {
            'prices': 60,
            'news': 300,
            'sentiment': 180,
        }

        # Timestamp of the last successful collection per data type
        self.last_collection = {
            'prices': None,
            'news': None,
            'sentiment': None,
        }

    async def collect_and_store_prices(self):
        """Collect and store prices"""
        try:
            logger.info("💰 Collecting prices from FREE sources...")

            all_prices = await self.price_collector.collect_all_free_sources()

            # Merge per-source quotes into one aggregated record per symbol
            aggregated = self.price_collector.aggregate_prices(all_prices)

            saved_count = 0
            for price_data in aggregated:
                try:
                    self.db.save_price(
                        symbol=price_data['symbol'],
                        price_data=price_data,
                        source='free_aggregated'
                    )
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving price for {price_data.get('symbol')}: {e}")

            self.last_collection['prices'] = datetime.now()

            logger.info(f"✅ Saved {saved_count}/{len(aggregated)} prices to database")

            return {
                "success": True,
                "prices_collected": len(aggregated),
                "prices_saved": saved_count,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting prices: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_and_store_news(self):
        """Collect and store news"""
        try:
            logger.info("📰 Collecting news from FREE RSS feeds...")

            all_news = await self.news_collector.collect_all_rss_feeds()

            unique_news = self.news_collector.deduplicate_news(all_news)

            # Run AI sentiment analysis when the analyzer supports batching
            if hasattr(self.ai_analyzer, 'analyze_news_batch'):
                logger.info("🤖 Analyzing news with AI...")
                analyzed_news = await self.ai_analyzer.analyze_news_batch(unique_news[:50])
            else:
                analyzed_news = unique_news

            saved_count = 0
            for news_item in analyzed_news:
                try:
                    # Store the AI confidence score in the generic 'sentiment' field
                    if 'ai_sentiment' in news_item:
                        news_item['sentiment'] = news_item['ai_confidence']

                    self.db.save_news(news_item)
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving news: {e}")

            self.last_collection['news'] = datetime.now()

            logger.info(f"✅ Saved {saved_count}/{len(analyzed_news)} news items to database")

            # Persist aggregated sentiment for the top trending coins
            if analyzed_news and 'ai_sentiment' in analyzed_news[0]:
                try:
                    trending = self.news_collector.get_trending_coins(analyzed_news)

                    for trend in trending[:10]:
                        symbol = trend['coin']
                        symbol_news = [n for n in analyzed_news if symbol in n.get('coins', [])]

                        if symbol_news:
                            agg_sentiment = await self.ai_analyzer.calculate_aggregated_sentiment(
                                symbol_news,
                                symbol
                            )

                            self.db.save_ai_analysis({
                                'symbol': symbol,
                                'analysis_type': 'news_sentiment',
                                'model_used': 'finbert',
                                'input_data': {
                                    'news_count': len(symbol_news),
                                    'mentions': trend['mentions']
                                },
                                'output_data': agg_sentiment,
                                'confidence': agg_sentiment.get('confidence', 0.0)
                            })

                    logger.info(f"✅ Saved AI analysis for {len(trending[:10])} trending coins")

                except Exception as e:
                    logger.error(f"Error saving AI analysis: {e}")

            return {
                "success": True,
                "news_collected": len(unique_news),
                "news_saved": saved_count,
                "ai_analyzed": 'ai_sentiment' in analyzed_news[0] if analyzed_news else False,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting news: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_and_store_sentiment(self):
        """Collect and store market sentiment"""
        try:
            logger.info("😊 Collecting market sentiment from FREE sources...")

            sentiment_data = await self.sentiment_collector.collect_all_sentiment_data()

            if sentiment_data.get('overall_sentiment'):
                self.db.save_sentiment(
                    sentiment_data['overall_sentiment'],
                    source='free_aggregated'
                )

                self.last_collection['sentiment'] = datetime.now()

                logger.info(f"✅ Saved market sentiment: {sentiment_data['overall_sentiment']['overall_sentiment']}")

            return {
                "success": True,
                "sentiment": sentiment_data.get('overall_sentiment'),
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Error collecting sentiment: {e}")
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def collect_all_data_once(self) -> Dict[str, Any]:
        """
        Collect all data once (prices, news, sentiment)
        """
        logger.info("🚀 Starting full data collection cycle...")

        # Run all collectors concurrently; exceptions are returned, not raised
        results = await asyncio.gather(
            self.collect_and_store_prices(),
            self.collect_and_store_news(),
            self.collect_and_store_sentiment(),
            return_exceptions=True
        )

        return {
            "prices": results[0] if not isinstance(results[0], Exception) else {"error": str(results[0])},
            "news": results[1] if not isinstance(results[1], Exception) else {"error": str(results[1])},
            "sentiment": results[2] if not isinstance(results[2], Exception) else {"error": str(results[2])},
            "timestamp": datetime.now().isoformat()
        }

    async def price_collection_loop(self):
        """Continuous price collection loop"""
        while self.is_running:
            try:
                await self.collect_and_store_prices()
                await asyncio.sleep(self.intervals['prices'])
            except Exception as e:
                logger.error(f"Error in price collection loop: {e}")
                await asyncio.sleep(60)

    async def news_collection_loop(self):
        """Continuous news collection loop"""
        while self.is_running:
            try:
                await self.collect_and_store_news()
                await asyncio.sleep(self.intervals['news'])
            except Exception as e:
                logger.error(f"Error in news collection loop: {e}")
                await asyncio.sleep(300)

    async def sentiment_collection_loop(self):
        """Continuous sentiment collection loop"""
        while self.is_running:
            try:
                await self.collect_and_store_sentiment()
                await asyncio.sleep(self.intervals['sentiment'])
            except Exception as e:
                logger.error(f"Error in sentiment collection loop: {e}")
                await asyncio.sleep(180)

    async def start_background_collection(self):
        """
        Start continuous background data collection
        """
        logger.info("🚀 Starting background data collection...")

        self.is_running = True

        # One long-running task per data type
        self.collection_tasks = [
            asyncio.create_task(self.price_collection_loop()),
            asyncio.create_task(self.news_collection_loop()),
            asyncio.create_task(self.sentiment_collection_loop()),
        ]

        logger.info("✅ Background collection started!")
        logger.info(f"   Prices: every {self.intervals['prices']}s")
        logger.info(f"   News: every {self.intervals['news']}s")
        logger.info(f"   Sentiment: every {self.intervals['sentiment']}s")

    async def stop_background_collection(self):
        """Stop background data collection"""
        logger.info("🛑 Stopping background data collection...")

        self.is_running = False

        # Cancel the loops and wait for them to finish
        for task in self.collection_tasks:
            task.cancel()

        await asyncio.gather(*self.collection_tasks, return_exceptions=True)

        logger.info("✅ Background collection stopped!")

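    # Illustrative shape of get_collection_status() (values are examples only):
    # {
    #     "is_running": True,
    #     "last_collection": {"prices": "2024-01-01T00:00:00", "news": None, "sentiment": None},
    #     "intervals": {"prices": 60, "news": 300, "sentiment": 180},
    #     "database_stats": {...},  # whatever self.db.get_statistics() returns
    #     "timestamp": "2024-01-01T00:00:05"
    # }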
    def get_collection_status(self) -> Dict[str, Any]:
        """Get the current collection status"""
        return {
            "is_running": self.is_running,
            "last_collection": {
                k: v.isoformat() if v else None
                for k, v in self.last_collection.items()
            },
            "intervals": self.intervals,
            "database_stats": self.db.get_statistics(),
            "timestamp": datetime.now().isoformat()
        }

# Module-level singleton
_orchestrator = None


def get_orchestrator() -> DataCollectionOrchestrator:
    """Get the orchestrator singleton instance"""
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = DataCollectionOrchestrator()
    return _orchestrator

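# Illustrative helper (not part of the original module): one way a caller might
# keep the background loops alive and shut them down cleanly. Assumes it is run
# inside an asyncio event loop, e.g. via asyncio.run(run_collection_forever()).
async def run_collection_forever():
    """Hypothetical convenience wrapper around the background collection loops."""
    orchestrator = get_orchestrator()
    await orchestrator.start_background_collection()
    try:
        # The collection loops run as background tasks; just keep the loop alive
        while True:
            await asyncio.sleep(3600)
    finally:
        await orchestrator.stop_background_collection()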

async def main():
    """Test the orchestrator"""
    print("\n" + "="*70)
    print("🧪 Testing Data Collection Orchestrator")
    print("="*70)

    orchestrator = get_orchestrator()

    print("\n1️⃣ Testing Single Collection Cycle...")
    results = await orchestrator.collect_all_data_once()

    print("\n📊 Results:")
    print(f"   Prices: {results['prices'].get('prices_saved', 0)} saved")
    print(f"   News: {results['news'].get('news_saved', 0)} saved")
    print(f"   Sentiment: {results['sentiment'].get('success', False)}")

    print("\n2️⃣ Database Statistics:")
    stats = orchestrator.get_collection_status()
    print(f"   Database size: {stats['database_stats'].get('database_size', 0):,} bytes")
    print(f"   Prices: {stats['database_stats'].get('prices_count', 0)}")
    print(f"   News: {stats['database_stats'].get('news_count', 0)}")
    print(f"   AI Analysis: {stats['database_stats'].get('ai_analysis_count', 0)}")

    print("\n✅ Orchestrator test complete!")


if __name__ == "__main__":
    asyncio.run(main())