Cryptocurrency Data Collectors
Comprehensive data collection modules for cryptocurrency APIs, blockchain explorers, news sources, sentiment indicators, and on-chain analytics.
Overview
This package provides production-ready collectors for gathering cryptocurrency data from various sources. Each collector is designed with robust error handling, logging, staleness tracking, and standardized output formats.
Modules
1. Market Data (market_data.py)
Collects cryptocurrency market data from multiple providers.
Providers:
- CoinGecko - Free API for BTC, ETH, BNB prices with market cap and volume
- CoinMarketCap - Professional market data with API key
- Binance - Real-time ticker data from Binance exchange
Functions:
from collectors.market_data import (
get_coingecko_simple_price,
get_coinmarketcap_quotes,
get_binance_ticker,
collect_market_data # Collects from all sources
)
# Collect from all market data sources
results = await collect_market_data()
Features:
- Concurrent data collection
- Price tracking with volume and market cap
- 24-hour change percentages
- Timestamp extraction for staleness calculation
2. Blockchain Explorers (explorers.py)
Collects data from blockchain explorers and network statistics.
Providers:
- Etherscan - Ethereum gas prices and network stats
- BscScan - BNB prices and BSC network data
- TronScan - TRON network statistics
Functions:
from collectors.explorers import (
get_etherscan_gas_price,
get_bscscan_bnb_price,
get_tronscan_stats,
collect_explorer_data # Collects from all sources
)
# Collect from all explorers
results = await collect_explorer_data()
Features:
- Real-time gas price tracking
- Network health monitoring
- API key management
- Rate limit handling
3. News Aggregation (news.py)
Collects cryptocurrency news from multiple sources.
Providers:
- CryptoPanic - Cryptocurrency news aggregator with sentiment
- NewsAPI - General news with crypto filtering
Functions:
from collectors.news import (
get_cryptopanic_posts,
get_newsapi_headlines,
collect_news_data # Collects from all sources
)
# Collect from all news sources
results = await collect_news_data()
Features:
- News post aggregation
- Article timestamps for freshness tracking
- Article count reporting
- Content filtering
4. Sentiment Analysis (sentiment.py)
Collects cryptocurrency market sentiment data.
Providers:
- Alternative.me - Fear & Greed Index (0-100 scale)
Functions:
from collectors.sentiment import (
get_fear_greed_index,
collect_sentiment_data # Collects from all sources
)
# Collect sentiment data
results = await collect_sentiment_data()
Features:
- Market sentiment indicator (Fear/Greed)
- Historical sentiment tracking
- Classification (Extreme Fear, Fear, Neutral, Greed, Extreme Greed)
5. On-Chain Analytics (onchain.py)
Placeholder implementations for on-chain data sources.
Providers (Placeholder):
- The Graph - GraphQL-based blockchain data
- Blockchair - Blockchain explorer and statistics
- Glassnode - Advanced on-chain metrics
Functions:
from collectors.onchain import (
get_the_graph_data,
get_blockchair_data,
get_glassnode_metrics,
collect_onchain_data # Collects from all sources
)
# Collect on-chain data (placeholder)
results = await collect_onchain_data()
Planned Features:
- DEX volume and liquidity tracking
- Token holder analytics
- NUPL, SOPR, and other on-chain metrics
- Exchange flow monitoring
- Whale transaction tracking
Standard Output Format
All collectors return a standardized dictionary format:
{
"provider": str, # Provider name (e.g., "CoinGecko")
"category": str, # Category (e.g., "market_data")
"data": dict/list/None, # Raw API response data
"timestamp": str, # Collection timestamp (ISO format)
"data_timestamp": str/None, # Data timestamp from API (ISO format)
"staleness_minutes": float/None, # Age of data in minutes
"success": bool, # Whether collection succeeded
"error": str/None, # Error message if failed
"error_type": str/None, # Error classification
"response_time_ms": float # API response time
}
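As a rough illustration of the format above, the sketch below assembles a result dictionary with all ten fields filled in. The helper name `build_result` is hypothetical and not part of this package; it also assumes that `data_timestamp`, when given, is a timezone-aware `datetime`.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_result(
    provider: str,
    category: str,
    data: Any = None,
    data_timestamp: Optional[datetime] = None,
    error: Optional[str] = None,
    error_type: Optional[str] = None,
    response_time_ms: float = 0.0,
) -> Dict[str, Any]:
    """Assemble a result dict with every standard field populated (hypothetical helper)."""
    now = datetime.now(timezone.utc)
    staleness = None
    if data_timestamp is not None:
        # Age of the data in minutes; data_timestamp must be timezone-aware.
        staleness = (now - data_timestamp).total_seconds() / 60.0
    return {
        "provider": provider,
        "category": category,
        "data": data,
        "timestamp": now.isoformat(),
        "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
        "staleness_minutes": staleness,
        "success": error is None,
        "error": error,
        "error_type": error_type,
        "response_time_ms": response_time_ms,
    }
```

Building every result through one function like this would keep failure results from silently omitting fields that consumers expect.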
Common Features
All collectors implement:
Error Handling
- Graceful failure with detailed error messages
- Exception catching and logging
- API-specific error parsing
Logging
- Structured JSON logging
- Request/response logging
- Error logging with context
Staleness Tracking
- Extracts timestamps from API responses
- Calculates data age in minutes
- Handles missing timestamps
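The staleness steps listed above could look roughly like the following. This is a minimal sketch, not the package's own `calculate_staleness_minutes`; the names `parse_timestamp` and `age_in_minutes` are hypothetical, and it assumes providers report either ISO 8601 strings or Unix epoch seconds.

```python
from datetime import datetime, timezone
from typing import Optional, Union


def parse_timestamp(value: Union[str, int, float, None]) -> Optional[datetime]:
    """Turn an ISO 8601 string or Unix epoch seconds into an aware UTC datetime."""
    if value is None:
        return None
    try:
        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value, tz=timezone.utc)
        if not isinstance(value, str):
            return None
        # Older Python versions do not accept a trailing "Z" in fromisoformat().
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (ValueError, OverflowError, OSError):
        return None
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def age_in_minutes(
    data_timestamp: Optional[datetime], now: Optional[datetime] = None
) -> Optional[float]:
    """Return the data age in minutes, or None when the timestamp is missing."""
    if data_timestamp is None:
        return None
    now = now or datetime.now(timezone.utc)
    return (now - data_timestamp).total_seconds() / 60.0
```

Returning `None` instead of raising keeps a missing or malformed timestamp from failing an otherwise successful collection.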
Rate Limiting
- Respects provider rate limits
- Exponential backoff on failures
- Rate limit error detection
Retry Logic
- Automatic retries on failure
- Configurable retry attempts
- Timeout handling
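To make the retry and backoff behaviour concrete, here is a minimal sketch of retrying an async call with a timeout and exponential backoff. It is not the implementation in `utils.api_client`; `backoff_delay` and `retry_async` are hypothetical names, and the default delays and the set of retried exceptions are assumptions.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base: float = 0.5,
    cap: float = 30.0,
    timeout: float = 10.0,
) -> T:
    """Call func() up to max_attempts times, sleeping with backoff between tries."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[BaseException] = None
    for attempt in range(max_attempts):
        try:
            # Each attempt gets its own timeout.
            return await asyncio.wait_for(func(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < max_attempts - 1:
                delay = backoff_delay(attempt, base, cap)
                # Small random jitter so concurrent collectors do not retry in lockstep.
                await asyncio.sleep(delay + random.uniform(0, delay / 10))
    assert last_error is not None
    raise last_error
```

With the defaults, the waits between attempts are roughly 0.5s and 1s; a provider that signals rate limiting may warrant a longer base delay.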
API Key Management
- Loads keys from config
- Handles missing keys gracefully
- API key masking in logs
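Key masking in logs might be done along these lines. The function name `mask_api_key` and the choice to reveal only the last four characters are assumptions for illustration, not the package's actual behaviour.

```python
from typing import Optional


def mask_api_key(key: Optional[str], visible: int = 4) -> str:
    """Hide all but the last `visible` characters of a key for safe logging."""
    if not key:
        return "<missing>"
    if len(key) <= visible:
        # Too short to reveal anything safely.
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]
```

For example, `mask_api_key("abcdef123456")` yields `********3456`, which is enough to tell keys apart in logs without exposing them.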
Usage Examples
Basic Usage
import asyncio
from collectors import collect_market_data

async def main():
    results = await collect_market_data()
    for result in results:
        if result['success']:
            print(f"{result['provider']}: Success")
            # staleness_minutes is None when the response carries no timestamp
            staleness = result['staleness_minutes']
            if staleness is not None:
                print(f"  Staleness: {staleness:.2f}m")
        else:
            print(f"{result['provider']}: Failed - {result['error']}")

asyncio.run(main())
Collecting All Data
import asyncio
from collectors import (
    collect_market_data,
    collect_explorer_data,
    collect_news_data,
    collect_sentiment_data,
    collect_onchain_data
)

async def collect_all():
    # Run every category concurrently; results come back in call order
    results = await asyncio.gather(
        collect_market_data(),
        collect_explorer_data(),
        collect_news_data(),
        collect_sentiment_data(),
        collect_onchain_data()
    )
    market, explorers, news, sentiment, onchain = results
    return {
        "market_data": market,
        "explorers": explorers,
        "news": news,
        "sentiment": sentiment,
        "onchain": onchain
    }

all_data = asyncio.run(collect_all())
Individual Collector Usage
import asyncio
from collectors.market_data import get_coingecko_simple_price

async def get_prices():
    result = await get_coingecko_simple_price()
    if result['success']:
        data = result['data']
        print(f"Bitcoin: ${data['bitcoin']['usd']}")
        print(f"Ethereum: ${data['ethereum']['usd']}")
        print(f"BNB: ${data['binancecoin']['usd']}")

asyncio.run(get_prices())
Demo Script
Run the comprehensive demo to test all collectors:
python collectors/demo_collectors.py
This will:
- Execute all collectors concurrently
- Display detailed results for each category
- Show overall statistics
- Save results to a JSON file
Configuration
Collectors use the central configuration system from config.py:
from config import config
# Get provider configuration
provider = config.get_provider('CoinGecko')
# Get API key
api_key = config.get_api_key('coinmarketcap')
# Get providers by category
market_providers = config.get_providers_by_category('market_data')
API Keys
API keys are loaded from environment variables:
# Market Data
export COINMARKETCAP_KEY_1="your_key_here"
export COINMARKETCAP_KEY_2="backup_key"
# Blockchain Explorers
export ETHERSCAN_KEY_1="your_key_here"
export ETHERSCAN_KEY_2="backup_key"
export BSCSCAN_KEY="your_key_here"
export TRONSCAN_KEY="your_key_here"
# News
export NEWSAPI_KEY="your_key_here"
# Analytics
export CRYPTOCOMPARE_KEY="your_key_here"
Or use a .env file with python-dotenv:
COINMARKETCAP_KEY_1=your_key_here
ETHERSCAN_KEY_1=your_key_here
BSCSCAN_KEY=your_key_here
NEWSAPI_KEY=your_key_here
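The naming scheme above (a single `PREFIX_KEY`, or numbered `PREFIX_KEY_1`, `PREFIX_KEY_2` for backups) could be read with a helper like the one below. This is a sketch only: how `config.py` actually loads keys is not shown here, and `load_keys` is a hypothetical name.

```python
import os
from typing import List, Mapping, Optional


def load_keys(prefix: str, env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Collect PREFIX_KEY and PREFIX_KEY_1, PREFIX_KEY_2, ... in priority order."""
    env = os.environ if env is None else env
    keys: List[str] = []
    single = env.get(f"{prefix}_KEY")
    if single:
        keys.append(single)
    index = 1
    while True:
        value = env.get(f"{prefix}_KEY_{index}")
        if not value:
            break  # Stop at the first gap in the numbering.
        keys.append(value)
        index += 1
    return keys
```

Calling `load_keys("ETHERSCAN")` with both variables exported would return the primary key followed by the backup; an empty list signals that no key is configured.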
Dependencies
- aiohttp - Async HTTP client
- asyncio - Async programming
- datetime - Timestamp handling
- utils.api_client - Robust API client with retry logic
- utils.logger - Structured JSON logging
- config - Centralized configuration
Error Handling
Collectors handle various error types:
- config_error - Provider not configured
- missing_api_key - API key required but not available
- authentication - API key invalid or expired
- rate_limit - Rate limit exceeded
- timeout - Request timeout
- server_error - API server error (5xx)
- network_error - Network connectivity issue
- api_error - API-specific error
- exception - Unexpected Python exception
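One plausible way to derive several of these error types from an HTTP status code is sketched below. The mapping is an assumption made for illustration (the package may classify differently), and `classify_http_status` is a hypothetical name.

```python
from typing import Optional


def classify_http_status(status: Optional[int]) -> Optional[str]:
    """Map an HTTP status code to an error_type; None means no error."""
    if status is None:
        # Assumption: no status at all means the request never reached the server.
        return "network_error"
    if 200 <= status < 300:
        return None
    if status in (401, 403):
        return "authentication"
    if status == 408:
        return "timeout"
    if status == 429:
        return "rate_limit"
    if 500 <= status < 600:
        return "server_error"
    return "api_error"
```

The remaining types (`config_error`, `missing_api_key`, `exception`) arise before or outside the HTTP exchange, so they would be set by the collector itself rather than derived from a status code.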
Extending Collectors
To add a new collector:
- Create a new module or add to existing category
- Implement collector function following the standard pattern
- Use get_client() for API requests
- Extract and calculate staleness from timestamps
- Return standardized output format
- Add to __init__.py exports
- Update this README
Example:
from datetime import datetime, timezone
from typing import Any, Dict

# get_client, config, logger, log_api_request, log_error and
# calculate_staleness_minutes are project helpers; import them the same
# way the existing collector modules do.

async def get_new_provider_data() -> Dict[str, Any]:
    """Fetch data from new provider"""
    provider = "NewProvider"
    category = "market_data"
    endpoint = "/api/v1/data"
    logger.info(f"Fetching data from {provider}")
    try:
        client = get_client()
        provider_config = config.get_provider(provider)
        # Make request
        url = f"{provider_config.endpoint_url}{endpoint}"
        response = await client.get(url)
        # Log request
        log_api_request(
            logger, provider, endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )
        if not response["success"]:
            # Handle error
            return {
                "provider": provider,
                "category": category,
                "success": False,
                "error": response.get("error_message")
            }
        # Parse data and timestamps
        data = response["data"]
        # TODO: set data_timestamp to a timezone-aware datetime parsed from the response
        data_timestamp = None
        staleness = calculate_staleness_minutes(data_timestamp) if data_timestamp else None
        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }
    except Exception as e:
        log_error(logger, provider, "exception", str(e), endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "success": False,
            "error": str(e),
            "error_type": "exception"
        }
Testing
Test individual collectors:
# Test market data collector
python -m collectors.market_data
# Test explorers
python -m collectors.explorers
# Test news
python -m collectors.news
# Test sentiment
python -m collectors.sentiment
# Test on-chain (placeholder)
python -m collectors.onchain
Performance
- Collectors run concurrently using asyncio.gather()
- Typical response times: 100-2000ms per collector
- Connection pooling for efficiency
- Configurable timeouts
- Automatic retry with exponential backoff
Monitoring
All collectors provide metrics for monitoring:
- Success Rate - Percentage of successful collections
- Response Time - API response time in milliseconds
- Staleness - Data age in minutes
- Error Types - Classification of failures
- Retry Count - Number of retries needed
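Most of these metrics can be derived from the standardized result dictionaries alone. The sketch below shows one way to do so; `summarize_results` is a hypothetical helper, and retry count is left out because the standard output format does not carry it.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def summarize_results(results: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate success rate, response time, staleness and error types."""
    items = list(results)
    total = len(items)
    successes = [r for r in items if r.get("success")]
    times = [r["response_time_ms"] for r in items
             if r.get("response_time_ms") is not None]
    stale = [r["staleness_minutes"] for r in successes
             if r.get("staleness_minutes") is not None]
    # Failures without an error_type are counted under "unknown".
    errors = Counter(r.get("error_type") or "unknown"
                     for r in items if not r.get("success"))
    return {
        "total": total,
        "success_rate": (len(successes) / total * 100.0) if total else 0.0,
        "avg_response_time_ms": (sum(times) / len(times)) if times else None,
        "max_staleness_minutes": max(stale) if stale else None,
        "error_types": dict(errors),
    }
```

Feeding it the list returned by any `collect_*` function gives a per-category summary suitable for a dashboard or an alert threshold.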
Future Enhancements
On-Chain Implementation
- Complete The Graph integration
- Implement Blockchair endpoints
- Add Glassnode metrics
Additional Providers
- Messari
- DeFiLlama
- CoinAPI
- Nomics
Advanced Features
- Circuit breaker pattern
- Data caching
- Webhook notifications
- Real-time streaming
Performance
- Redis caching
- Database persistence
- Rate limit optimization
- Parallel processing
Support
For issues or questions:
- Check the logs for detailed error messages
- Verify API keys are configured correctly
- Review provider rate limits
- Check network connectivity
- Consult provider documentation
License
Part of the Crypto API Monitoring system.