|
|
""" |
|
|
Extended Sentiment Collectors |
|
|
Fetches sentiment data from LunarCrush, Santiment, and other sentiment APIs |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.api_client import get_client |
|
|
from utils.logger import setup_logger, log_api_request, log_error |
|
|
|
|
|
logger = setup_logger("sentiment_extended_collector") |
|
|
|
|
|
|
|
|
async def get_lunarcrush_global() -> Dict[str, Any]:
    """
    Retrieve global market sentiment metrics from LunarCrush.

    Sends a single GET request to the LunarCrush public global-metrics URL.
    When the client reports a failed request, or when anything raises along
    the way, a placeholder record is returned (``is_placeholder`` set and
    ``success`` still True) rather than an error record.

    Note: LunarCrush API v3 requires API key
    Free tier available with limited requests

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "LunarCrush"
    category = "sentiment"
    endpoint = "/public/metrics/global"

    def as_placeholder(details: Dict[str, Any]) -> Dict[str, Any]:
        # Both fallback paths share this envelope; only "data" differs.
        return {
            "provider": provider,
            "category": category,
            "data": details,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True
        }

    logger.info(f"Fetching global sentiment from {provider}")

    try:
        response = await get_client().get(
            "https://lunarcrush.com/api3/public/metrics/global",
            timeout=10
        )

        # Read the fields in the same order the request log consumes them.
        elapsed_ms = response.get("response_time_ms", 0)
        request_ok = response["success"]
        status_code = response.get("status_code")
        log_api_request(
            logger,
            provider,
            endpoint,
            elapsed_ms,
            "success" if request_ok else "error",
            status_code
        )

        if not request_ok:
            logger.warning(f"{provider} - API requires authentication, returning placeholder")
            return as_placeholder({
                "status": "placeholder",
                "message": "LunarCrush API requires authentication",
                "planned_features": [
                    "Social media sentiment tracking",
                    "Galaxy Score (social activity metric)",
                    "AltRank (relative social dominance)",
                    "Influencer tracking",
                    "Social volume and engagement metrics"
                ]
            })

        payload = response["data"]

        # (output key, key read from the API payload)
        field_map = (
            ("social_volume", "social_volume"),
            ("social_score", "social_score"),
            ("market_sentiment", "sentiment"),
            ("timestamp", "timestamp"),
        )
        if isinstance(payload, dict):
            sentiment_data = {out_key: payload.get(in_key) for out_key, in_key in field_map}
        else:
            # Non-dict payloads are passed on as "no data" rather than an error.
            sentiment_data = None

        logger.info(f"{provider} - {endpoint} - Retrieved sentiment data")

        return {
            "provider": provider,
            "category": category,
            "data": sentiment_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": elapsed_ms
        }

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return as_placeholder({
            "status": "placeholder",
            "message": f"LunarCrush integration error: {str(e)}"
        })
|
|
|
|
|
|
|
|
async def get_santiment_metrics() -> Dict[str, Any]:
    """
    Produce the placeholder record for Santiment sentiment metrics.

    No request is sent: the function only assembles a static description of
    the metrics that are planned for this provider.

    Note: Santiment API requires authentication
    Provides on-chain, social, and development activity metrics

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "Santiment"
    category = "sentiment"
    endpoint = "/graphql"

    logger.info(f"Fetching sentiment metrics from {provider} (placeholder)")

    try:
        planned_metrics = [
            "Social volume and trends",
            "Development activity",
            "Network growth",
            "Exchange flow",
            "MVRV ratio",
            "Daily active addresses",
            "Token age consumed",
            "Crowd sentiment"
        ]
        details = {
            "status": "placeholder",
            "message": "Santiment API requires authentication and GraphQL queries",
            "planned_metrics": planned_metrics,
            "note": "Requires Santiment API key and SAN tokens for full access"
        }

        logger.info(f"{provider} - {endpoint} - Placeholder data returned")

        record: Dict[str, Any] = {"provider": provider, "category": category}
        record["data"] = details
        record["timestamp"] = datetime.now(timezone.utc).isoformat()
        record["success"] = True
        record["error"] = None
        record["is_placeholder"] = True
        return record

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        failure: Dict[str, Any] = {"provider": provider, "category": category}
        failure["data"] = None
        failure["timestamp"] = datetime.now(timezone.utc).isoformat()
        failure["success"] = False
        failure["error"] = error_msg
        failure["error_type"] = "exception"
        return failure
|
|
|
|
|
|
|
|
async def get_cryptoquant_sentiment() -> Dict[str, Any]:
    """
    Produce the placeholder record for CryptoQuant on-chain sentiment.

    No request is sent; the returned data only lists the metrics planned for
    this provider.

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "CryptoQuant"
    category = "sentiment"
    endpoint = "/sentiment"

    logger.info(f"Fetching sentiment from {provider} (placeholder)")

    try:
        details = dict(
            status="placeholder",
            message="CryptoQuant API requires authentication",
            planned_metrics=[
                "Exchange reserves",
                "Miner flows",
                "Whale transactions",
                "Stablecoin supply ratio",
                "Funding rates",
                "Open interest",
            ],
        )
        return dict(
            provider=provider,
            category=category,
            data=details,
            timestamp=datetime.now(timezone.utc).isoformat(),
            success=True,
            error=None,
            is_placeholder=True,
        )

    except Exception as e:
        # Failure record mirrors the success envelope, minus the payload.
        return dict(
            provider=provider,
            category=category,
            data=None,
            timestamp=datetime.now(timezone.utc).isoformat(),
            success=False,
            error=str(e),
            error_type="exception",
        )
