Really-amin's picture
Upload 317 files
eebf5c4 verified
#!/usr/bin/env python3
"""
هماهنگ‌کننده جمع‌آوری داده
Data Collection Orchestrator - Manages all collectors
"""
import asyncio
import sys
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from crypto_data_bank.database import get_db
from crypto_data_bank.collectors.free_price_collector import FreePriceCollector
from crypto_data_bank.collectors.rss_news_collector import RSSNewsCollector
from crypto_data_bank.collectors.sentiment_collector import SentimentCollector
from crypto_data_bank.ai.huggingface_models import get_analyzer
# Configure the root logger when this module is imported, then create the
# module-level logger used by everything below.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class DataCollectionOrchestrator:
    """
    Main orchestrator for data collection from all FREE sources.

    Holds one price collector, one RSS news collector, one sentiment
    collector, the AI analyzer and the database handle. It offers one-shot
    collection coroutines (``collect_and_store_*``, ``collect_all_data_once``)
    and long-running background loops (``start_background_collection`` /
    ``stop_background_collection``).
    """

    # Maximum number of news items handed to the AI analyzer per cycle.
    AI_BATCH_LIMIT = 50
    # Maximum number of trending coins that get an aggregated AI analysis.
    MAX_TRENDING_COINS = 10

    def __init__(self):
        """Create the collectors, the analyzer and the database handle."""
        self.db = get_db()
        self.price_collector = FreePriceCollector()
        self.news_collector = RSSNewsCollector()
        self.sentiment_collector = SentimentCollector()
        self.ai_analyzer = get_analyzer()

        # Background asyncio tasks created by start_background_collection().
        self.collection_tasks = []
        # Flag polled by the background loops; cleared to make them exit.
        self.is_running = False

        # Collection intervals (in seconds)
        self.intervals = {
            'prices': 60,      # Every 1 minute
            'news': 300,       # Every 5 minutes
            'sentiment': 180,  # Every 3 minutes
        }

        # Time of the last completed collection per data type (None = never).
        self.last_collection = {
            'prices': None,
            'news': None,
            'sentiment': None,
        }

    @staticmethod
    def _failure_result(error) -> Dict[str, Any]:
        """Build the failure dict shared by all collection methods."""
        return {
            "success": False,
            "error": str(error),
            "timestamp": datetime.now().isoformat()
        }

    async def collect_and_store_prices(self) -> Dict[str, Any]:
        """
        Collect prices, aggregate them and store each aggregated entry.

        A failure to save one entry is logged and does not stop the others.

        Returns:
            On success a dict with ``success``, ``prices_collected``,
            ``prices_saved`` and ``timestamp``; on failure a dict with
            ``success`` (False), ``error`` and ``timestamp``.
        """
        try:
            logger.info("💰 Collecting prices from FREE sources...")

            # Collect from all free sources, then aggregate
            all_prices = await self.price_collector.collect_all_free_sources()
            aggregated = self.price_collector.aggregate_prices(all_prices)

            # Save to database
            saved_count = 0
            for price_data in aggregated:
                try:
                    self.db.save_price(
                        symbol=price_data['symbol'],
                        price_data=price_data,
                        source='free_aggregated'
                    )
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving price for {price_data.get('symbol')}: {e}")

            self.last_collection['prices'] = datetime.now()
            logger.info(f"✅ Saved {saved_count}/{len(aggregated)} prices to database")

            return {
                "success": True,
                "prices_collected": len(aggregated),
                "prices_saved": saved_count,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Error collecting prices: {e}")
            return self._failure_result(e)

    async def _store_trending_ai_analysis(self, analyzed_news) -> int:
        """Save an aggregated news-sentiment analysis per trending coin.

        Returns the number of analyses actually written to the database.
        """
        trending = self.news_collector.get_trending_coins(analyzed_news)

        stored = 0
        for trend in trending[:self.MAX_TRENDING_COINS]:
            symbol = trend['coin']
            symbol_news = [n for n in analyzed_news if symbol in n.get('coins', [])]
            if not symbol_news:
                continue

            agg_sentiment = await self.ai_analyzer.calculate_aggregated_sentiment(
                symbol_news,
                symbol
            )
            self.db.save_ai_analysis({
                'symbol': symbol,
                'analysis_type': 'news_sentiment',
                'model_used': 'finbert',
                'input_data': {
                    'news_count': len(symbol_news),
                    'mentions': trend['mentions']
                },
                'output_data': agg_sentiment,
                'confidence': agg_sentiment.get('confidence', 0.0)
            })
            stored += 1
        return stored

    async def collect_and_store_news(self) -> Dict[str, Any]:
        """
        Collect RSS news, deduplicate, optionally analyze with AI, and store.

        Only the first ``AI_BATCH_LIMIT`` items are sent to the analyzer, but
        every deduplicated item is stored; items beyond the limit are saved
        without AI fields instead of being dropped.

        Returns:
            On success a dict with ``success``, ``news_collected``,
            ``news_saved``, ``ai_analyzed`` and ``timestamp``; on failure a
            dict with ``success`` (False), ``error`` and ``timestamp``.
        """
        try:
            logger.info("📰 Collecting news from FREE RSS feeds...")

            # Collect from all RSS feeds, then deduplicate
            all_news = await self.news_collector.collect_all_rss_feeds()
            unique_news = self.news_collector.deduplicate_news(all_news)

            # Analyze with AI (if available)
            if hasattr(self.ai_analyzer, 'analyze_news_batch'):
                logger.info("🤖 Analyzing news with AI...")
                analyzed_news = await self.ai_analyzer.analyze_news_batch(
                    unique_news[:self.AI_BATCH_LIMIT]
                )
                # Keep the items that did not fit in the AI batch so they
                # still reach the database.
                news_to_save = list(analyzed_news) + list(unique_news[self.AI_BATCH_LIMIT:])
            else:
                analyzed_news = unique_news
                news_to_save = unique_news

            ai_analyzed = bool(analyzed_news) and 'ai_sentiment' in analyzed_news[0]

            # Save to database
            saved_count = 0
            for news_item in news_to_save:
                try:
                    # Add AI sentiment if available
                    # NOTE(review): this stores the AI confidence value under
                    # 'sentiment', not the 'ai_sentiment' value itself;
                    # confirm against the database schema.
                    if 'ai_sentiment' in news_item:
                        news_item['sentiment'] = news_item['ai_confidence']

                    self.db.save_news(news_item)
                    saved_count += 1
                except Exception as e:
                    logger.error(f"Error saving news: {e}")

            self.last_collection['news'] = datetime.now()
            logger.info(f"✅ Saved {saved_count}/{len(news_to_save)} news items to database")

            # Store AI analysis if available; a failure here is logged and
            # does not fail the news collection itself.
            if ai_analyzed:
                try:
                    stored = await self._store_trending_ai_analysis(analyzed_news)
                    logger.info(f"✅ Saved AI analysis for {stored} trending coins")
                except Exception as e:
                    logger.error(f"Error saving AI analysis: {e}")

            return {
                "success": True,
                "news_collected": len(unique_news),
                "news_saved": saved_count,
                "ai_analyzed": ai_analyzed,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Error collecting news: {e}")
            return self._failure_result(e)

    async def collect_and_store_sentiment(self) -> Dict[str, Any]:
        """
        Collect market sentiment and store the overall sentiment.

        When the collector returns no ``overall_sentiment``, nothing is
        stored, ``last_collection`` is left untouched and a failure result
        with an explicit message is returned.

        Returns:
            On success a dict with ``success``, ``sentiment`` and
            ``timestamp``; on failure a dict with ``success`` (False),
            ``error`` and ``timestamp``.
        """
        try:
            logger.info("😊 Collecting market sentiment from FREE sources...")

            # Collect all sentiment data
            sentiment_data = await self.sentiment_collector.collect_all_sentiment_data()

            overall = sentiment_data.get('overall_sentiment')
            if not overall:
                logger.warning("⚠️ No overall sentiment returned by the collectors")
                return self._failure_result("No overall sentiment data available")

            # Save overall sentiment
            self.db.save_sentiment(overall, source='free_aggregated')

            self.last_collection['sentiment'] = datetime.now()
            logger.info(f"✅ Saved market sentiment: {overall['overall_sentiment']}")

            return {
                "success": True,
                "sentiment": overall,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Error collecting sentiment: {e}")
            return self._failure_result(e)

    async def collect_all_data_once(self) -> Dict[str, Any]:
        """
        Collect all data once (prices, news, sentiment), concurrently.

        Returns:
            A dict with one result per data type under ``prices``, ``news``
            and ``sentiment``, plus a ``timestamp``. If a collection
            coroutine raised, its entry is a failure dict with ``success``
            (False), ``error`` and ``timestamp``.
        """
        logger.info("🚀 Starting full data collection cycle...")

        results = await asyncio.gather(
            self.collect_and_store_prices(),
            self.collect_and_store_news(),
            self.collect_and_store_sentiment(),
            return_exceptions=True
        )

        summary: Dict[str, Any] = {}
        for name, outcome in zip(("prices", "news", "sentiment"), results):
            # BaseException also covers asyncio.CancelledError, which is not
            # an Exception subclass on Python 3.8+.
            if isinstance(outcome, BaseException):
                summary[name] = self._failure_result(outcome)
            else:
                summary[name] = outcome
        summary["timestamp"] = datetime.now().isoformat()
        return summary

    async def _run_collection_loop(self, label, collect, interval_key, error_delay):
        """Call ``collect`` repeatedly until ``is_running`` is cleared.

        Sleeps ``intervals[interval_key]`` seconds after each cycle and
        ``error_delay`` seconds after an unexpected error.
        """
        while self.is_running:
            try:
                await collect()
                await asyncio.sleep(self.intervals[interval_key])
            except Exception as e:
                logger.error(f"Error in {label} collection loop: {e}")
                await asyncio.sleep(error_delay)

    async def price_collection_loop(self):
        """Continuous price collection loop."""
        # Wait 1 minute on error
        await self._run_collection_loop('price', self.collect_and_store_prices, 'prices', 60)

    async def news_collection_loop(self):
        """Continuous news collection loop."""
        # Wait 5 minutes on error
        await self._run_collection_loop('news', self.collect_and_store_news, 'news', 300)

    async def sentiment_collection_loop(self):
        """Continuous sentiment collection loop."""
        # Wait 3 minutes on error
        await self._run_collection_loop(
            'sentiment', self.collect_and_store_sentiment, 'sentiment', 180
        )

    async def start_background_collection(self):
        """
        Start continuous background data collection.

        Creates one asyncio task per collection loop. Calling this while
        earlier tasks are still active is a no-op, so a repeated start does
        not spawn duplicate loops.
        """
        if any(not task.done() for task in self.collection_tasks):
            logger.warning("⚠️ Background collection already running; start request ignored")
            return

        logger.info("🚀 Starting background data collection...")
        self.is_running = True

        # Start all collection loops
        self.collection_tasks = [
            asyncio.create_task(self.price_collection_loop()),
            asyncio.create_task(self.news_collection_loop()),
            asyncio.create_task(self.sentiment_collection_loop()),
        ]

        logger.info("✅ Background collection started!")
        logger.info(f" Prices: every {self.intervals['prices']}s")
        logger.info(f" News: every {self.intervals['news']}s")
        logger.info(f" Sentiment: every {self.intervals['sentiment']}s")

    async def stop_background_collection(self):
        """Stop background collection: cancel the loop tasks and wait for them."""
        logger.info("🛑 Stopping background data collection...")
        self.is_running = False

        # Cancel all tasks
        for task in self.collection_tasks:
            task.cancel()

        # Wait for tasks to complete; cancellation errors are collected,
        # not raised.
        await asyncio.gather(*self.collection_tasks, return_exceptions=True)
        # Drop the finished tasks so a later start begins from a clean state.
        self.collection_tasks = []

        logger.info("✅ Background collection stopped!")

    def get_collection_status(self) -> Dict[str, Any]:
        """
        Return the current collection status.

        Returns:
            A dict with ``is_running``, ``last_collection`` (ISO timestamps
            or None), ``intervals`` (a copy), ``database_stats`` and
            ``timestamp``.
        """
        return {
            "is_running": self.is_running,
            "last_collection": {
                k: v.isoformat() if v else None
                for k, v in self.last_collection.items()
            },
            # Copy so callers cannot mutate the live interval settings.
            "intervals": dict(self.intervals),
            "database_stats": self.db.get_statistics(),
            "timestamp": datetime.now().isoformat()
        }
# Lazily created, module-wide orchestrator instance.
_orchestrator = None


def get_orchestrator() -> DataCollectionOrchestrator:
    """Return the shared orchestrator, creating it on the first call."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = DataCollectionOrchestrator()
    return _orchestrator
async def main():
    """Run one collection cycle and print its results and database statistics."""
    separator = "=" * 70
    print("\n" + separator)
    print("🧪 Testing Data Collection Orchestrator")
    print(separator)

    orch = get_orchestrator()

    # Single collection cycle
    print("\n1️⃣ Testing Single Collection Cycle...")
    cycle = await orch.collect_all_data_once()

    print("\n📊 Results:")
    prices_saved = cycle['prices'].get('prices_saved', 0)
    print(f" Prices: {prices_saved} saved")
    news_saved = cycle['news'].get('news_saved', 0)
    print(f" News: {news_saved} saved")
    sentiment_ok = cycle['sentiment'].get('success', False)
    print(f" Sentiment: {sentiment_ok}")

    # Database statistics as reported by the orchestrator status
    print("\n2️⃣ Database Statistics:")
    status = orch.get_collection_status()
    db_stats = status['database_stats']
    print(f" Database size: {db_stats.get('database_size', 0):,} bytes")
    print(f" Prices: {db_stats.get('prices_count', 0)}")
    print(f" News: {db_stats.get('news_count', 0)}")
    print(f" AI Analysis: {db_stats.get('ai_analysis_count', 0)}")

    print("\n✅ Orchestrator test complete!")


# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())