|
|
""" |
|
|
Master Collector - Aggregates all data sources |
|
|
Unified interface to collect data from all available collectors |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import os |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
|
|
|
from collectors.market_data import collect_market_data |
|
|
from collectors.market_data_extended import collect_extended_market_data |
|
|
from collectors.explorers import collect_explorer_data |
|
|
from collectors.news import collect_news |
|
|
from collectors.news_extended import collect_extended_news |
|
|
from collectors.sentiment import collect_sentiment |
|
|
from collectors.sentiment_extended import collect_extended_sentiment_data |
|
|
from collectors.onchain import collect_onchain_data |
|
|
from collectors.rpc_nodes import collect_rpc_data |
|
|
from collectors.whale_tracking import collect_whale_tracking_data |
|
|
|
|
|
|
|
|
from collectors.data_persistence import data_persistence |
|
|
|
|
|
# Module-level logger shared by every collection method in this file.
logger = setup_logger("master_collector")
|
|
|
|
|
|
|
|
class DataSourceCollector:
    """
    Master collector that aggregates all data sources.

    Fans out to the category-specific collectors (market data, blockchain,
    news, sentiment, whale tracking), normalizes per-category failures, and
    reports aggregate statistics plus the raw per-category results.
    """

    def __init__(self):
        """Initialize the master collector and load API keys from the environment."""
        self.api_keys = self._load_api_keys()
        logger.info("Master Collector initialized")

    def _load_api_keys(self) -> Dict[str, Optional[str]]:
        """
        Load API keys from environment variables.

        Returns:
            Dict mapping provider name to API key; a value is ``None`` when
            the corresponding environment variable is unset.
        """
        return {
            # Market data providers
            "coinmarketcap": os.getenv("COINMARKETCAP_KEY_1"),
            "messari": os.getenv("MESSARI_API_KEY"),
            "cryptocompare": os.getenv("CRYPTOCOMPARE_KEY"),
            # Block explorers
            "etherscan": os.getenv("ETHERSCAN_KEY_1"),
            "bscscan": os.getenv("BSCSCAN_KEY"),
            "tronscan": os.getenv("TRONSCAN_KEY"),
            # News
            "newsapi": os.getenv("NEWSAPI_KEY"),
            # RPC providers
            "infura": os.getenv("INFURA_API_KEY"),
            "alchemy": os.getenv("ALCHEMY_API_KEY"),
            # Whale tracking
            "whalealert": os.getenv("WHALEALERT_API_KEY"),
            # ML / sentiment
            "huggingface": os.getenv("HUGGINGFACE_TOKEN"),
        }

    @staticmethod
    def _count_flag(results: List[Dict[str, Any]], flag: str) -> int:
        """Count results whose boolean ``flag`` field is truthy.

        Centralizes the counting expression that was previously repeated for
        every category / flag combination.
        """
        return sum(1 for r in results if r.get(flag, False))

    async def collect_all_market_data(self) -> List[Dict[str, Any]]:
        """
        Collect data from all market data sources.

        Runs the core market-data collectors, then the extended ones (which
        receive the Messari API key; the key may be ``None``).

        Returns:
            List of market data results.
        """
        logger.info("Collecting all market data...")

        results: List[Dict[str, Any]] = []

        # Core market data sources (take no API key argument).
        core_results = await collect_market_data()
        results.extend(core_results)

        # Extended market data sources.
        extended_results = await collect_extended_market_data(
            messari_key=self.api_keys.get("messari")
        )
        results.extend(extended_results)

        logger.info(f"Market data collection complete: {len(results)} results")
        return results

    async def collect_all_blockchain_data(self) -> List[Dict[str, Any]]:
        """
        Collect data from all blockchain sources (explorers + RPC + on-chain).

        Returns:
            List of blockchain data results.
        """
        logger.info("Collecting all blockchain data...")

        results: List[Dict[str, Any]] = []

        # Block explorer sources.
        explorer_results = await collect_explorer_data()
        results.extend(explorer_results)

        # RPC node sources; Infura/Alchemy keys may be None.
        rpc_results = await collect_rpc_data(
            infura_key=self.api_keys.get("infura"),
            alchemy_key=self.api_keys.get("alchemy")
        )
        results.extend(rpc_results)

        # On-chain analytics sources.
        onchain_results = await collect_onchain_data()
        results.extend(onchain_results)

        logger.info(f"Blockchain data collection complete: {len(results)} results")
        return results

    async def collect_all_news(self) -> List[Dict[str, Any]]:
        """
        Collect data from all news sources.

        Returns:
            List of news results.
        """
        logger.info("Collecting all news...")

        results: List[Dict[str, Any]] = []

        # Core news sources.
        core_results = await collect_news()
        results.extend(core_results)

        # Extended news sources.
        extended_results = await collect_extended_news()
        results.extend(extended_results)

        logger.info(f"News collection complete: {len(results)} results")
        return results

    async def collect_all_sentiment(self) -> List[Dict[str, Any]]:
        """
        Collect data from all sentiment sources.

        Returns:
            List of sentiment results.
        """
        logger.info("Collecting all sentiment data...")

        results: List[Dict[str, Any]] = []

        # Core sentiment sources.
        core_results = await collect_sentiment()
        results.extend(core_results)

        # Extended sentiment sources.
        extended_results = await collect_extended_sentiment_data()
        results.extend(extended_results)

        logger.info(f"Sentiment collection complete: {len(results)} results")
        return results

    async def collect_whale_tracking(self) -> List[Dict[str, Any]]:
        """
        Collect whale tracking data.

        Returns:
            List of whale tracking results.
        """
        logger.info("Collecting whale tracking data...")

        results = await collect_whale_tracking_data(
            whalealert_key=self.api_keys.get("whalealert")
        )

        logger.info(f"Whale tracking collection complete: {len(results)} results")
        return results

    async def collect_all_data(self) -> Dict[str, Any]:
        """
        Collect data from ALL available sources in parallel.

        The five category collectors run concurrently via ``asyncio.gather``;
        a failing category is logged and replaced with an empty list so one
        bad category cannot sink the whole run. Results are also persisted
        via ``data_persistence`` on a best-effort basis.

        Returns:
            Dict with categorized results and statistics.
        """
        logger.info("=" * 60)
        logger.info("Starting MASTER data collection from ALL sources")
        logger.info("=" * 60)

        start_time = datetime.now(timezone.utc)

        # Run every category concurrently; return_exceptions=True converts a
        # raised exception into a returned value so the others still complete.
        gathered = await asyncio.gather(
            self.collect_all_market_data(),
            self.collect_all_blockchain_data(),
            self.collect_all_news(),
            self.collect_all_sentiment(),
            self.collect_whale_tracking(),
            return_exceptions=True
        )

        # Normalize each category: log the failure and substitute an empty
        # list so the statistics below never see an Exception object.
        category_names = ("market_data", "blockchain", "news", "sentiment", "whale_tracking")
        failure_labels = ("Market data", "Blockchain data", "News", "Sentiment", "Whale tracking")

        category_data: Dict[str, List[Dict[str, Any]]] = {}
        for name, label, outcome in zip(category_names, failure_labels, gathered):
            if isinstance(outcome, Exception):
                logger.error(f"{label} collection failed: {str(outcome)}")
                outcome = []
            category_data[name] = outcome

        end_time = datetime.now(timezone.utc)
        duration = (end_time - start_time).total_seconds()

        total_sources = sum(len(data) for data in category_data.values())
        successful_sources = sum(
            self._count_flag(data, "success") for data in category_data.values()
        )
        placeholder_count = sum(
            self._count_flag(data, "is_placeholder") for data in category_data.values()
        )

        results = {
            "collection_timestamp": start_time.isoformat(),
            "duration_seconds": round(duration, 2),
            "statistics": {
                "total_sources": total_sources,
                "successful_sources": successful_sources,
                "failed_sources": total_sources - successful_sources,
                "placeholder_sources": placeholder_count,
                "success_rate": round(successful_sources / total_sources * 100, 2) if total_sources > 0 else 0,
                "categories": {
                    name: {
                        "total": len(data),
                        "successful": self._count_flag(data, "success")
                    }
                    for name, data in category_data.items()
                }
            },
            "data": category_data
        }

        logger.info("=" * 60)
        logger.info("MASTER COLLECTION COMPLETE")
        logger.info(f"Duration: {duration:.2f} seconds")
        logger.info(f"Total Sources: {total_sources}")
        logger.info(f"Successful: {successful_sources} ({results['statistics']['success_rate']}%)")
        logger.info(f"Failed: {total_sources - successful_sources}")
        logger.info(f"Placeholders: {placeholder_count}")
        logger.info("=" * 60)
        logger.info("Category Breakdown:")
        for category, stats in results['statistics']['categories'].items():
            logger.info(f"  {category}: {stats['successful']}/{stats['total']}")
        logger.info("=" * 60)

        # Persist best-effort: a database failure must not lose the
        # in-memory results, so errors are recorded instead of raised.
        try:
            persistence_stats = data_persistence.save_all_data(results)
            results['persistence_stats'] = persistence_stats
        except Exception as e:
            logger.error(f"Error persisting data to database: {e}", exc_info=True)
            results['persistence_stats'] = {'error': str(e)}

        return results

    async def collect_category(self, category: str) -> List[Dict[str, Any]]:
        """
        Collect data from a specific category.

        Args:
            category: Category name (market_data, blockchain, news,
                sentiment, whale_tracking)

        Returns:
            List of results for the category; empty list for an unknown
            category name.
        """
        logger.info(f"Collecting data for category: {category}")

        # Dispatch table instead of an if/elif chain.
        dispatch = {
            "market_data": self.collect_all_market_data,
            "blockchain": self.collect_all_blockchain_data,
            "news": self.collect_all_news,
            "sentiment": self.collect_all_sentiment,
            "whale_tracking": self.collect_whale_tracking,
        }

        collector = dispatch.get(category)
        if collector is None:
            logger.error(f"Unknown category: {category}")
            return []
        return await collector()
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    async def main():
        """CLI entry point: run one full collection pass and print a report."""
        collector = DataSourceCollector()

        banner = "=" * 80
        divider = "-" * 80

        print("\n" + banner)
        print("CRYPTO DATA SOURCE MASTER COLLECTOR")
        print("Collecting data from ALL available sources...")
        print(banner + "\n")

        results = await collector.collect_all_data()
        stats = results['statistics']

        # Overall summary.
        print("\n" + banner)
        print("COLLECTION SUMMARY")
        print(banner)
        print(f"Duration: {results['duration_seconds']} seconds")
        print(f"Total Sources: {stats['total_sources']}")
        print(f"Successful: {stats['successful_sources']} "
              f"({stats['success_rate']}%)")
        print(f"Failed: {stats['failed_sources']}")
        print(f"Placeholders: {stats['placeholder_sources']}")
        print("\n" + divider)
        print("CATEGORY BREAKDOWN:")
        print(divider)

        # Per-category success ratios.
        for category, cat_stats in stats['categories'].items():
            total = cat_stats['total']
            ok = cat_stats['successful']
            success_rate = (ok / total * 100) if total > 0 else 0
            print(f"{category:20} {ok:3}/{total:3} ({success_rate:5.1f}%)")

        print(banner)

        # One sample result per category, when any succeeded.
        print("\n" + banner)
        print("SAMPLE DATA FROM EACH CATEGORY")
        print(banner)

        for category, data_list in results['data'].items():
            print(f"\n{category.upper()}:")
            successful = [d for d in data_list if d.get('success', False)]
            if not successful:
                print(" No successful data")
                continue
            sample = successful[0]
            print(f" Provider: {sample.get('provider', 'N/A')}")
            print(f" Success: {sample.get('success', False)}")
            if sample.get('data'):
                print(f" Data keys: {list(sample.get('data', {}).keys())[:5]}")

        print("\n" + banner)

    asyncio.run(main())
|
|
|