|
|
|
|
|
|
|
|
async def get_augmento_signals() -> Dict[str, Any]:
    """
    Produce the placeholder record for Augmento.ai sentiment signals.

    No request is sent; the returned data only lists the features planned
    for this provider.

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "Augmento"
    category = "sentiment"
    endpoint = "/signals"

    logger.info(f"Fetching sentiment signals from {provider} (placeholder)")

    # Keys common to the success and the failure record, in output order.
    envelope_head = {"provider": provider, "category": category}

    try:
        planned_features = [
            "AI-powered sentiment signals",
            "Topic extraction from social media",
            "Emerging trend detection",
            "Sentiment momentum indicators"
        ]
        return {
            **envelope_head,
            "data": {
                "status": "placeholder",
                "message": "Augmento API requires authentication",
                "planned_features": planned_features
            },
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True
        }

    except Exception as e:
        return {
            **envelope_head,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": str(e),
            "error_type": "exception"
        }
|
|
|
|
|
|
|
|
async def get_thetie_sentiment() -> Dict[str, Any]:
    """
    Produce the placeholder record for TheTie.io sentiment data.

    No request is sent; the returned data only lists the metrics planned for
    this provider.

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "TheTie"
    category = "sentiment"
    endpoint = "/sentiment"

    logger.info(f"Fetching sentiment from {provider} (placeholder)")

    try:
        details: Dict[str, Any] = {}
        details["status"] = "placeholder"
        details["message"] = "TheTie API requires authentication"
        details["planned_metrics"] = [
            "Twitter sentiment scores",
            "Social media momentum",
            "Influencer tracking",
            "Sentiment trends over time"
        ]

        outcome: Dict[str, Any] = {
            "provider": provider,
            "category": category,
            "data": details,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True
        }
    except Exception as e:
        outcome = {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": str(e),
            "error_type": "exception"
        }

    return outcome
