Datasourceforcryptocurrency / collectors /sentiment_extended.py
Really-amin's picture
Upload 136 files
48ae4e0 verified
"""
Extended Sentiment Collectors
Fetches sentiment data from LunarCrush, Santiment, and other sentiment APIs
"""
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from utils.api_client import get_client
from utils.logger import setup_logger, log_api_request, log_error
logger = setup_logger("sentiment_extended_collector")
async def get_lunarcrush_global() -> Dict[str, Any]:
"""
Fetch global market sentiment from LunarCrush
Note: LunarCrush API v3 requires API key
Free tier available with limited requests
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "LunarCrush"
category = "sentiment"
endpoint = "/public/metrics/global"
logger.info(f"Fetching global sentiment from {provider}")
try:
client = get_client()
# LunarCrush public metrics (limited free access)
url = "https://lunarcrush.com/api3/public/metrics/global"
# Make request
response = await client.get(url, timeout=10)
# Log request
log_api_request(
logger,
provider,
endpoint,
response.get("response_time_ms", 0),
"success" if response["success"] else "error",
response.get("status_code")
)
if not response["success"]:
# LunarCrush may require API key, return placeholder
logger.warning(f"{provider} - API requires authentication, returning placeholder")
return {
"provider": provider,
"category": category,
"data": {
"status": "placeholder",
"message": "LunarCrush API requires authentication",
"planned_features": [
"Social media sentiment tracking",
"Galaxy Score (social activity metric)",
"AltRank (relative social dominance)",
"Influencer tracking",
"Social volume and engagement metrics"
]
},
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
# Extract data
data = response["data"]
sentiment_data = None
if isinstance(data, dict):
sentiment_data = {
"social_volume": data.get("social_volume"),
"social_score": data.get("social_score"),
"market_sentiment": data.get("sentiment"),
"timestamp": data.get("timestamp")
}
logger.info(f"{provider} - {endpoint} - Retrieved sentiment data")
return {
"provider": provider,
"category": category,
"data": sentiment_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"response_time_ms": response.get("response_time_ms", 0)
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": {
"status": "placeholder",
"message": f"LunarCrush integration error: {str(e)}"
},
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
async def get_santiment_metrics() -> Dict[str, Any]:
"""
Fetch sentiment metrics from Santiment
Note: Santiment API requires authentication
Provides on-chain, social, and development activity metrics
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "Santiment"
category = "sentiment"
endpoint = "/graphql"
logger.info(f"Fetching sentiment metrics from {provider} (placeholder)")
try:
# Santiment uses GraphQL API and requires authentication
# Placeholder implementation
placeholder_data = {
"status": "placeholder",
"message": "Santiment API requires authentication and GraphQL queries",
"planned_metrics": [
"Social volume and trends",
"Development activity",
"Network growth",
"Exchange flow",
"MVRV ratio",
"Daily active addresses",
"Token age consumed",
"Crowd sentiment"
],
"note": "Requires Santiment API key and SAN tokens for full access"
}
logger.info(f"{provider} - {endpoint} - Placeholder data returned")
return {
"provider": provider,
"category": category,
"data": placeholder_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": False,
"error": error_msg,
"error_type": "exception"
}
async def get_cryptoquant_sentiment() -> Dict[str, Any]:
"""
Fetch on-chain sentiment from CryptoQuant
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "CryptoQuant"
category = "sentiment"
endpoint = "/sentiment"
logger.info(f"Fetching sentiment from {provider} (placeholder)")
try:
# CryptoQuant API requires authentication
# Placeholder implementation
placeholder_data = {
"status": "placeholder",
"message": "CryptoQuant API requires authentication",
"planned_metrics": [
"Exchange reserves",
"Miner flows",
"Whale transactions",
"Stablecoin supply ratio",
"Funding rates",
"Open interest"
]
}
return {
"provider": provider,
"category": category,
"data": placeholder_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
except Exception as e:
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": False,
"error": str(e),
"error_type": "exception"
}
async def get_augmento_signals() -> Dict[str, Any]:
"""
Fetch market sentiment signals from Augmento.ai
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "Augmento"
category = "sentiment"
endpoint = "/signals"
logger.info(f"Fetching sentiment signals from {provider} (placeholder)")
try:
# Augmento provides AI-powered crypto sentiment signals
# Requires API key
placeholder_data = {
"status": "placeholder",
"message": "Augmento API requires authentication",
"planned_features": [
"AI-powered sentiment signals",
"Topic extraction from social media",
"Emerging trend detection",
"Sentiment momentum indicators"
]
}
return {
"provider": provider,
"category": category,
"data": placeholder_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
except Exception as e:
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": False,
"error": str(e),
"error_type": "exception"
}
async def get_thetie_sentiment() -> Dict[str, Any]:
"""
Fetch sentiment data from TheTie.io
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "TheTie"
category = "sentiment"
endpoint = "/sentiment"
logger.info(f"Fetching sentiment from {provider} (placeholder)")
try:
# TheTie provides institutional-grade crypto market intelligence
# Requires API key
placeholder_data = {
"status": "placeholder",
"message": "TheTie API requires authentication",
"planned_metrics": [
"Twitter sentiment scores",
"Social media momentum",
"Influencer tracking",
"Sentiment trends over time"
]
}
return {
"provider": provider,
"category": category,
"data": placeholder_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
except Exception as e:
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": False,
"error": str(e),
"error_type": "exception"
}
async def get_coinmarketcal_events() -> Dict[str, Any]:
"""
Fetch upcoming crypto events from CoinMarketCal (free API)
Returns:
Dict with provider, category, data, timestamp, success, error
"""
provider = "CoinMarketCal"
category = "sentiment"
endpoint = "/events"
logger.info(f"Fetching events from {provider}")
try:
client = get_client()
# CoinMarketCal API
url = "https://developers.coinmarketcal.com/v1/events"
params = {
"page": 1,
"max": 20,
"showOnly": "hot_events" # Only hot/important events
}
# Make request (may require API key for full access)
response = await client.get(url, params=params, timeout=10)
# Log request
log_api_request(
logger,
provider,
endpoint,
response.get("response_time_ms", 0),
"success" if response["success"] else "error",
response.get("status_code")
)
if not response["success"]:
# If API requires key, return placeholder
logger.warning(f"{provider} - API may require authentication, returning placeholder")
return {
"provider": provider,
"category": category,
"data": {
"status": "placeholder",
"message": "CoinMarketCal API may require authentication",
"planned_features": [
"Upcoming crypto events calendar",
"Project updates and announcements",
"Conferences and meetups",
"Hard forks and mainnet launches"
]
},
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
# Extract data
data = response["data"]
events_data = None
if isinstance(data, dict) and "body" in data:
events = data["body"]
events_data = {
"total_events": len(events) if isinstance(events, list) else 0,
"upcoming_events": [
{
"title": event.get("title", {}).get("en"),
"coins": [coin.get("symbol") for coin in event.get("coins", [])],
"date": event.get("date_event"),
"proof": event.get("proof"),
"source": event.get("source")
}
for event in (events[:10] if isinstance(events, list) else [])
]
}
logger.info(f"{provider} - {endpoint} - Retrieved {events_data.get('total_events', 0)} events")
return {
"provider": provider,
"category": category,
"data": events_data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"response_time_ms": response.get("response_time_ms", 0)
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": {
"status": "placeholder",
"message": f"CoinMarketCal integration error: {str(e)}"
},
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": True,
"error": None,
"is_placeholder": True
}
async def collect_extended_sentiment_data() -> List[Dict[str, Any]]:
"""
Main function to collect extended sentiment data from all sources
Returns:
List of results from all sentiment collectors
"""
logger.info("Starting extended sentiment data collection from all sources")
# Run all collectors concurrently
results = await asyncio.gather(
get_lunarcrush_global(),
get_santiment_metrics(),
get_cryptoquant_sentiment(),
get_augmento_signals(),
get_thetie_sentiment(),
get_coinmarketcal_events(),
return_exceptions=True
)
# Process results
processed_results = []
for result in results:
if isinstance(result, Exception):
logger.error(f"Collector failed with exception: {str(result)}")
processed_results.append({
"provider": "Unknown",
"category": "sentiment",
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"success": False,
"error": str(result),
"error_type": "exception"
})
else:
processed_results.append(result)
# Log summary
successful = sum(1 for r in processed_results if r.get("success", False))
placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False))
logger.info(
f"Extended sentiment collection complete: {successful}/{len(processed_results)} successful "
f"({placeholder_count} placeholders)"
)
return processed_results
# Example usage
if __name__ == "__main__":
async def main():
results = await collect_extended_sentiment_data()
print("\n=== Extended Sentiment Data Collection Results ===")
for result in results:
print(f"\nProvider: {result['provider']}")
print(f"Success: {result['success']}")
print(f"Is Placeholder: {result.get('is_placeholder', False)}")
if result['success']:
data = result.get('data', {})
if isinstance(data, dict):
if data.get('status') == 'placeholder':
print(f"Status: {data.get('message', 'N/A')}")
else:
print(f"Data keys: {list(data.keys())}")
else:
print(f"Error: {result.get('error', 'Unknown')}")
asyncio.run(main())