Files
Trading-Bot/src/data/storage.py
DaM ccd1fb3e42 Sistema de trading bot - Semanas 1-2 completadas
- Infraestructura de datos completa
- Descarga desde exchanges (CCXT)
- Procesamiento y limpieza de datos
- Almacenamiento en PostgreSQL
- Sistema anti-duplicados
- Script de descarga masiva
- Tests unitarios
- Documentación completa
2026-01-26 22:16:27 +01:00

390 lines
13 KiB
Python

# src/data/storage.py
"""
Módulo para almacenamiento persistente de datos en PostgreSQL y caché en Redis
"""
import pandas as pd
from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer, Index, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from typing import Optional, List
import redis
import json
from ..monitoring.logger import log
Base = declarative_base()
class OHLCV(Base):
"""
Modelo de tabla para datos OHLCV
"""
__tablename__ = 'ohlcv'
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime, nullable=False)
symbol = Column(String(20), nullable=False)
timeframe = Column(String(10), nullable=False)
open = Column(Float, nullable=False)
high = Column(Float, nullable=False)
low = Column(Float, nullable=False)
close = Column(Float, nullable=False)
volume = Column(Float, nullable=False)
returns = Column(Float, nullable=True) # Retornos simples
log_returns = Column(Float, nullable=True) # Retornos logarítmicos
# Índices compuestos para queries rápidas
__table_args__ = (
Index('idx_symbol_timeframe_timestamp', 'symbol', 'timeframe', 'timestamp'),
Index('idx_timestamp', 'timestamp'),
# CONSTRAINT único: no permitir duplicados
# Una combinación de symbol + timeframe + timestamp debe ser única
{'sqlite_autoincrement': True}
)
# Añadir constraint único manualmente en __init__ de StorageManager
class StorageManager:
"""
Gestor de almacenamiento con PostgreSQL y Redis
"""
def __init__(
self,
db_host: str,
db_port: int,
db_name: str,
db_user: str,
db_password: str,
redis_host: str = 'localhost',
redis_port: int = 6379,
redis_db: int = 0
):
"""
Inicializa conexiones a PostgreSQL y Redis
"""
# PostgreSQL connection
db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
try:
self.engine = create_engine(
db_url,
pool_size=10,
max_overflow=20,
echo=False
)
# Crear tablas si no existen
Base.metadata.create_all(self.engine)
# Añadir constraint único si no existe (para evitar duplicados)
try:
with self.engine.connect() as conn:
conn.execute(text("""
ALTER TABLE ohlcv
ADD CONSTRAINT unique_ohlcv
UNIQUE (symbol, timeframe, timestamp)
"""))
conn.commit()
log.info("Constraint único añadido a la tabla ohlcv")
except Exception as e:
# El constraint ya existe o hubo error (no crítico)
log.debug(f"Constraint único ya existe o no se pudo añadir: {e}")
# Crear sesión
Session = sessionmaker(bind=self.engine)
self.session = Session()
log.success("Conectado a PostgreSQL")
except Exception as e:
log.error(f"Error conectando a PostgreSQL: {e}")
raise
# Redis connection (para caché)
try:
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.redis_client.ping()
log.success("Conectado a Redis")
except Exception as e:
log.warning(f"No se pudo conectar a Redis: {e}. Continuando sin caché.")
self.redis_client = None
def save_ohlcv(self, df: pd.DataFrame, batch_size: int = 1000) -> int:
"""
Guarda datos OHLCV en la base de datos
Args:
df: DataFrame con datos OHLCV
batch_size: Tamaño de lote para inserción
Returns:
Número de registros guardados
"""
if df.empty:
log.warning("DataFrame vacío, nada que guardar")
return 0
log.info(f"Guardando {len(df)} registros en base de datos")
try:
# Preparar datos para inserción
df_to_save = df.reset_index()
# Renombrar columna de índice a timestamp si es necesario
if df_to_save.columns[0] != 'timestamp':
df_to_save.rename(columns={df_to_save.columns[0]: 'timestamp'}, inplace=True)
# Mantener todas las columnas relevantes
allowed_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'symbol', 'timeframe', 'returns', 'log_returns']
df_to_save = df_to_save[[col for col in allowed_columns if col in df_to_save.columns]]
# Insertar en lotes para mejor performance
records_saved = 0
records_skipped = 0
for i in range(0, len(df_to_save), batch_size):
batch = df_to_save.iloc[i:i+batch_size]
try:
# Usar to_sql con if_exists='append' y method='multi'
batch.to_sql(
'ohlcv',
self.engine,
if_exists='append',
index=False,
method='multi'
)
records_saved += len(batch)
log.debug(f"Guardados {records_saved}/{len(df_to_save)} registros")
except Exception as e:
# Si hay error de duplicados, intentar uno por uno
if 'unique' in str(e).lower() or 'duplicate' in str(e).lower():
log.warning(f"Duplicados detectados en batch, insertando uno por uno...")
for _, row in batch.iterrows():
try:
row.to_frame().T.to_sql(
'ohlcv',
self.engine,
if_exists='append',
index=False
)
records_saved += 1
except Exception:
# Este registro ya existe, saltarlo
records_skipped += 1
continue
else:
# Otro tipo de error, re-lanzar
raise e
if records_skipped > 0:
log.info(f"Saltados {records_skipped} registros duplicados")
log.success(f"Guardados {records_saved} registros exitosamente")
return records_saved
except Exception as e:
log.error(f"Error guardando datos: {e}")
self.session.rollback()
raise
def load_ohlcv(
self,
symbol: str,
timeframe: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
use_cache: bool = True
) -> pd.DataFrame:
"""
Carga datos OHLCV de la base de datos
Args:
symbol: Símbolo del par
timeframe: Timeframe
start_date: Fecha inicio (opcional)
end_date: Fecha fin (opcional)
use_cache: Si usar caché de Redis
Returns:
DataFrame con datos OHLCV
"""
# Generar cache key
cache_key = f"ohlcv:{symbol}:{timeframe}:{start_date}:{end_date}"
# Intentar obtener de caché
if use_cache and self.redis_client:
try:
cached_data = self.redis_client.get(cache_key)
if cached_data:
log.debug(f"Datos obtenidos de caché: {cache_key}")
df = pd.read_json(cached_data)
df.set_index('timestamp', inplace=True)
return df
except Exception as e:
log.warning(f"Error leyendo caché: {e}")
# Construir query
query = f"""
SELECT timestamp, open, high, low, close, volume, symbol, timeframe
FROM ohlcv
WHERE symbol = '{symbol}' AND timeframe = '{timeframe}'
"""
if start_date:
query += f" AND timestamp >= '{start_date}'"
if end_date:
query += f" AND timestamp <= '{end_date}'"
query += " ORDER BY timestamp ASC"
try:
df = pd.read_sql(query, self.engine)
if df.empty:
log.warning(f"No se encontraron datos para {symbol} {timeframe}")
return pd.DataFrame()
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
log.info(f"Cargados {len(df)} registros de {symbol} {timeframe}")
# Guardar en caché
if self.redis_client:
try:
df_json = df.reset_index().to_json()
self.redis_client.setex(cache_key, 3600, df_json) # TTL 1 hora
log.debug(f"Datos guardados en caché: {cache_key}")
except Exception as e:
log.warning(f"Error guardando en caché: {e}")
return df
except Exception as e:
log.error(f"Error cargando datos: {e}")
raise
def get_latest_timestamp(self, symbol: str, timeframe: str) -> Optional[datetime]:
"""
Obtiene el último timestamp disponible para un símbolo/timeframe
Args:
symbol: Símbolo del par
timeframe: Timeframe
Returns:
Último timestamp o None si no hay datos
"""
query = f"""
SELECT MAX(timestamp) as last_timestamp
FROM ohlcv
WHERE symbol = '{symbol}' AND timeframe = '{timeframe}'
"""
try:
result = pd.read_sql(query, self.engine)
last_timestamp = result['last_timestamp'].iloc[0]
if pd.isna(last_timestamp):
log.debug(f"No hay datos previos para {symbol} {timeframe}")
return None
return last_timestamp
except Exception as e:
log.error(f"Error obteniendo último timestamp: {e}")
return None
def delete_ohlcv(
self,
symbol: str,
timeframe: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> int:
"""
Elimina datos OHLCV
Args:
symbol: Símbolo del par
timeframe: Timeframe
start_date: Fecha inicio (opcional)
end_date: Fecha fin (opcional)
Returns:
Número de registros eliminados
"""
query = f"""
DELETE FROM ohlcv
WHERE symbol = '{symbol}' AND timeframe = '{timeframe}'
"""
if start_date:
query += f" AND timestamp >= '{start_date}'"
if end_date:
query += f" AND timestamp <= '{end_date}'"
try:
result = self.engine.execute(query)
deleted_count = result.rowcount
log.info(f"Eliminados {deleted_count} registros")
# Invalidar caché
if self.redis_client:
cache_pattern = f"ohlcv:{symbol}:{timeframe}:*"
keys = self.redis_client.keys(cache_pattern)
if keys:
self.redis_client.delete(*keys)
return deleted_count
except Exception as e:
log.error(f"Error eliminando datos: {e}")
raise
def get_available_data(self) -> pd.DataFrame:
"""
Obtiene resumen de datos disponibles en la base de datos
Returns:
DataFrame con información de símbolos y timeframes disponibles
"""
query = """
SELECT
symbol,
timeframe,
MIN(timestamp) as first_date,
MAX(timestamp) as last_date,
COUNT(*) as record_count
FROM ohlcv
GROUP BY symbol, timeframe
ORDER BY symbol, timeframe
"""
try:
df = pd.read_sql(query, self.engine)
log.info(f"Información de {len(df)} conjuntos de datos")
return df
except Exception as e:
log.error(f"Error obteniendo información de datos: {e}")
raise
def close(self):
"""
Cierra conexiones
"""
try:
self.session.close()
self.engine.dispose()
if self.redis_client:
self.redis_client.close()
log.info("Conexiones cerradas")
except Exception as e:
log.error(f"Error cerrando conexiones: {e}")