|
|
""" |
|
|
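Extended News Collectors

Fetches crypto news from the RSS feeds of CoinDesk, CoinTelegraph, Decrypt and
other crypto news outlets. CryptoSlate is queried through its JSON API first,
with its RSS feed used as a fallback.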
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import feedparser |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.api_client import get_client |
|
|
from utils.logger import setup_logger, log_api_request, log_error |
|
|
|
|
|
logger = setup_logger("news_extended_collector") |
|
|
|
|
|
|
|
|
async def get_rss_feed(provider: str, feed_url: str) -> Dict[str, Any]:
    """
    Download an RSS feed via the client from get_client() and parse it with feedparser.

    Only the first 10 parsed entries are turned into articles (title, link,
    published, and a summary clipped to 200 characters), while
    ``total_entries`` reports the size of the whole parsed feed.

    Args:
        provider: Provider name used in log lines and echoed in the result
        feed_url: RSS feed URL

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        Successful results additionally carry response_time_ms; failed
        results carry error_type and have data set to None.
    """
    category = "news"
    endpoint = "/rss"

    def failure(message, kind):
        # Both failure paths (unsuccessful request, unexpected exception)
        # return the same record shape; only message and kind differ.
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": message,
            "error_type": kind
        }

    def to_article(item):
        # The summary is None (not "") when the entry has no summary at all.
        if "summary" in item:
            clipped = item.get("summary", "")[:200]
        else:
            clipped = None
        return {
            "title": item.get("title", ""),
            "link": item.get("link", ""),
            "published": item.get("published", ""),
            "summary": clipped
        }

    logger.info(f"Fetching RSS feed from {provider}")

    try:
        response = await get_client().get(feed_url, timeout=15)

        # Record the request outcome before acting on it.
        elapsed_ms = response.get("response_time_ms", 0)
        outcome = "success" if response["success"] else "error"
        log_api_request(
            logger,
            provider,
            endpoint,
            elapsed_ms,
            outcome,
            response.get("status_code")
        )

        if not response["success"]:
            reason = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), reason, endpoint)
            return failure(reason, response.get("error_type"))

        # Prefer the raw body; otherwise fall back to the stringified "data" field.
        body = response.get("raw_content", "") or str(response.get("data", ""))
        parsed = feedparser.parse(body)

        payload = None
        if parsed and hasattr(parsed, 'entries'):
            articles = [to_article(item) for item in parsed.entries[:10]]
            if hasattr(parsed, 'feed'):
                feed_title = parsed.feed.get("title", provider)
            else:
                feed_title = provider
            payload = {
                "feed_title": feed_title,
                "total_entries": len(parsed.entries),
                "articles": articles
            }

        retrieved = len(parsed.entries) if parsed else 0
        logger.info(f"{provider} - {endpoint} - Retrieved {retrieved} articles")

        return {
            "provider": provider,
            "category": category,
            "data": payload,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as exc:
        reason = f"Unexpected error: {str(exc)}"
        log_error(logger, provider, "exception", reason, endpoint, exc_info=True)
        return failure(reason, "exception")
|
|
|
|
|
|
|
|
async def get_coindesk_news() -> Dict[str, Any]:
    """
    Collect the latest CoinDesk articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "CoinDesk"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://www.coindesk.com/arc/outboundfeeds/rss/"
    result = await get_rss_feed("CoinDesk", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_cointelegraph_news() -> Dict[str, Any]:
    """
    Collect the latest CoinTelegraph articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "CoinTelegraph"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://cointelegraph.com/rss"
    result = await get_rss_feed("CoinTelegraph", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_decrypt_news() -> Dict[str, Any]:
    """
    Collect the latest Decrypt articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "Decrypt"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://decrypt.co/feed"
    result = await get_rss_feed("Decrypt", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_bitcoinmagazine_news() -> Dict[str, Any]:
    """
    Collect the latest Bitcoin Magazine articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "BitcoinMagazine"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://bitcoinmagazine.com/.rss/full/"
    result = await get_rss_feed("BitcoinMagazine", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_theblock_news() -> Dict[str, Any]:
    """
    Collect the latest The Block articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "TheBlock"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://www.theblock.co/rss.xml"
    result = await get_rss_feed("TheBlock", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_cryptoslate_news() -> Dict[str, Any]:
    """
    Fetch news from CryptoSlate, preferring its JSON API over its RSS feed.

    The posts endpoint is queried first. The RSS feed is used instead when
    the request is reported as unsuccessful, when the payload is not a list
    of posts, or when any exception is raised while calling the API or
    converting its payload. Exceptions are logged with their details before
    falling back, so API breakage stays visible in the logs.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On the API path ``data`` holds ``total_entries`` and ``articles``
        (title, link, published, excerpt clipped to 200 characters) and the
        dict also carries response_time_ms; on the fallback path the dict is
        whatever get_rss_feed returns.
    """
    provider = "CryptoSlate"
    category = "news"
    endpoint = "/newslist"
    rss_url = "https://cryptoslate.com/feed/"

    logger.info(f"Fetching news from {provider}")

    try:
        client = get_client()

        url = "https://cryptoslate.com/wp-json/cs/v1/posts"
        params = {
            "per_page": 10,
            "orderby": "date"
        }

        response = await client.get(url, params=params, timeout=10)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if response["success"]:
            data = response["data"]

            if isinstance(data, list):
                articles = [
                    {
                        "title": article.get("title", {}).get("rendered", ""),
                        "link": article.get("link", ""),
                        "published": article.get("date", ""),
                        "excerpt": article.get("excerpt", {}).get("rendered", "")[:200]
                    }
                    for article in data
                ]

                logger.info(f"{provider} - {endpoint} - Retrieved {len(articles)} articles")

                return {
                    "provider": provider,
                    "category": category,
                    "data": {
                        "total_entries": len(articles),
                        "articles": articles
                    },
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "success": True,
                    "error": None,
                    "response_time_ms": response.get("response_time_ms", 0)
                }

            # A non-list payload carries no articles; reporting it as a
            # success with data=None would hide the problem, so use RSS.
            logger.info(
                f"{provider} - Unexpected API payload type {type(data).__name__}, trying RSS feed"
            )
        else:
            logger.info(f"{provider} - API failed, trying RSS feed")

    except Exception as e:
        # Keep the failure details in the log instead of discarding them.
        log_error(logger, provider, "exception", f"Unexpected error: {str(e)}", endpoint, exc_info=True)
        logger.info(f"{provider} - Exception occurred, trying RSS feed")

    # The single fallback call sits outside the try block so that a failure
    # inside it can never trigger a second RSS fetch from the handler above.
    return await get_rss_feed(provider, rss_url)
|
|
|
|
|
|
|
|
async def get_cryptonews_feed() -> Dict[str, Any]:
    """
    Collect the latest Crypto.news articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "CryptoNews"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://crypto.news/feed/"
    result = await get_rss_feed("CryptoNews", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_coinjournal_news() -> Dict[str, Any]:
    """
    Collect the latest CoinJournal articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "CoinJournal"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://coinjournal.net/feed/"
    result = await get_rss_feed("CoinJournal", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_beincrypto_news() -> Dict[str, Any]:
    """
    Collect the latest BeInCrypto articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "BeInCrypto"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://beincrypto.com/feed/"
    result = await get_rss_feed("BeInCrypto", feed_url)
    return result
|
|
|
|
|
|
|
|
async def get_cryptobriefing_news() -> Dict[str, Any]:
    """
    Collect the latest CryptoBriefing articles from its RSS feed.

    Returns:
        The result dict produced by get_rss_feed for provider "CryptoBriefing"
        (provider, category, data, timestamp, success, error)
    """
    feed_url = "https://cryptobriefing.com/feed/"
    result = await get_rss_feed("CryptoBriefing", feed_url)
    return result
|
|
|
|
|
|
|
|
async def collect_extended_news() -> List[Dict[str, Any]]:
    """
    Run every extended news collector concurrently and gather the results.

    Collectors are awaited together with ``asyncio.gather(...,
    return_exceptions=True)``, so one failing collector does not abort the
    others. A collector that raised (or was cancelled) is replaced by a
    failure record naming the provider it belongs to.

    Returns:
        List of result dicts, one per collector, in the order the collectors
        are listed below
    """
    logger.info("Starting extended news collection from all sources")

    # Provider labels mirror the names each collector reports in its own
    # result, so a failure record can be attributed to the right source.
    collectors = [
        ("CoinDesk", get_coindesk_news),
        ("CoinTelegraph", get_cointelegraph_news),
        ("Decrypt", get_decrypt_news),
        ("BitcoinMagazine", get_bitcoinmagazine_news),
        ("TheBlock", get_theblock_news),
        ("CryptoSlate", get_cryptoslate_news),
        ("CryptoNews", get_cryptonews_feed),
        ("CoinJournal", get_coinjournal_news),
        ("BeInCrypto", get_beincrypto_news),
        ("CryptoBriefing", get_cryptobriefing_news),
    ]

    # gather() returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(
        *(collector() for _, collector in collectors),
        return_exceptions=True
    )

    processed_results = []
    for (provider, _), result in zip(collectors, results):
        # BaseException rather than Exception: a cancelled child surfaces as
        # asyncio.CancelledError, which is not an Exception subclass on
        # Python 3.8+, and must not be mistaken for a result dict.
        if isinstance(result, BaseException):
            logger.error(f"Collector {provider} failed with exception: {str(result)}")
            processed_results.append({
                "provider": provider,
                "category": "news",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    # Only successful results that actually carry data contribute articles.
    total_articles = sum(
        r["data"].get("total_entries", 0)
        for r in processed_results
        if r.get("success", False) and r.get("data")
    )

    logger.info(
        f"Extended news collection complete: {successful}/{len(processed_results)} sources successful, "
        f"{total_articles} total articles"
    )

    return processed_results
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    async def main():
        """Run all collectors once and print a short report per provider."""
        collected = await collect_extended_news()

        print("\n=== Extended News Collection Results ===")
        for item in collected:
            print(f"\nProvider: {item['provider']}")
            print(f"Success: {item['success']}")

            # Failed collectors only have an error message to show.
            if not item['success']:
                print(f"Error: {item.get('error', 'Unknown')}")
                continue

            payload = item.get('data', {})
            if not payload:
                continue

            print(f"Total Articles: {payload.get('total_entries', 'N/A')}")
            headlines = payload.get('articles', [])
            if headlines:
                newest_title = headlines[0].get('title', 'N/A')
                print(f"Latest: {newest_title[:60]}...")

    asyncio.run(main())
|
|
|