File size: 4,031 Bytes
d6d843f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
"""
Integration Test Script
Tests all critical integrations for the Crypto Hub system
"""
from datetime import datetime
from database.db_manager import db_manager
# Markers that prefix every result line.  The glyph previously used here was
# mojibake (a lone "β"), identical for passing and failing lines, so the two
# outcomes could not be told apart in the output.
_PASS_MARK = "✓"
_FAIL_MARK = "✗"

# Messages of all checks that failed, in the order they were reported.
_failures = []


def _passed(message):
    """Print *message* as a passing result line."""
    print(f"{_PASS_MARK} {message}")


def _failed(message):
    """Print *message* as a failing result line and record it for the summary."""
    print(f"{_FAIL_MARK} {message}")
    _failures.append(message)


print("=" * 80)
print("CRYPTO HUB - INTEGRATION TEST")
print("=" * 80)
print()

# ---------------------------------------------------------------------------
# Test 1: Database Manager with Data Access
# ---------------------------------------------------------------------------
# Exceptions raised by db_manager in this section are deliberately not caught:
# they abort the script with a traceback.
print("TEST 1: Database Manager with Data Access Layer")
print("-" * 80)

# Initialize database
db_manager.init_database()
_passed("Database initialized")

# Test save market price
price = db_manager.save_market_price(
    symbol="BTC",
    price_usd=45000.00,
    market_cap=880000000000,
    volume_24h=28500000000,
    price_change_24h=2.5,
    source="Test",
)
# Each returned record is checked for truthiness before it is dereferenced,
# the same way latest_sentiment is guarded below.
# NOTE(review): presumably these calls return None on failure -- confirm
# against db_manager.
if price:
    _passed(f"Saved market price: BTC = ${price.price_usd}")
else:
    _failed("save_market_price returned no record")

# Test retrieve market price
latest_price = db_manager.get_latest_price_by_symbol("BTC")
if latest_price:
    _passed(f"Retrieved market price: BTC = ${latest_price.price_usd}")
else:
    _failed("get_latest_price_by_symbol returned no record for BTC")

# Test save news article
# NOTE(review): datetime.utcnow() is deprecated since Python 3.12 and returns a
# naive datetime.  It is kept because switching to an aware datetime may change
# what the database layer stores -- confirm before replacing.
news = db_manager.save_news_article(
    title="Bitcoin reaches new milestone",
    content="Bitcoin price surges past $45,000",
    source="Test",
    published_at=datetime.utcnow(),
    sentiment="positive",
)
if news:
    _passed(f"Saved news article: ID={news.id}")
else:
    _failed("save_news_article returned no record")

# Test retrieve news
latest_news = db_manager.get_latest_news(limit=5)
_passed(f"Retrieved {len(latest_news)} news articles")

# Test save sentiment
sentiment = db_manager.save_sentiment_metric(
    metric_name="fear_greed_index",
    value=65.0,
    classification="greed",
    source="Test",
)
if sentiment:
    _passed(f"Saved sentiment metric: {sentiment.value}")
else:
    _failed("save_sentiment_metric returned no record")

# Test retrieve sentiment
latest_sentiment = db_manager.get_latest_sentiment()
if latest_sentiment:
    _passed(
        f"Retrieved sentiment: {latest_sentiment.value} "
        f"({latest_sentiment.classification})"
    )
else:
    # A metric was saved just above, so an empty result is reported instead of
    # being skipped silently.
    _failed("get_latest_sentiment returned no record")
print()

# ---------------------------------------------------------------------------
# Test 2: Database Statistics
# ---------------------------------------------------------------------------
print("TEST 2: Database Statistics")
print("-" * 80)
stats = db_manager.get_database_stats()
_passed(f"Database size: {stats.get('database_size_mb', 0)} MB")
# (label, stats key) pairs for the per-table record counts; a missing key is
# shown as 0.
for label, key in (
    ("Market prices", "market_prices"),
    ("News articles", "news_articles"),
    ("Sentiment metrics", "sentiment_metrics"),
):
    _passed(f"{label}: {stats.get(key, 0)} records")
print()

# ---------------------------------------------------------------------------
# Test 3: Data Endpoints Import
# ---------------------------------------------------------------------------
# Tests 3-5 import optional parts of the system inside try/except so that one
# broken integration does not prevent the remaining ones from being checked.
# Every caught error is recorded and reflected in the final summary.
print("TEST 3: Data Endpoints")
print("-" * 80)
try:
    from api.data_endpoints import router
    _passed("Data endpoints router imported")
    _passed(f"Router prefix: {router.prefix}")
    _passed(f"Router tags: {router.tags}")
except Exception as e:
    _failed(f"Error importing data endpoints: {e}")
print()

# ---------------------------------------------------------------------------
# Test 4: Data Persistence
# ---------------------------------------------------------------------------
print("TEST 4: Data Persistence Module")
print("-" * 80)
try:
    from collectors.data_persistence import data_persistence
    _passed("Data persistence module imported")

    # Create mock data: one provider result holding a single coin entry.
    mock_market_data = [
        {
            'success': True,
            'provider': 'CoinGecko',
            'data': {
                'bitcoin': {
                    'usd': 46000.00,
                    'usd_market_cap': 900000000000,
                    'usd_24h_vol': 30000000000,
                    'usd_24h_change': 3.2,
                },
            },
        },
    ]
    count = data_persistence.save_market_data(mock_market_data)
    _passed(f"Saved {count} market prices via persistence layer")
except Exception as e:
    _failed(f"Error in data persistence: {e}")
print()

# ---------------------------------------------------------------------------
# Test 5: WebSocket Broadcaster
# ---------------------------------------------------------------------------
print("TEST 5: WebSocket Broadcaster")
print("-" * 80)
try:
    from api.ws_data_broadcaster import broadcaster
    _passed("WebSocket broadcaster imported")
    _passed(f"Broadcaster initialized: {broadcaster is not None}")
except Exception as e:
    _failed(f"Error importing broadcaster: {e}")
print()

# ---------------------------------------------------------------------------
# Test 6: Health Check
# ---------------------------------------------------------------------------
print("TEST 6: System Health Check")
print("-" * 80)
health = db_manager.health_check()
_passed(f"Database status: {health.get('status', 'unknown')}")
_passed(f"Database path: {health.get('database_path', 'N/A')}")
print()

# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
# The success line is printed only when no check recorded a failure;
# otherwise the failed checks are listed.
print("=" * 80)
print("INTEGRATION TEST COMPLETE")
if _failures:
    print(f"{len(_failures)} integration check(s) failed:")
    for failure_message in _failures:
        print(f"  - {failure_message}")
else:
    print("All critical integrations are working!")
print("=" * 80)
|