# src/data/storage.py """ Módulo para almacenamiento persistente de datos en PostgreSQL y caché en Redis """ import pandas as pd from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer, Index from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime from typing import Optional, List import redis import json from ..monitoring.logger import log Base = declarative_base() class OHLCV(Base): """ Modelo de tabla para datos OHLCV """ __tablename__ = 'ohlcv' id = Column(Integer, primary_key=True, autoincrement=True) timestamp = Column(DateTime, nullable=False) symbol = Column(String(20), nullable=False) timeframe = Column(String(10), nullable=False) open = Column(Float, nullable=False) high = Column(Float, nullable=False) low = Column(Float, nullable=False) close = Column(Float, nullable=False) volume = Column(Float, nullable=False) # Índices compuestos para queries rápidas __table_args__ = ( Index('idx_symbol_timeframe_timestamp', 'symbol', 'timeframe', 'timestamp'), Index('idx_timestamp', 'timestamp'), ) class StorageManager: """ Gestor de almacenamiento con PostgreSQL y Redis """ def __init__( self, db_host: str, db_port: int, db_name: str, db_user: str, db_password: str, redis_host: str = 'localhost', redis_port: int = 6379, redis_db: int = 0 ): """ Inicializa conexiones a PostgreSQL y Redis """ # PostgreSQL connection db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" try: self.engine = create_engine( db_url, pool_size=10, max_overflow=20, echo=False ) # Crear tablas si no existen Base.metadata.create_all(self.engine) # Crear sesión Session = sessionmaker(bind=self.engine) self.session = Session() log.success("Conectado a PostgreSQL") except Exception as e: log.error(f"Error conectando a PostgreSQL: {e}") raise # Redis connection (para caché) try: self.redis_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) self.redis_client.ping() log.success("Conectado a Redis") except Exception as e: log.warning(f"No se pudo conectar a Redis: {e}. Continuando sin caché.") self.redis_client = None def save_ohlcv(self, df: pd.DataFrame, batch_size: int = 1000) -> int: """ Guarda datos OHLCV en la base de datos Args: df: DataFrame con datos OHLCV batch_size: Tamaño de lote para inserción Returns: Número de registros guardados """ if df.empty: log.warning("DataFrame vacío, nada que guardar") return 0 log.info(f"Guardando {len(df)} registros en base de datos") try: # Preparar datos para inserción df_to_save = df.reset_index() # Renombrar columna de índice a timestamp si es necesario if df_to_save.columns[0] != 'timestamp': df_to_save.rename(columns={df_to_save.columns[0]: 'timestamp'}, inplace=True) # Insertar en lotes para mejor performance records_saved = 0 for i in range(0, len(df_to_save), batch_size): batch = df_to_save.iloc[i:i+batch_size] # Usar to_sql con if_exists='append' y method='multi' batch.to_sql( 'ohlcv', self.engine, if_exists='append', index=False, method='multi' ) records_saved += len(batch) log.debug(f"Guardados {records_saved}/{len(df_to_save)} registros") log.success(f"Guardados {records_saved} registros exitosamente") return records_saved except Exception as e: log.error(f"Error guardando datos: {e}") self.session.rollback() raise def load_ohlcv( self, symbol: str, timeframe: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, use_cache: bool = True ) -> pd.DataFrame: """ Carga datos OHLCV de la base de datos Args: symbol: Símbolo del par timeframe: Timeframe start_date: Fecha inicio (opcional) end_date: Fecha fin (opcional) use_cache: Si usar caché de Redis Returns: DataFrame con datos OHLCV """ # Generar cache key cache_key = f"ohlcv:{symbol}:{timeframe}:{start_date}:{end_date}" # Intentar obtener de caché if use_cache and self.redis_client: try: cached_data = self.redis_client.get(cache_key) if cached_data: log.debug(f"Datos obtenidos de caché: {cache_key}") df = pd.read_json(cached_data) df.set_index('timestamp', inplace=True) return df except Exception as e: log.warning(f"Error leyendo caché: {e}") # Construir query query = f""" SELECT timestamp, open, high, low, close, volume, symbol, timeframe FROM ohlcv WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' """ if start_date: query += f" AND timestamp >= '{start_date}'" if end_date: query += f" AND timestamp <= '{end_date}'" query += " ORDER BY timestamp ASC" try: df = pd.read_sql(query, self.engine) if df.empty: log.warning(f"No se encontraron datos para {symbol} {timeframe}") return pd.DataFrame() df['timestamp'] = pd.to_datetime(df['timestamp']) df.set_index('timestamp', inplace=True) log.info(f"Cargados {len(df)} registros de {symbol} {timeframe}") # Guardar en caché if self.redis_client: try: df_json = df.reset_index().to_json() self.redis_client.setex(cache_key, 3600, df_json) # TTL 1 hora log.debug(f"Datos guardados en caché: {cache_key}") except Exception as e: log.warning(f"Error guardando en caché: {e}") return df except Exception as e: log.error(f"Error cargando datos: {e}") raise def get_latest_timestamp(self, symbol: str, timeframe: str) -> Optional[datetime]: """ Obtiene el último timestamp disponible para un símbolo/timeframe Args: symbol: Símbolo del par timeframe: Timeframe Returns: Último timestamp o None si no hay datos """ query = f""" SELECT MAX(timestamp) as last_timestamp FROM ohlcv WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' """ try: result = pd.read_sql(query, self.engine) last_timestamp = result['last_timestamp'].iloc[0] if pd.isna(last_timestamp): log.debug(f"No hay datos previos para {symbol} {timeframe}") return None return last_timestamp except Exception as e: log.error(f"Error obteniendo último timestamp: {e}") return None def delete_ohlcv( self, symbol: str, timeframe: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ) -> int: """ Elimina datos OHLCV Args: symbol: Símbolo del par timeframe: Timeframe start_date: Fecha inicio (opcional) end_date: Fecha fin (opcional) Returns: Número de registros eliminados """ query = f""" DELETE FROM ohlcv WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' """ if start_date: query += f" AND timestamp >= '{start_date}'" if end_date: query += f" AND timestamp <= '{end_date}'" try: result = self.engine.execute(query) deleted_count = result.rowcount log.info(f"Eliminados {deleted_count} registros") # Invalidar caché if self.redis_client: cache_pattern = f"ohlcv:{symbol}:{timeframe}:*" keys = self.redis_client.keys(cache_pattern) if keys: self.redis_client.delete(*keys) return deleted_count except Exception as e: log.error(f"Error eliminando datos: {e}") raise def get_available_data(self) -> pd.DataFrame: """ Obtiene resumen de datos disponibles en la base de datos Returns: DataFrame con información de símbolos y timeframes disponibles """ query = """ SELECT symbol, timeframe, MIN(timestamp) as first_date, MAX(timestamp) as last_date, COUNT(*) as record_count FROM ohlcv GROUP BY symbol, timeframe ORDER BY symbol, timeframe """ try: df = pd.read_sql(query, self.engine) log.info(f"Información de {len(df)} conjuntos de datos") return df except Exception as e: log.error(f"Error obteniendo información de datos: {e}") raise def close(self): """ Cierra conexiones """ try: self.session.close() self.engine.dispose() if self.redis_client: self.redis_client.close() log.info("Conexiones cerradas") except Exception as e: log.error(f"Error cerrando conexiones: {e}")