|
|
|
|
|
|
|
|
def _summarize_coinmarketcal_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce one raw event dict to the fields reported downstream.

    JSON ``null`` values (or unexpected types) for ``title`` and ``coins``
    yield ``None`` / ``[]`` instead of raising.
    """
    title = event.get("title")
    coins = event.get("coins")
    return {
        "title": title.get("en") if isinstance(title, dict) else None,
        "coins": [
            coin.get("symbol") for coin in coins if isinstance(coin, dict)
        ] if isinstance(coins, list) else [],
        "date": event.get("date_event"),
        "proof": event.get("proof"),
        "source": event.get("source")
    }


async def get_coinmarketcal_events() -> Dict[str, Any]:
    """
    Fetch upcoming crypto events from CoinMarketCal (free API)

    Requests the first page (max 20, ``showOnly=hot_events``) of the events
    endpoint and summarizes at most the first 10 entries. ``total_events``
    is the length of the returned list, not the number summarized.

    Fallback behavior:
        * Failed request, or any raised exception: a placeholder record is
          returned (``is_placeholder`` True, ``success`` True).
        * Successful request whose payload is not a dict containing
          ``"body"``: ``data`` is ``None`` and the record is otherwise a
          normal success record.

    Returns:
        Dict with provider, category, data, timestamp, success, error
    """
    provider = "CoinMarketCal"
    category = "sentiment"
    endpoint = "/events"

    logger.info(f"Fetching events from {provider}")

    try:
        client = get_client()

        url = "https://developers.coinmarketcal.com/v1/events"

        params = {
            "page": 1,
            "max": 20,
            "showOnly": "hot_events"
        }

        response = await client.get(url, params=params, timeout=10)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            logger.warning(f"{provider} - API may require authentication, returning placeholder")
            return {
                "provider": provider,
                "category": category,
                "data": {
                    "status": "placeholder",
                    "message": "CoinMarketCal API may require authentication",
                    "planned_features": [
                        "Upcoming crypto events calendar",
                        "Project updates and announcements",
                        "Conferences and meetups",
                        "Hard forks and mainnet launches"
                    ]
                },
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": True,
                "error": None,
                "is_placeholder": True
            }

        data = response["data"]

        events_data = None
        if isinstance(data, dict) and "body" in data:
            body = data["body"]
            events = body if isinstance(body, list) else []

            events_data = {
                "total_events": len(events),
                "upcoming_events": [
                    _summarize_coinmarketcal_event(event)
                    # Skip malformed (non-dict) entries instead of failing the batch.
                    for event in events[:10]
                    if isinstance(event, dict)
                ]
            }

        # events_data is None when the payload has no "body"; calling .get()
        # on it here would raise and mislabel a successful request as an
        # integration error.
        total_events = events_data["total_events"] if events_data is not None else 0
        logger.info(f"{provider} - {endpoint} - Retrieved {total_events} events")

        return {
            "provider": provider,
            "category": category,
            "data": events_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": {
                "status": "placeholder",
                "message": f"CoinMarketCal integration error: {str(e)}"
            },
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "is_placeholder": True
        }
|
|
|
|
|
|
|
|
async def collect_extended_sentiment_data() -> List[Dict[str, Any]]:
    """
    Main function to collect extended sentiment data from all sources

    Runs every collector concurrently with ``asyncio.gather`` and
    ``return_exceptions=True``, so one failing collector does not abort the
    others. A collector that ends in an exception (including cancellation)
    is replaced by a failure record with provider ``"Unknown"``.

    Returns:
        List of results from all sentiment collectors, in collector order
    """
    logger.info("Starting extended sentiment data collection from all sources")

    collectors = (
        get_lunarcrush_global,
        get_santiment_metrics,
        get_cryptoquant_sentiment,
        get_augmento_signals,
        get_thetie_sentiment,
        get_coinmarketcal_events,
    )

    results = await asyncio.gather(
        *(collector() for collector in collectors),
        return_exceptions=True
    )

    processed_results = []
    for collector, result in zip(collectors, results):
        # BaseException rather than Exception: asyncio.CancelledError derives
        # from BaseException on Python 3.8+, and gather() hands it back as a
        # result. Letting it through would break the .get() calls below.
        if isinstance(result, BaseException):
            logger.error(
                f"Collector {collector.__name__} failed with exception: {str(result)}"
            )
            processed_results.append({
                "provider": "Unknown",
                "category": "sentiment",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False))

    logger.info(
        f"Extended sentiment collection complete: {successful}/{len(processed_results)} successful "
        f"({placeholder_count} placeholders)"
    )

    return processed_results
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    async def main():
        """Run every collector once and print a short per-provider summary."""
        collected = await collect_extended_sentiment_data()

        print("\n=== Extended Sentiment Data Collection Results ===")
        for entry in collected:
            print(f"\nProvider: {entry['provider']}")
            print(f"Success: {entry['success']}")
            print(f"Is Placeholder: {entry.get('is_placeholder', False)}")

            if not entry['success']:
                print(f"Error: {entry.get('error', 'Unknown')}")
                continue

            payload = entry.get('data', {})
            if not isinstance(payload, dict):
                # Successful record without dict data: nothing more to show.
                continue

            if payload.get('status') == 'placeholder':
                print(f"Status: {payload.get('message', 'N/A')}")
            else:
                print(f"Data keys: {list(payload.keys())}")

    asyncio.run(main())
|
|
|