Enhanced Crypto Data Tracker - New Features
π Overview
This document describes the major enhancements added to the crypto data tracking system, including unified configuration management, advanced scheduling, real-time updates via WebSockets, and comprehensive data persistence.
β¨ New Features
1. Unified Configuration Loader
File: backend/services/unified_config_loader.py
The unified configuration loader automatically imports and manages all API sources from JSON configuration files at the project root.
Features:
- Loads from multiple JSON config files:
crypto_resources_unified_2025-11-11.json(200+ APIs)all_apis_merged_2025.jsonultimate_crypto_pipeline_2025_NZasinich.json
- Automatic API key extraction
- Category-based organization
- Update type classification (realtime, periodic, scheduled)
- Schedule management for each API
- Import/Export functionality
Usage:
from backend.services.unified_config_loader import UnifiedConfigLoader
loader = UnifiedConfigLoader()
# Get all APIs
all_apis = loader.get_all_apis()
# Get APIs by category
market_data_apis = loader.get_apis_by_category('market_data')
# Get APIs by update type
realtime_apis = loader.get_realtime_apis()
periodic_apis = loader.get_periodic_apis()
# Add custom API
loader.add_custom_api({
'id': 'custom_api',
'name': 'Custom API',
'category': 'custom',
'base_url': 'https://api.example.com',
'update_type': 'periodic',
'enabled': True
})
2. Enhanced Scheduling System
File: backend/services/scheduler_service.py
Advanced scheduler that manages periodic and real-time data updates with automatic error handling and retry logic.
Features:
- Periodic Updates: Schedule APIs to update at specific intervals
- Real-time Updates: WebSocket connections for instant data
- Scheduled Updates: Less frequent updates for HuggingFace and other resources
- Smart Retry: Automatic interval adjustment on failures
- Callbacks: Register callbacks for data updates
- Force Updates: Manually trigger immediate updates
Update Types:
realtime(0s interval): WebSocket - always connectedperiodic(60s interval): Regular polling for market datascheduled(3600s interval): Hourly updates for HF models/datasetsdaily(86400s interval): Once per day
Usage:
from backend.services.scheduler_service import SchedulerService
scheduler = SchedulerService(config_loader, db_manager)
# Start scheduler
await scheduler.start()
# Update schedule
scheduler.update_task_schedule('coingecko', interval=120, enabled=True)
# Force update
success = await scheduler.force_update('coingecko')
# Register callback
def on_data_update(api_id, data):
print(f"Data updated for {api_id}")
scheduler.register_callback('coingecko', on_data_update)
# Get task status
status = scheduler.get_task_status('coingecko')
# Export schedules
scheduler.export_schedules('schedules_backup.json')
3. Data Persistence Service
File: backend/services/persistence_service.py
Comprehensive data persistence with multiple export formats and automatic backups.
Features:
- In-memory caching for quick access
- Historical data tracking (configurable limit)
- Export to JSON, CSV formats
- Automatic backups
- Database integration (SQLAlchemy)
- Data cleanup utilities
Usage:
from backend.services.persistence_service import PersistenceService
persistence = PersistenceService(db_manager)
# Save data
await persistence.save_api_data(
'coingecko',
{'price': 50000},
metadata={'category': 'market_data'}
)
# Get cached data
data = persistence.get_cached_data('coingecko')
# Get history
history = persistence.get_history('coingecko', limit=100)
# Export to JSON
await persistence.export_to_json('export.json', include_history=True)
# Export to CSV
await persistence.export_to_csv('export.csv', flatten=True)
# Create backup
backup_file = await persistence.backup_all_data()
# Restore from backup
await persistence.restore_from_backup(backup_file)
# Cleanup old data (7 days)
removed = await persistence.cleanup_old_data(days=7)
4. Real-time WebSocket Service
File: backend/services/websocket_service.py
WebSocket service for real-time bidirectional communication between backend and frontend.
Features:
- Connection management with client tracking
- Subscription-based updates (specific APIs or all)
- Real-time notifications for:
- API data updates
- System status changes
- Schedule modifications
- Request-response patterns for data queries
- Heartbeat/ping-pong for connection health
WebSocket Message Types:
Client β Server:
subscribe: Subscribe to specific API updatessubscribe_all: Subscribe to all updatesunsubscribe: Unsubscribe from APIget_data: Request cached dataget_all_data: Request all cached dataget_schedule: Request schedule informationupdate_schedule: Update schedule configurationforce_update: Force immediate API updateping: Heartbeat
Server β Client:
connected: Welcome message with client IDapi_update: API data updatedstatus_update: System status changedschedule_update: Schedule modifiedsubscribed: Subscription confirmeddata_response: Data query responseschedule_response: Schedule query responsepong: Heartbeat responseerror: Error occurred
Usage:
Frontend JavaScript:
// Connect
const ws = new WebSocket('ws://localhost:8000/api/v2/ws');
// Subscribe to all updates
ws.send(JSON.stringify({ type: 'subscribe_all' }));
// Subscribe to specific API
ws.send(JSON.stringify({
type: 'subscribe',
api_id: 'coingecko'
}));
// Request data
ws.send(JSON.stringify({
type: 'get_data',
api_id: 'coingecko'
}));
// Update schedule
ws.send(JSON.stringify({
type: 'update_schedule',
api_id: 'coingecko',
interval: 120,
enabled: true
}));
// Force update
ws.send(JSON.stringify({
type: 'force_update',
api_id: 'coingecko'
}));
// Handle messages
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
switch (message.type) {
case 'api_update':
console.log(`${message.api_id} updated:`, message.data);
break;
case 'status_update':
console.log('Status:', message.status);
break;
}
};
5. Integrated Backend API
File: backend/routers/integrated_api.py
Comprehensive REST API that combines all services.
Endpoints:
Configuration:
GET /api/v2/config/apis- Get all configured APIsGET /api/v2/config/apis/{api_id}- Get specific APIGET /api/v2/config/categories- Get all categoriesGET /api/v2/config/apis/category/{category}- Get APIs by categoryPOST /api/v2/config/apis- Add custom APIDELETE /api/v2/config/apis/{api_id}- Remove APIGET /api/v2/config/export- Export configuration
Scheduling:
GET /api/v2/schedule/tasks- Get all scheduled tasksGET /api/v2/schedule/tasks/{api_id}- Get specific taskPUT /api/v2/schedule/tasks/{api_id}- Update schedulePOST /api/v2/schedule/tasks/{api_id}/force-update- Force updateGET /api/v2/schedule/export- Export schedules
Data:
GET /api/v2/data/cached- Get all cached dataGET /api/v2/data/cached/{api_id}- Get cached data for APIGET /api/v2/data/history/{api_id}- Get historical dataGET /api/v2/data/statistics- Get storage statistics
Export/Import:
POST /api/v2/export/json- Export to JSONPOST /api/v2/export/csv- Export to CSVPOST /api/v2/export/history/{api_id}- Export API historyGET /api/v2/download?file={path}- Download exported filePOST /api/v2/backup- Create backupPOST /api/v2/restore- Restore from backup
Status:
GET /api/v2/status- System statusGET /api/v2/health- Health check
Cleanup:
POST /api/v2/cleanup/cache- Clear cachePOST /api/v2/cleanup/history- Clear historyPOST /api/v2/cleanup/old-data- Remove old data
6. Enhanced Server
File: enhanced_server.py
Production-ready server with all services integrated.
Features:
- Automatic service initialization on startup
- Graceful shutdown with final backup
- Comprehensive logging
- CORS support
- Static file serving
- Multiple dashboard routes
Run the server:
python enhanced_server.py
Access points:
- Main Dashboard: http://localhost:8000/
- Enhanced Dashboard: http://localhost:8000/enhanced_dashboard.html
- API Documentation: http://localhost:8000/docs
- WebSocket: ws://localhost:8000/api/v2/ws
7. Enhanced Dashboard UI
File: enhanced_dashboard.html
Modern, interactive dashboard with real-time updates and full control over the system.
Features:
- Real-time Updates: WebSocket connection with live data
- Export Controls: One-click export to JSON/CSV
- Backup Management: Create/restore backups
- Schedule Configuration: Adjust update intervals per API
- Force Updates: Trigger immediate updates
- System Statistics: Live monitoring of system metrics
- Activity Log: Real-time activity feed
- API Management: View and control all API sources
π§ Installation & Setup
Prerequisites
pip install fastapi uvicorn websockets pandas httpx sqlalchemy
Directory Structure
crypto-dt-source/
βββ backend/
β βββ routers/
β β βββ integrated_api.py
β βββ services/
β βββ unified_config_loader.py
β βββ scheduler_service.py
β βββ persistence_service.py
β βββ websocket_service.py
βββ database/
β βββ models.py
β βββ db_manager.py
βββ data/
β βββ exports/
β βββ backups/
βββ crypto_resources_unified_2025-11-11.json
βββ all_apis_merged_2025.json
βββ ultimate_crypto_pipeline_2025_NZasinich.json
βββ enhanced_server.py
βββ enhanced_dashboard.html
Running the Enhanced Server
- Start the server:
python enhanced_server.py
Access the dashboard:
- Open browser to http://localhost:8000/enhanced_dashboard.html
Monitor logs:
- Server logs show all activities
- WebSocket connections
- Data updates
- Errors and warnings
π Configuration
Scheduling Configuration
Edit schedules via:
- Web UI: Click "Configure Schedule" in enhanced dashboard
- API: Use PUT /api/v2/schedule/tasks/{api_id}
- Code: Call
scheduler.update_task_schedule()
Update Types
Configure update_type in API configuration:
realtime: WebSocket connection (instant updates)periodic: Regular polling (default: 60s)scheduled: Less frequent updates (default: 3600s)daily: Once per day (default: 86400s)
Data Retention
Configure in persistence_service.py:
max_history_per_api = 1000 # Keep last 1000 records per API
Cleanup old data:
curl -X POST http://localhost:8000/api/v2/cleanup/old-data?days=7
π Security Notes
- API keys are stored securely in config files
- Keys are masked in exports (shown as ***)
- Database uses SQLite with proper permissions
- CORS configured for security
- WebSocket connections tracked and managed
π Performance
- In-memory caching: Fast data access
- Async operations: Non-blocking I/O
- Concurrent updates: Parallel API calls
- Connection pooling: Efficient database access
- Smart retry logic: Automatic error recovery
π Examples
Example 1: Setup and Start
from backend.services.unified_config_loader import UnifiedConfigLoader
from backend.services.scheduler_service import SchedulerService
from backend.services.persistence_service import PersistenceService
# Initialize
config = UnifiedConfigLoader()
persistence = PersistenceService()
scheduler = SchedulerService(config)
# Start scheduler
await scheduler.start()
Example 2: Export Data
# Export all data to JSON
await persistence.export_to_json('all_data.json', include_history=True)
# Export specific APIs to CSV
await persistence.export_to_csv('market_data.csv', api_ids=['coingecko', 'binance'])
Example 3: Custom API
# Add custom API
config.add_custom_api({
'id': 'my_custom_api',
'name': 'My Custom API',
'category': 'custom',
'base_url': 'https://api.myservice.com/data',
'auth': {'type': 'apiKey', 'key': 'YOUR_KEY'},
'update_type': 'periodic',
'interval': 300
})
π Troubleshooting
WebSocket Not Connecting
- Check server is running
- Verify URL:
ws://localhost:8000/api/v2/ws - Check browser console for errors
- Ensure no firewall blocking WebSocket
Data Not Updating
- Check scheduler is running: GET /api/v2/status
- Verify API is enabled in schedule
- Check logs for errors
- Force update: POST /api/v2/schedule/tasks/{api_id}/force-update
Export Fails
- Ensure
data/exports/directory exists - Check disk space
- Verify pandas is installed
π API Documentation
Full API documentation available at: http://localhost:8000/docs
π Credits
Enhanced features developed for comprehensive crypto data tracking with real-time updates, advanced scheduling, and data persistence.