Files
Trading-Bot/main.py
DaM ccd1fb3e42 Sistema de trading bot - Semanas 1-2 completadas
- Infraestructura de datos completa
- Descarga desde exchanges (CCXT)
- Procesamiento y limpieza de datos
- Almacenamiento en PostgreSQL
- Sistema anti-duplicados
- Script de descarga masiva
- Tests unitarios
- Documentación completa
2026-01-26 22:16:27 +01:00

165 lines
5.0 KiB
Python

# main.py
# main.py
"""
Punto de entrada principal del bot de trading
Demo para Semanas 1-2: Data Pipeline + Storage
"""
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from src.monitoring.logger import log
from src.data.fetcher import DataFetcher
from src.data.processor import DataProcessor
from src.data.storage import StorageManager
def setup_environment():
"""
Carga variables de entorno
"""
# Cargar desde config/secrets.env
from pathlib import Path
env_path = Path(__file__).parent / 'config' / 'secrets.env'
load_dotenv(dotenv_path=env_path)
# Verificar variables requeridas
required_vars = ['EXCHANGE_NAME', 'DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASSWORD']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
log.error(f"Faltan variables de entorno: {missing_vars}")
raise EnvironmentError(f"Variables faltantes: {missing_vars}")
log.success("Variables de entorno cargadas correctamente")
def demo_data_pipeline():
"""
Demo completo del pipeline de datos
"""
log.info("="*60)
log.info("DEMO: Data Pipeline + Storage")
log.info("="*60)
# 1. Setup
setup_environment()
# Configuración
exchange_name = os.getenv('EXCHANGE_NAME', 'binance')
symbol = 'BTC/USDT'
timeframe = '1h'
days_back = 1
# 2. Inicializar componentes
log.info("\n[1/5] Inicializando componentes...")
fetcher = DataFetcher(
exchange_name=exchange_name,
api_key=os.getenv('API_KEY') if os.getenv('API_KEY') else None,
api_secret=os.getenv('API_SECRET') if os.getenv('API_SECRET') else None
)
processor = DataProcessor()
storage = StorageManager(
db_host=os.getenv('DB_HOST'),
db_port=int(os.getenv('DB_PORT', 5432)),
db_name=os.getenv('DB_NAME'),
db_user=os.getenv('DB_USER'),
db_password=os.getenv('DB_PASSWORD'),
redis_host=os.getenv('REDIS_HOST', 'localhost'),
redis_port=int(os.getenv('REDIS_PORT', 6379))
)
# 3. Obtener datos
log.info(f"\n[2/5] Obteniendo datos históricos de {symbol}...")
df = fetcher.fetch_historical(
symbol=symbol,
timeframe=timeframe,
days=days_back
)
log.info(f"Datos obtenidos: {len(df)} velas")
log.info(f"Rango: {df.index[0]} a {df.index[-1]}")
log.info(f"\nPrimeras filas:\n{df.head()}")
# 4. Procesar datos
log.info("\n[3/5] Procesando datos...")
# Validar
is_valid = processor.validate_ohlcv(df)
log.info(f"Datos válidos: {is_valid}")
# Limpiar
df_clean = processor.clean_data(df)
# Detectar gaps
gaps = processor.detect_gaps(df_clean, timeframe)
if not gaps.empty:
log.warning(f"Gaps detectados:\n{gaps}")
# Calcular retornos
df_clean = processor.calculate_returns(df_clean)
log.info(f"\nEstadísticas de retornos:")
log.info(f"Retorno medio: {df_clean['returns'].mean():.4%}")
log.info(f"Volatilidad: {df_clean['returns'].std():.4%}")
log.info(f"Retorno total: {df_clean['returns'].sum():.4%}")
# 5. Guardar en base de datos
log.info("\n[4/5] Guardando en base de datos...")
records_saved = storage.save_ohlcv(df_clean)
log.success(f"Guardados {records_saved} registros")
# 6. Verificar almacenamiento
log.info("\n[5/5] Verificando almacenamiento...")
# Obtener último timestamp
last_timestamp = storage.get_latest_timestamp(symbol, timeframe)
log.info(f"Último timestamp en DB: {last_timestamp}")
# Cargar datos de vuelta
df_loaded = storage.load_ohlcv(
symbol=symbol,
timeframe=timeframe,
start_date=datetime.now() - timedelta(days=2)
)
log.info(f"Datos cargados de DB: {len(df_loaded)} registros")
# Resumen de datos disponibles
available_data = storage.get_available_data()
log.info(f"\nDatos disponibles en base de datos:\n{available_data}")
# 7. Demo adicional: resampleo
log.info("\n[EXTRA] Demo de resampleo de timeframe...")
df_4h = processor.resample_timeframe(df_clean, '4h')
log.info(f"Datos resampled a 4h: {len(df_4h)} velas")
log.info(f"\nPrimeras filas 4h:\n{df_4h.head()}")
# Guardar también el timeframe de 4h
storage.save_ohlcv(df_4h)
# 8. Obtener precio actual
log.info("\n[EXTRA] Obteniendo precio actual...")
ticker = fetcher.fetch_ticker(symbol)
log.info(f"Precio actual de {symbol}: ${ticker['last']:,.2f}")
log.info(f"24h High: ${ticker['high']:,.2f}")
log.info(f"24h Low: ${ticker['low']:,.2f}")
log.info(f"24h Volume: {ticker['baseVolume']:,.2f}")
# 9. Cleanup
storage.close()
log.info("\n" + "="*60)
log.success("DEMO COMPLETADO EXITOSAMENTE")
log.info("="*60)
return df_clean
if __name__ == "__main__":
try:
df = demo_data_pipeline()
except Exception as e:
log.error(f"Error en demo: {e}")
raise