import json
import os
from typing import Any, Dict, List, Optional, Tuple, Union

# Not referenced by the functions below; kept because other parts of the
# project may rely on this module importing it.
import pandas as pd

# Extension of the data files handled by this module.
_DATA_SUFFIX = '.json'

# Timeframe tokens recognised when parsing a data file name.
_TIMEFRAMES = frozenset({'1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w', '1M'})


def save_market_data(data: List[List[Union[int, float]]], file_path: str) -> bool:
    """
    Save market data to a JSON file

    Args:
        data: List of OHLCV candles
        file_path: Path to save the data; missing parent directories are
            created first

    Returns:
        bool: True if saved successfully, False if any error occurred
        (the error is printed, not raised)
    """
    try:
        # A bare file name has an empty directory part, and os.makedirs('')
        # raises FileNotFoundError, so only create the directory when there
        # is one to create.
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f)
        return True
    except Exception as e:
        print(f"Error saving market data: {str(e)}")
        return False


def load_market_data(file_path: str) -> List[List[Union[int, float]]]:
    """
    Load market data from a JSON file

    Args:
        file_path: Path to the data file

    Returns:
        The decoded JSON content (expected to be a list of OHLCV candles),
        or an empty list if the file cannot be read or parsed (the error is
        printed, not raised)
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading market data: {str(e)}")
        return []


def _parse_data_filename(filename: str) -> Optional[Tuple[str, str, str]]:
    """
    Split a data file name into (exchange, symbol, timeframe)

    The name is split on underscores: the first part is the exchange, the
    first recognised timeframe token at position 2 or later ends the symbol,
    and the parts in between are joined with '/' to form the symbol.

    Args:
        filename: File name (without directory)

    Returns:
        Tuple of (exchange, symbol, timeframe), or None if the name does not
        end in '.json', has fewer than three parts, or has no timeframe token
    """
    if not filename.endswith(_DATA_SUFFIX):
        return None

    # Strip only the trailing extension; str.replace would also remove any
    # '.json' occurring in the middle of the name.
    parts = filename[:-len(_DATA_SUFFIX)].split('_')
    if len(parts) < 3:
        return None

    # Start at index 2 so that at least one part is left for the symbol.
    for index in range(2, len(parts)):
        if parts[index] in _TIMEFRAMES:
            symbol = '/'.join(parts[1:index])
            return parts[0], symbol, parts[index]
    return None


def get_available_data(directory: str) -> List[Dict[str, Any]]:
    """
    Get list of available data files

    Args:
        directory: Directory to scan (created if it doesn't exist)

    Returns:
        List of data file information, one dict per recognised file with the
        keys 'exchange', 'symbol', 'timeframe', 'filename', 'size_kb',
        'modified' (value of os.path.getmtime) and 'candle_count'.
        An empty list is returned if scanning fails (the error is printed).
    """
    result = []

    try:
        # Create directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)

        for file in os.listdir(directory):
            parsed = _parse_data_filename(file)
            if parsed is None:
                continue
            exchange_id, symbol, timeframe = parsed

            # Get file stats
            file_path = os.path.join(directory, file)
            file_size = os.path.getsize(file_path)
            modified_time = os.path.getmtime(file_path)

            # load_market_data never raises (it returns [] on failure), so
            # the only error possible here is len() on a JSON scalar.
            data = load_market_data(file_path)
            try:
                candle_count = len(data)
            except TypeError:
                candle_count = 0

            result.append({
                'exchange': exchange_id,
                'symbol': symbol,
                'timeframe': timeframe,
                'filename': file,
                'size_kb': round(file_size / 1024, 2),
                'modified': modified_time,
                'candle_count': candle_count
            })

        return result
    except Exception as e:
        print(f"Error getting available data: {str(e)}")
        return []