""" Data Access API Endpoints Provides user-facing endpoints to access collected cryptocurrency data """ from datetime import datetime, timedelta from typing import Optional, List from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from database.db_manager import db_manager from utils.logger import setup_logger logger = setup_logger("data_endpoints") router = APIRouter(prefix="/api/crypto", tags=["data"]) # ============================================================================ # Pydantic Models # ============================================================================ class PriceData(BaseModel): """Price data model""" symbol: str price_usd: float market_cap: Optional[float] = None volume_24h: Optional[float] = None price_change_24h: Optional[float] = None timestamp: datetime source: str class NewsArticle(BaseModel): """News article model""" id: int title: str content: Optional[str] = None source: str url: Optional[str] = None published_at: datetime sentiment: Optional[str] = None tags: Optional[List[str]] = None class WhaleTransaction(BaseModel): """Whale transaction model""" id: int blockchain: str transaction_hash: str from_address: str to_address: str amount: float amount_usd: float timestamp: datetime source: str class SentimentMetric(BaseModel): """Sentiment metric model""" metric_name: str value: float classification: str timestamp: datetime source: str # ============================================================================ # Market Data Endpoints # ============================================================================ @router.get("/prices", response_model=List[PriceData]) async def get_all_prices( limit: int = Query(default=100, ge=1, le=1000, description="Number of records to return") ): """ Get latest prices for all cryptocurrencies Returns the most recent price data for all tracked cryptocurrencies """ try: prices = db_manager.get_latest_prices(limit=limit) if not prices: return [] return [ PriceData( symbol=p.symbol, price_usd=p.price_usd, market_cap=p.market_cap, volume_24h=p.volume_24h, price_change_24h=p.price_change_24h, timestamp=p.timestamp, source=p.source ) for p in prices ] except Exception as e: logger.error(f"Error getting prices: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get prices: {str(e)}") @router.get("/prices/{symbol}", response_model=PriceData) async def get_price_by_symbol(symbol: str): """ Get latest price for a specific cryptocurrency Args: symbol: Cryptocurrency symbol (e.g., BTC, ETH, BNB) """ try: symbol = symbol.upper() price = db_manager.get_latest_price_by_symbol(symbol) if not price: raise HTTPException(status_code=404, detail=f"Price data not found for {symbol}") return PriceData( symbol=price.symbol, price_usd=price.price_usd, market_cap=price.market_cap, volume_24h=price.volume_24h, price_change_24h=price.price_change_24h, timestamp=price.timestamp, source=price.source ) except HTTPException: raise except Exception as e: logger.error(f"Error getting price for {symbol}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get price: {str(e)}") @router.get("/history/{symbol}") async def get_price_history( symbol: str, hours: int = Query(default=24, ge=1, le=720, description="Number of hours of history"), interval: int = Query(default=60, ge=1, le=1440, description="Interval in minutes") ): """ Get price history for a cryptocurrency Args: symbol: Cryptocurrency symbol hours: Number of hours of history to return interval: Data point interval in minutes """ try: symbol = symbol.upper() history = db_manager.get_price_history(symbol, hours=hours) if not history: raise HTTPException(status_code=404, detail=f"No history found for {symbol}") # Sample data based on interval sampled = [] last_time = None for record in history: if last_time is None or (record.timestamp - last_time).total_seconds() >= interval * 60: sampled.append({ "timestamp": record.timestamp.isoformat(), "price_usd": record.price_usd, "volume_24h": record.volume_24h, "market_cap": record.market_cap }) last_time = record.timestamp return { "symbol": symbol, "data_points": len(sampled), "interval_minutes": interval, "history": sampled } except HTTPException: raise except Exception as e: logger.error(f"Error getting history for {symbol}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get history: {str(e)}") @router.get("/market-overview") async def get_market_overview(): """ Get market overview with top cryptocurrencies """ try: prices = db_manager.get_latest_prices(limit=20) if not prices: return { "total_market_cap": 0, "total_volume_24h": 0, "top_gainers": [], "top_losers": [], "top_by_market_cap": [] } # Calculate totals total_market_cap = sum(p.market_cap for p in prices if p.market_cap) total_volume_24h = sum(p.volume_24h for p in prices if p.volume_24h) # Sort by price change sorted_by_change = sorted( [p for p in prices if p.price_change_24h is not None], key=lambda x: x.price_change_24h, reverse=True ) # Sort by market cap sorted_by_mcap = sorted( [p for p in prices if p.market_cap is not None], key=lambda x: x.market_cap, reverse=True ) return { "total_market_cap": total_market_cap, "total_volume_24h": total_volume_24h, "top_gainers": [ { "symbol": p.symbol, "price_usd": p.price_usd, "price_change_24h": p.price_change_24h } for p in sorted_by_change[:5] ], "top_losers": [ { "symbol": p.symbol, "price_usd": p.price_usd, "price_change_24h": p.price_change_24h } for p in sorted_by_change[-5:] ], "top_by_market_cap": [ { "symbol": p.symbol, "price_usd": p.price_usd, "market_cap": p.market_cap, "volume_24h": p.volume_24h } for p in sorted_by_mcap[:10] ], "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting market overview: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get market overview: {str(e)}") # ============================================================================ # News Endpoints # ============================================================================ @router.get("/news", response_model=List[NewsArticle]) async def get_latest_news( limit: int = Query(default=50, ge=1, le=200, description="Number of articles"), source: Optional[str] = Query(default=None, description="Filter by source"), sentiment: Optional[str] = Query(default=None, description="Filter by sentiment") ): """ Get latest cryptocurrency news Args: limit: Maximum number of articles to return source: Filter by news source sentiment: Filter by sentiment (positive, negative, neutral) """ try: news = db_manager.get_latest_news( limit=limit, source=source, sentiment=sentiment ) if not news: return [] return [ NewsArticle( id=article.id, title=article.title, content=article.content, source=article.source, url=article.url, published_at=article.published_at, sentiment=article.sentiment, tags=article.tags.split(',') if article.tags else None ) for article in news ] except Exception as e: logger.error(f"Error getting news: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get news: {str(e)}") @router.get("/news/{news_id}", response_model=NewsArticle) async def get_news_by_id(news_id: int): """ Get a specific news article by ID """ try: article = db_manager.get_news_by_id(news_id) if not article: raise HTTPException(status_code=404, detail=f"News article {news_id} not found") return NewsArticle( id=article.id, title=article.title, content=article.content, source=article.source, url=article.url, published_at=article.published_at, sentiment=article.sentiment, tags=article.tags.split(',') if article.tags else None ) except HTTPException: raise except Exception as e: logger.error(f"Error getting news {news_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get news: {str(e)}") @router.get("/news/search") async def search_news( q: str = Query(..., min_length=2, description="Search query"), limit: int = Query(default=50, ge=1, le=200) ): """ Search news articles by keyword Args: q: Search query limit: Maximum number of results """ try: results = db_manager.search_news(query=q, limit=limit) return { "query": q, "count": len(results), "results": [ { "id": article.id, "title": article.title, "source": article.source, "url": article.url, "published_at": article.published_at.isoformat(), "sentiment": article.sentiment } for article in results ] } except Exception as e: logger.error(f"Error searching news: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to search news: {str(e)}") # ============================================================================ # Sentiment Endpoints # ============================================================================ @router.get("/sentiment/current") async def get_current_sentiment(): """ Get current market sentiment metrics """ try: sentiment = db_manager.get_latest_sentiment() if not sentiment: return { "fear_greed_index": None, "classification": "unknown", "timestamp": None, "message": "No sentiment data available" } return { "fear_greed_index": sentiment.value, "classification": sentiment.classification, "timestamp": sentiment.timestamp.isoformat(), "source": sentiment.source, "description": _get_sentiment_description(sentiment.classification) } except Exception as e: logger.error(f"Error getting sentiment: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get sentiment: {str(e)}") @router.get("/sentiment/history") async def get_sentiment_history( hours: int = Query(default=168, ge=1, le=720, description="Hours of history (default: 7 days)") ): """ Get sentiment history """ try: history = db_manager.get_sentiment_history(hours=hours) return { "data_points": len(history), "history": [ { "timestamp": record.timestamp.isoformat(), "value": record.value, "classification": record.classification } for record in history ] } except Exception as e: logger.error(f"Error getting sentiment history: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get sentiment history: {str(e)}") # ============================================================================ # Whale Tracking Endpoints # ============================================================================ @router.get("/whales/transactions", response_model=List[WhaleTransaction]) async def get_whale_transactions( limit: int = Query(default=50, ge=1, le=200), blockchain: Optional[str] = Query(default=None, description="Filter by blockchain"), min_amount_usd: Optional[float] = Query(default=None, ge=0, description="Minimum transaction amount in USD") ): """ Get recent large cryptocurrency transactions (whale movements) Args: limit: Maximum number of transactions blockchain: Filter by blockchain (ethereum, bitcoin, etc.) min_amount_usd: Minimum transaction amount in USD """ try: transactions = db_manager.get_whale_transactions( limit=limit, blockchain=blockchain, min_amount_usd=min_amount_usd ) if not transactions: return [] return [ WhaleTransaction( id=tx.id, blockchain=tx.blockchain, transaction_hash=tx.transaction_hash, from_address=tx.from_address, to_address=tx.to_address, amount=tx.amount, amount_usd=tx.amount_usd, timestamp=tx.timestamp, source=tx.source ) for tx in transactions ] except Exception as e: logger.error(f"Error getting whale transactions: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get whale transactions: {str(e)}") @router.get("/whales/stats") async def get_whale_stats( hours: int = Query(default=24, ge=1, le=168, description="Time period in hours") ): """ Get whale activity statistics """ try: stats = db_manager.get_whale_stats(hours=hours) return { "period_hours": hours, "total_transactions": stats.get('total_transactions', 0), "total_volume_usd": stats.get('total_volume_usd', 0), "avg_transaction_usd": stats.get('avg_transaction_usd', 0), "largest_transaction_usd": stats.get('largest_transaction_usd', 0), "by_blockchain": stats.get('by_blockchain', {}), "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting whale stats: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get whale stats: {str(e)}") # ============================================================================ # Blockchain Data Endpoints # ============================================================================ @router.get("/blockchain/gas") async def get_gas_prices(): """ Get current gas prices for various blockchains """ try: gas_prices = db_manager.get_latest_gas_prices() return { "ethereum": gas_prices.get('ethereum', {}), "bsc": gas_prices.get('bsc', {}), "polygon": gas_prices.get('polygon', {}), "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting gas prices: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get gas prices: {str(e)}") @router.get("/blockchain/stats") async def get_blockchain_stats(): """ Get blockchain statistics """ try: stats = db_manager.get_blockchain_stats() return { "ethereum": stats.get('ethereum', {}), "bitcoin": stats.get('bitcoin', {}), "bsc": stats.get('bsc', {}), "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting blockchain stats: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to get blockchain stats: {str(e)}") # ============================================================================ # Helper Functions # ============================================================================ def _get_sentiment_description(classification: str) -> str: """Get human-readable description for sentiment classification""" descriptions = { "extreme_fear": "Extreme Fear - Investors are very worried", "fear": "Fear - Investors are concerned", "neutral": "Neutral - Market is balanced", "greed": "Greed - Investors are getting greedy", "extreme_greed": "Extreme Greed - Market may be overheated" } return descriptions.get(classification, "Unknown sentiment")