"""
News Data Collectors
Fetches cryptocurrency news from CryptoPanic and NewsAPI
"""

import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Callable, Sequence

from utils.api_client import get_client
from utils.logger import setup_logger, log_api_request, log_error
from config import config

logger = setup_logger("news_collector")

# Category label attached to every result produced by this module.
NEWS_CATEGORY = "news"


def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Calculate staleness in minutes from data timestamp to now

    Args:
        data_timestamp: Timestamp of the data. Naive datetimes are treated
            as UTC.

    Returns:
        Staleness in minutes or None if timestamp not available
    """
    if data_timestamp is None:
        return None

    if data_timestamp.tzinfo is None:
        data_timestamp = data_timestamp.replace(tzinfo=timezone.utc)

    delta = datetime.now(timezone.utc) - data_timestamp
    return delta.total_seconds() / 60.0


def parse_iso_timestamp(timestamp_str: str) -> Optional[datetime]:
    """
    Parse ISO timestamp string to datetime

    Args:
        timestamp_str: ISO format timestamp string. A trailing "Z" is
            accepted as a UTC designator.

    Returns:
        datetime object or None if the value is not a string or cannot be
        parsed
    """
    if not isinstance(timestamp_str, str):
        return None

    text = timestamp_str.strip()
    # Only a *trailing* Z is a UTC designator; rewrite just that character so
    # the string is accepted by fromisoformat() on older Python versions too.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"

    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _failure_result(provider: str, error_msg: str, **extra: Any) -> Dict[str, Any]:
    """Build the standard failure dict; `extra` adds optional keys such as error_type."""
    result: Dict[str, Any] = {
        "provider": provider,
        "category": NEWS_CATEGORY,
        "data": None,
        "timestamp": _utc_now_iso(),
        "staleness_minutes": None,
        "success": False,
        "error": error_msg,
    }
    result.update(extra)
    return result


def _result_items(data: Any, *keys: str) -> List[Any]:
    """Return the first list found under one of `keys` in `data`, else an empty list."""
    if not isinstance(data, dict):
        return []
    for key in keys:
        items = data.get(key)
        if isinstance(items, list):
            return items
    return []


def _first_item_timestamp(items: List[Any], fields: Sequence[str]) -> Optional[datetime]:
    """Parse the first non-empty timestamp field of the first item, if any.

    The first item is used as the freshness reference; no sorting is done here.
    """
    if not items or not isinstance(items[0], dict):
        return None
    first = items[0]
    for field in fields:
        raw = first.get(field)
        if raw:
            return parse_iso_timestamp(raw)
    return None


def _timeout_seconds(provider_config: Any) -> int:
    """Convert the provider's millisecond timeout to whole seconds (at least 1).

    Plain floor division would turn any sub-second setting into 0.
    """
    return max(1, provider_config.timeout_ms // 1000)


def _cryptopanic_params(provider_config: Any) -> Dict[str, Any]:
    """Query parameters for the CryptoPanic posts request."""
    return {
        "auth_token": "free",  # CryptoPanic offers free tier
        "public": "true",
        "kind": "news",  # Get news posts
        "filter": "rising",  # Get rising news
    }


def _newsapi_params(provider_config: Any) -> Dict[str, Any]:
    """Query parameters for the NewsAPI headlines request."""
    return {
        "apikey": provider_config.api_key,
        "q": "cryptocurrency OR bitcoin OR ethereum",
        "language": "en",
        "category": "business,technology",
    }


async def _collect_from_provider(
    provider: str,
    endpoint: str,
    *,
    item_noun: str,
    count_key: str,
    timestamp_fields: Sequence[str],
    build_params: Callable[[Any], Dict[str, Any]],
    check_api_key: bool = False,
) -> Dict[str, Any]:
    """
    Shared request/parse pipeline used by the public collector functions

    Args:
        provider: Provider name used for config lookup and logging
        endpoint: Path appended to the provider's endpoint_url
        item_noun: Word used for the items in the summary log line
        count_key: Name of the item-count key in the success result
        timestamp_fields: Candidate timestamp keys on the first item, in
            priority order
        build_params: Callable producing the query parameters from the
            provider config
        check_api_key: When True, fail early if the provider requires an
            API key and none is configured

    Returns:
        Result dict; never raises for ordinary errors (they are logged and
        reported through "success"/"error").
    """
    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return _failure_result(provider, error_msg)

        if check_api_key and provider_config.requires_key and not provider_config.api_key:
            error_msg = f"API key required but not configured for {provider}"
            log_error(logger, provider, "auth_error", error_msg, endpoint)
            return _failure_result(provider, error_msg, error_type="missing_api_key")

        url = f"{provider_config.endpoint_url}{endpoint}"
        response = await client.get(
            url,
            params=build_params(provider_config),
            timeout=_timeout_seconds(provider_config),
        )

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return _failure_result(provider, error_msg, error_type=response.get("error_type"))

        data = response["data"]
        items = _result_items(data, "results")
        data_timestamp = _first_item_timestamp(items, timestamp_fields)
        staleness = calculate_staleness_minutes(data_timestamp)
        item_count = len(items)

        # Format staleness separately: a conditional expression placed after
        # two adjacent f-strings applies to the whole concatenation, and a
        # plain truthiness test would report 0.0 minutes as "N/A".
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {item_count} {item_noun}, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": NEWS_CATEGORY,
            "data": data,
            "timestamp": _utc_now_iso(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
            count_key: item_count,
        }

    except Exception as e:
        # Collector boundary: report any unexpected failure as a result dict
        # so one provider cannot break the whole collection run.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return _failure_result(provider, error_msg, error_type="exception")


async def get_cryptopanic_posts() -> Dict[str, Any]:
    """
    Fetch latest cryptocurrency news posts from CryptoPanic

    Returns:
        Dict with provider, category, data, timestamp, staleness, success,
        error; on success also data_timestamp, response_time_ms and
        post_count
    """
    provider = "CryptoPanic"
    logger.info(f"Fetching posts from {provider}")
    return await _collect_from_provider(
        provider,
        "/posts/",
        item_noun="posts",
        count_key="post_count",
        timestamp_fields=("created_at",),
        build_params=_cryptopanic_params,
    )


async def get_newsapi_headlines() -> Dict[str, Any]:
    """
    Fetch cryptocurrency headlines from NewsAPI (newsdata.io)

    Returns:
        Dict with provider, category, data, timestamp, staleness, success,
        error; on success also data_timestamp, response_time_ms and
        article_count
    """
    provider = "NewsAPI"
    logger.info(f"Fetching headlines from {provider}")
    return await _collect_from_provider(
        provider,
        "/news",
        item_noun="articles",
        count_key="article_count",
        timestamp_fields=("pubDate", "publishedAt"),
        build_params=_newsapi_params,
        check_api_key=True,
    )


async def collect_news_data() -> List[Dict[str, Any]]:
    """
    Main function to collect news data from all sources

    Returns:
        List of results from all news collectors, in collector order
    """
    logger.info("Starting news data collection from all sources")

    collectors = (
        ("CryptoPanic", get_cryptopanic_posts),
        ("NewsAPI", get_newsapi_headlines),
    )

    # Run all collectors concurrently
    results = await asyncio.gather(
        *(collector() for _, collector in collectors),
        return_exceptions=True,
    )

    processed_results: List[Dict[str, Any]] = []
    for (provider, _), result in zip(collectors, results):
        # BaseException, not Exception: a cancelled child is returned as a
        # CancelledError, which would otherwise be treated as a result dict.
        if isinstance(result, BaseException):
            error_text = str(result) or type(result).__name__
            logger.error(f"Collector failed with exception: {error_text}")
            processed_results.append(
                _failure_result(provider, error_text, error_type="exception")
            )
        else:
            processed_results.append(result)

    # Log summary
    succeeded = [r for r in processed_results if r.get("success", False)]
    total_items = sum(
        r.get("post_count", 0) + r.get("article_count", 0) for r in succeeded
    )

    logger.info(
        f"News data collection complete: {len(succeeded)}/{len(processed_results)} successful, "
        f"{total_items} total items"
    )

    return processed_results


# Alias for backward compatibility
collect_news = collect_news_data


class NewsCollector:
    """
    News Collector class for WebSocket streaming interface
    Wraps the standalone news collection functions
    """

    # Maximum number of items taken from each provider.
    MAX_ITEMS_PER_SOURCE = 10

    def __init__(self, config: Any = None):
        """
        Initialize the news collector

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    @staticmethod
    def _source_label(item: Dict[str, Any], dict_key: str) -> Any:
        """Return a source label whether "source" is a dict, a string, or absent."""
        source = item.get("source")
        if isinstance(source, dict):
            return source.get(dict_key)
        if isinstance(source, str):
            return source
        # Presumably newsdata.io-style payloads carry flat source fields;
        # verify against the live response.
        return item.get("source_name") or item.get("source_id")

    @classmethod
    def _normalize_cryptopanic_post(cls, post: Dict[str, Any]) -> Dict[str, Any]:
        """Map one CryptoPanic post to the aggregated article shape."""
        source = post.get("source")
        return {
            "title": post.get("title"),
            "url": post.get("url"),
            "source": source.get("title") if isinstance(source, dict) else None,
            "published_at": post.get("published_at"),
            "kind": post.get("kind"),
            "votes": post.get("votes", {}),
        }

    @classmethod
    def _normalize_newsapi_article(cls, article: Dict[str, Any]) -> Dict[str, Any]:
        """Map one NewsAPI article to the aggregated article shape.

        Accepts both the "url"/"publishedAt" and the "link"/"pubDate" field
        spellings, matching the timestamp fields get_newsapi_headlines reads.
        """
        return {
            "title": article.get("title"),
            "url": article.get("url") or article.get("link"),
            "source": cls._source_label(article, "name"),
            "published_at": article.get("publishedAt") or article.get("pubDate"),
            "description": article.get("description"),
        }

    async def collect(self) -> Dict[str, Any]:
        """
        Collect news data from all sources

        Returns:
            Dict with aggregated news data: "articles", "sources",
            "categories", "breaking" and "timestamp". Only "articles" and
            "sources" are populated here; "categories" and "breaking" are
            returned empty.
        """
        results = await collect_news_data()

        # Aggregate data for WebSocket streaming
        aggregated: Dict[str, Any] = {
            "articles": [],
            "sources": [],
            "categories": [],
            "breaking": [],
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        for result in results:
            if not (result.get("success") and result.get("data")):
                continue

            provider = result.get("provider", "unknown")
            aggregated["sources"].append(provider)
            data = result["data"]

            if provider == "CryptoPanic":
                items = _result_items(data, "results")
                normalize = self._normalize_cryptopanic_post
            elif provider == "NewsAPI":
                # get_newsapi_headlines counts data["results"], so accept that
                # key as well as the "articles" key read previously.
                items = _result_items(data, "articles", "results")
                normalize = self._normalize_newsapi_article
            else:
                continue

            aggregated["articles"].extend(
                normalize(item)
                for item in items[: self.MAX_ITEMS_PER_SOURCE]
                if isinstance(item, dict)
            )

        return aggregated


# Example usage
if __name__ == "__main__":
    async def main():
        results = await collect_news_data()

        print("\n=== News Data Collection Results ===")
        for result in results:
            print(f"\nProvider: {result['provider']}")
            print(f"Success: {result['success']}")
            # The key is present with value None on failure, so a .get()
            # default would never apply.
            staleness = result.get('staleness_minutes')
            print(f"Staleness: {staleness if staleness is not None else 'N/A'} minutes")
            if result['success']:
                print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms")
                print(f"Items: {result.get('post_count', 0) + result.get('article_count', 0)}")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")

    asyncio.run(main())