# Cryptocurrency Data Collectors

Comprehensive data collection modules for cryptocurrency APIs, blockchain explorers, news sources, sentiment indicators, and on-chain analytics.

## Overview

This package provides production-ready collectors for gathering cryptocurrency data from various sources. Each collector is designed with robust error handling, logging, staleness tracking, and standardized output formats.

## Modules

### 1. Market Data (`market_data.py`)

Collects cryptocurrency market data from multiple providers.

**Providers:**
- **CoinGecko** - Free API for BTC, ETH, BNB prices with market cap and volume
- **CoinMarketCap** - Professional market data with API key
- **Binance** - Real-time ticker data from Binance exchange

**Functions:**
```python
from collectors.market_data import (
    get_coingecko_simple_price,
    get_coinmarketcap_quotes,
    get_binance_ticker,
    collect_market_data  # Collects from all sources
)

# Collect from all market data sources
results = await collect_market_data()
```

**Features:**
- Concurrent data collection
- Price tracking with volume and market cap
- 24-hour change percentages
- Timestamp extraction for staleness calculation

### 2. Blockchain Explorers (`explorers.py`)

Collects data from blockchain explorers and network statistics.

**Providers:**
- **Etherscan** - Ethereum gas prices and network stats
- **BscScan** - BNB prices and BSC network data
- **TronScan** - TRON network statistics

**Functions:**
```python
from collectors.explorers import (
    get_etherscan_gas_price,
    get_bscscan_bnb_price,
    get_tronscan_stats,
    collect_explorer_data  # Collects from all sources
)

# Collect from all explorers
results = await collect_explorer_data()
```

**Features:**
- Real-time gas price tracking
- Network health monitoring
- API key management
- Rate limit handling

### 3. News Aggregation (`news.py`)

Collects cryptocurrency news from multiple sources.

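News from different sources is only comparable if article ages are measured the same way. The sketch below shows one way a publication time could be turned into an age in minutes, assuming the source reports ISO 8601 timestamp strings. `article_age_minutes` is a hypothetical helper written for illustration; it is not the package's own staleness function, whose implementation is not shown in this README.

```python
from datetime import datetime, timezone
from typing import Optional


def article_age_minutes(
    published_at: Optional[str],
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Return the age of an article in minutes, or None if it has no timestamp."""
    if not published_at:
        return None
    # Older Python versions reject a trailing "Z" in fromisoformat(), so normalise it.
    parsed = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - parsed).total_seconds() / 60.0
```

Passing `now` explicitly keeps the helper deterministic in tests; in normal use it can be left out.
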
**Providers:**
- **CryptoPanic** - Cryptocurrency news aggregator with sentiment
- **NewsAPI** - General news with crypto filtering

**Functions:**
```python
from collectors.news import (
    get_cryptopanic_posts,
    get_newsapi_headlines,
    collect_news_data  # Collects from all sources
)

# Collect from all news sources
results = await collect_news_data()
```

**Features:**
- News post aggregation
- Article timestamps for freshness tracking
- Article count reporting
- Content filtering

### 4. Sentiment Analysis (`sentiment.py`)

Collects cryptocurrency market sentiment data.

**Providers:**
- **Alternative.me** - Fear & Greed Index (0-100 scale)

**Functions:**
```python
from collectors.sentiment import (
    get_fear_greed_index,
    collect_sentiment_data  # Collects from all sources
)

# Collect sentiment data
results = await collect_sentiment_data()
```

**Features:**
- Market sentiment indicator (Fear/Greed)
- Historical sentiment tracking
- Classification (Extreme Fear, Fear, Neutral, Greed, Extreme Greed)

### 5. On-Chain Analytics (`onchain.py`)

Placeholder implementations for on-chain data sources.

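To make "placeholder" concrete, the sketch below shows what such a collector might return: a dictionary with the same keys as the standard output format documented later in this README, but with no data. The function name `placeholder_result`, the `"onchain"` category value, the `"not_implemented"` error type, and the choice to report `success` as `False` are all assumptions for illustration, not the module's actual behavior.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def placeholder_result(provider: str, category: str = "onchain") -> Dict[str, Any]:
    """Build a standard-format result for a collector that is not implemented yet."""
    return {
        "provider": provider,
        "category": category,              # assumed category name
        "data": None,                      # nothing is fetched yet
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data_timestamp": None,
        "staleness_minutes": None,
        "success": False,                  # assumption: placeholders report failure
        "error": f"{provider} collector is not implemented yet",
        "error_type": "not_implemented",   # hypothetical error classification
        "response_time_ms": 0.0,
    }
```

Returning the full set of keys means downstream code can treat placeholder results like any other collector result.
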
**Providers (Placeholder):**
- **The Graph** - GraphQL-based blockchain data
- **Blockchair** - Blockchain explorer and statistics
- **Glassnode** - Advanced on-chain metrics

**Functions:**
```python
from collectors.onchain import (
    get_the_graph_data,
    get_blockchair_data,
    get_glassnode_metrics,
    collect_onchain_data  # Collects from all sources
)

# Collect on-chain data (placeholder)
results = await collect_onchain_data()
```

**Planned Features:**
- DEX volume and liquidity tracking
- Token holder analytics
- NUPL, SOPR, and other on-chain metrics
- Exchange flow monitoring
- Whale transaction tracking

## Standard Output Format

All collectors return a standardized dictionary format:

```python
{
    "provider": str,                    # Provider name (e.g., "CoinGecko")
    "category": str,                    # Category (e.g., "market_data")
    "data": dict | list | None,         # Raw API response data
    "timestamp": str,                   # Collection timestamp (ISO format)
    "data_timestamp": str | None,       # Data timestamp from API (ISO format)
    "staleness_minutes": float | None,  # Age of data in minutes
    "success": bool,                    # Whether collection succeeded
    "error": str | None,                # Error message if failed
    "error_type": str | None,           # Error classification
    "response_time_ms": float           # API response time in milliseconds
}
```

## Common Features

All collectors implement:

1. **Error Handling**
   - Graceful failure with detailed error messages
   - Exception catching and logging
   - API-specific error parsing

2. **Logging**
   - Structured JSON logging
   - Request/response logging
   - Error logging with context

3. **Staleness Tracking**
   - Extracts timestamps from API responses
   - Calculates data age in minutes
   - Handles missing timestamps

4. **Rate Limiting**
   - Respects provider rate limits
   - Exponential backoff on failures
   - Rate limit error detection

5. **Retry Logic**
   - Automatic retries on failure
   - Configurable retry attempts
   - Timeout handling

6. **API Key Management**
   - Loads keys from config
   - Handles missing keys gracefully
   - API key masking in logs

## Usage Examples

### Basic Usage

```python
import asyncio
from collectors import collect_market_data

async def main():
    results = await collect_market_data()

    for result in results:
        if result['success']:
            print(f"{result['provider']}: Success")
            # staleness_minutes is None when the API returned no timestamp
            staleness = result['staleness_minutes']
            if staleness is not None:
                print(f"  Staleness: {staleness:.2f}m")
        else:
            print(f"{result['provider']}: Failed - {result['error']}")

asyncio.run(main())
```

### Collecting All Data

```python
import asyncio
from collectors import (
    collect_market_data,
    collect_explorer_data,
    collect_news_data,
    collect_sentiment_data,
    collect_onchain_data
)

async def collect_all():
    # Run every category concurrently; results come back in call order
    results = await asyncio.gather(
        collect_market_data(),
        collect_explorer_data(),
        collect_news_data(),
        collect_sentiment_data(),
        collect_onchain_data()
    )

    market, explorers, news, sentiment, onchain = results

    return {
        "market_data": market,
        "explorers": explorers,
        "news": news,
        "sentiment": sentiment,
        "onchain": onchain
    }

all_data = asyncio.run(collect_all())
```

### Individual Collector Usage

```python
import asyncio
from collectors.market_data import get_coingecko_simple_price

async def get_prices():
    result = await get_coingecko_simple_price()

    if result['success']:
        data = result['data']
        print(f"Bitcoin: ${data['bitcoin']['usd']}")
        print(f"Ethereum: ${data['ethereum']['usd']}")
        print(f"BNB: ${data['binancecoin']['usd']}")

asyncio.run(get_prices())
```

## Demo Script

Run the comprehensive demo to test all collectors:

```bash
python collectors/demo_collectors.py
```

This will:
- Execute all collectors concurrently
- Display detailed results for each category
- Show overall statistics
- Save results to a JSON file

## Configuration

Collectors use the central configuration system from `config.py`:

```python
from config import config

# Get provider configuration
provider = config.get_provider('CoinGecko')

# Get API key
api_key = config.get_api_key('coinmarketcap')

# Get providers by category
market_providers = config.get_providers_by_category('market_data')
```

## API Keys

API keys are loaded from environment variables:

```bash
# Market Data
export COINMARKETCAP_KEY_1="your_key_here"
export COINMARKETCAP_KEY_2="backup_key"

# Blockchain Explorers
export ETHERSCAN_KEY_1="your_key_here"
export ETHERSCAN_KEY_2="backup_key"
export BSCSCAN_KEY="your_key_here"
export TRONSCAN_KEY="your_key_here"

# News
export NEWSAPI_KEY="your_key_here"

# Analytics
export CRYPTOCOMPARE_KEY="your_key_here"
```

Or use a `.env` file with `python-dotenv`:

```env
COINMARKETCAP_KEY_1=your_key_here
ETHERSCAN_KEY_1=your_key_here
BSCSCAN_KEY=your_key_here
NEWSAPI_KEY=your_key_here
```

## Dependencies

- `aiohttp` - Async HTTP client
- `asyncio` - Async programming (standard library)
- `datetime` - Timestamp handling (standard library)
- `utils.api_client` - Robust API client with retry logic
- `utils.logger` - Structured JSON logging
- `config` - Centralized configuration

## Error Handling

Collectors report one of the following error types in `error_type`:

- **config_error** - Provider not configured
- **missing_api_key** - API key required but not available
- **authentication** - API key invalid or expired
- **rate_limit** - Rate limit exceeded
- **timeout** - Request timeout
- **server_error** - API server error (5xx)
- **network_error** - Network connectivity issue
- **api_error** - API-specific error
- **exception** - Unexpected Python exception

## Extending Collectors

To add a new collector:

1. Create a new module or add to an existing category module
2. Implement a collector function following the standard pattern
3. Use `get_client()` for API requests
4. Extract timestamps from the response and calculate staleness
5. Return the standardized output format
6. Add the function to the `__init__.py` exports
7. Update this README

Example:

```python
from datetime import datetime, timezone
from typing import Any, Dict

# get_client, config, logger, log_api_request, log_error and
# calculate_staleness_minutes come from this project's utils and config
# modules; import them the same way the existing collectors do.


async def get_new_provider_data() -> Dict[str, Any]:
    """Fetch data from new provider"""
    provider = "NewProvider"
    category = "market_data"
    endpoint = "/api/v1/data"

    logger.info(f"Fetching data from {provider}")

    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        # Make request
        url = f"{provider_config.endpoint_url}{endpoint}"
        response = await client.get(url)

        # Log request
        log_api_request(
            logger, provider, endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            # Handle error
            return {
                "provider": provider,
                "category": category,
                "success": False,
                "error": response.get("error_message")
            }

        # Parse data and timestamps
        data = response["data"]
        # Placeholder: replace with the timestamp parsed from the provider's response
        data_timestamp = datetime.now(timezone.utc)
        staleness = calculate_staleness_minutes(data_timestamp)

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "error_type": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        log_error(logger, provider, "exception", str(e), endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "success": False,
            "error": str(e),
            "error_type": "exception"
        }
```

## Testing

Test individual collectors:

```bash
# Test market data collector
python -m collectors.market_data

# Test explorers
python -m collectors.explorers

# Test news
python -m collectors.news

# Test sentiment
python -m collectors.sentiment

# Test on-chain (placeholder)
python -m collectors.onchain
```

## Performance

- Collectors run concurrently using `asyncio.gather()`
- Typical response times: 100-2000 ms per collector
- Connection pooling for efficiency
- Configurable timeouts
- Automatic retry with exponential backoff

## Monitoring

All collectors provide metrics for monitoring:

- **Success Rate** - Percentage of successful collections
- **Response Time** - API response time in milliseconds
- **Staleness** - Data age in minutes
- **Error Types** - Classification of failures
- **Retry Count** - Number of retries needed

## Future Enhancements

1. **On-Chain Implementation**
   - Complete The Graph integration
   - Implement Blockchair endpoints
   - Add Glassnode metrics

2. **Additional Providers**
   - Messari
   - DeFiLlama
   - CoinAPI
   - Nomics

3. **Advanced Features**
   - Circuit breaker pattern
   - Data caching
   - Webhook notifications
   - Real-time streaming

4. **Performance**
   - Redis caching
   - Database persistence
   - Rate limit optimization
   - Parallel processing

## Support

For issues or questions:

1. Check the logs for detailed error messages
2. Verify API keys are configured correctly
3. Review provider rate limits
4. Check network connectivity
5. Consult provider documentation

## License

Part of the Crypto API Monitoring system.