# src/data/fetcher.py """ Módulo para obtener datos de exchanges usando CCXT """ import ccxt import pandas as pd from datetime import datetime, timedelta from typing import List, Optional, Dict import time from ..utils.logger import log class DataFetcher: """ Clase para obtener datos históricos y en tiempo real de exchanges """ def __init__(self, exchange_name: str, api_key: str = None, api_secret: str = None): """ Inicializa la conexión con el exchange Args: exchange_name: Nombre del exchange (binance, kraken, etc) api_key: API key (opcional para datos públicos) api_secret: API secret (opcional para datos públicos) """ self.exchange_name = exchange_name try: exchange_class = getattr(ccxt, exchange_name) # Configuración base config = { 'enableRateLimit': True, # Importante para evitar bans 'options': { 'defaultType': 'spot', # spot, future, etc } } # Solo añadir API keys si están presentes y no vacías if api_key and api_secret: config['apiKey'] = api_key config['secret'] = api_secret log.info(f"Conectado al exchange: {exchange_name} (con API keys)") else: log.info(f"Conectado al exchange: {exchange_name} (modo público - sin API keys)") self.exchange = exchange_class(config) except Exception as e: log.error(f"Error conectando a {exchange_name}: {e}") raise def fetch_ohlcv( self, symbol: str, timeframe: str = '1h', since: Optional[datetime] = None, limit: int = 500 ) -> pd.DataFrame: """ Obtiene datos OHLCV (Open, High, Low, Close, Volume) Args: symbol: Par de trading (ej: 'BTC/USDT') timeframe: Intervalo de tiempo ('1m', '5m', '1h', '1d') since: Fecha desde la que obtener datos limit: Número máximo de velas a obtener Returns: DataFrame con los datos OHLCV """ try: # Convertir datetime a timestamp en milisegundos since_ms = None if since: since_ms = int(since.timestamp() * 1000) log.info(f"Obteniendo datos OHLCV: {symbol} {timeframe}") ohlcv = self.exchange.fetch_ohlcv( symbol, timeframe=timeframe, since=since_ms, limit=limit ) # Convertir a DataFrame df = pd.DataFrame( ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'] ) # Convertir timestamp a datetime df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df.set_index('timestamp', inplace=True) # Añadir metadata df['symbol'] = symbol df['timeframe'] = timeframe log.success(f"Obtenidos {len(df)} registros de {symbol}") return df except Exception as e: log.error(f"Error obteniendo OHLCV para {symbol}: {e}") raise def fetch_historical( self, symbol: str, timeframe: str = '1h', since: Optional[datetime] = None, until: Optional[datetime] = None, days: Optional[int] = None, max_retries: int = 3 ) -> pd.DataFrame: """ Obtiene datos históricos completos (puede requerir múltiples llamadas) Args: symbol: Par de trading timeframe: Intervalo de tiempo days: Días hacia atrás max_retries: Intentos máximos por request Returns: DataFrame con todos los datos históricos """ all_data = [] if since is None: if days is None: raise ValueError("Debes proporcionar 'since' o 'days'") since = datetime.utcnow() - timedelta(days=days) if until is None: until = datetime.utcnow() log.info(f"Iniciando descarga histórica: {symbol} desde {since.date()}") iteration = 0 while True: iteration += 1 log.debug(f"Iteración {iteration}: Obteniendo datos desde {since}") retry_count = 0 success = False while retry_count < max_retries and not success: try: df = self.fetch_ohlcv(symbol, timeframe, since, limit=1000) if df.empty: log.warning(f"No hay más datos disponibles para {symbol}") success = True break # Salir del while interno all_data.append(df) # Actualizar 'since' al último timestamp + 1 last_timestamp = df.index[-1] timeframe_seconds = self.exchange.parse_timeframe(timeframe) since = last_timestamp + pd.Timedelta(seconds=timeframe_seconds) # Verificar si ya llegamos al presente if since >= datetime.now(): success = True break # Salir del while interno success = True time.sleep(self.exchange.rateLimit / 1000) # Respetar rate limit except Exception as e: retry_count += 1 log.warning(f"Intento {retry_count}/{max_retries} falló: {e}") time.sleep(5 * retry_count) # Backoff exponencial if not success: log.error(f"Falló después de {max_retries} intentos") break # Salir del while externo if since >= datetime.now() or df.empty: break # Salir del while externo si no hay más datos if not all_data: log.error("No se pudo obtener ningún dato histórico") return pd.DataFrame() # Combinar todos los DataFrames final_df = pd.concat(all_data).drop_duplicates() final_df.sort_index(inplace=True) log.success(f"Descarga completa: {len(final_df)} velas de {symbol}") return final_df def fetch_ticker(self, symbol: str) -> Dict: """ Obtiene el precio actual y información del ticker Args: symbol: Par de trading Returns: Diccionario con información del ticker """ try: ticker = self.exchange.fetch_ticker(symbol) log.debug(f"Ticker de {symbol}: {ticker['last']}") return ticker except Exception as e: log.error(f"Error obteniendo ticker de {symbol}: {e}") raise def fetch_order_book(self, symbol: str, limit: int = 20) -> Dict: """ Obtiene el libro de órdenes (order book) Args: symbol: Par de trading limit: Profundidad del order book Returns: Diccionario con bids y asks """ try: order_book = self.exchange.fetch_order_book(symbol, limit) return order_book except Exception as e: log.error(f"Error obteniendo order book de {symbol}: {e}") raise def get_available_symbols(self) -> List[str]: """ Obtiene lista de símbolos disponibles en el exchange Returns: Lista de símbolos """ try: markets = self.exchange.load_markets() symbols = list(markets.keys()) log.info(f"Símbolos disponibles: {len(symbols)}") return symbols except Exception as e: log.error(f"Error obteniendo símbolos: {e}") raise