diff --git a/.gitignore b/.gitignore index 0d356df..4c17f0a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ config/secrets.env # Carpetas de bases de datos -data_history/ +data/ # Carpetas de entorno virtual venv/ diff --git a/README.md b/README.md index 458630e..877a61b 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,72 @@ -# 🤖 Trading Bot - Semanas 1-2: Data Pipeline +# 🤖 Trading Bot - Proyecto Completo -Bot de trading algorítmico desarrollado desde cero. Esta es la primera fase enfocada en el pipeline de datos. +Bot de trading algorítmico desarrollado desde cero con Python, PostgreSQL y Machine Learning. ## 📋 Tabla de Contenidos +- [Estado del Proyecto](#estado-del-proyecto) - [Requisitos](#requisitos) - [Instalación](#instalación) - [Configuración](#configuración) - [Uso](#uso) - [Estructura del Proyecto](#estructura-del-proyecto) +- [Base de Datos](#base-de-datos) +- [Scripts Disponibles](#scripts-disponibles) - [Testing](#testing) -- [Próximos Pasos](#próximos-pasos) +- [Roadmap](#roadmap) +- [Troubleshooting](#troubleshooting) + +## 🎯 Estado del Proyecto + +### ✅ Completado (Semanas 1-2) + +- ✅ Sistema de logging robusto con rotación de archivos +- ✅ Conexión a exchanges vía CCXT (Binance por defecto) +- ✅ Descarga de datos históricos con reintentos automáticos +- ✅ Descarga incremental (continuar desde último timestamp) +- ✅ Procesamiento y limpieza de datos +- ✅ Detección de gaps y outliers +- ✅ Resampleo de timeframes (1h → 4h, 1d, etc.) +- ✅ Cálculo de retornos (simples y logarítmicos) +- ✅ Almacenamiento en PostgreSQL con índices optimizados +- ✅ Sistema anti-duplicados con constraints únicos +- ✅ Caché con Redis (opcional) +- ✅ Script de descarga masiva para múltiples símbolos +- ✅ Tests unitarios +- ✅ Manejo de errores y reintentos + +**Datos descargados actualmente:** +- 5 criptomonedas (BTC, ETH, BNB, SOL, XRP) +- 3 timeframes (1h, 4h, 1d) +- 120 días de histórico +- ~54,000 registros totales + +### 🔄 En Progreso + +- ⏳ Backtesting Engine (Semanas 3-4) +- ⏳ Estrategias de trading (Semanas 5-8) +- ⏳ Machine Learning (Semanas 5-8) + +### 📅 Planificado + +- 📋 Live trading con paper trading +- 📋 Gestión de riesgo avanzada +- 📋 Optimización de estrategias +- 📋 Dashboard web +- 📋 Alertas y notificaciones ## 🔧 Requisitos ### Software -- Python 3.10 o superior -- PostgreSQL 13 o superior -- Redis 6 o superior (opcional, para caché) +- **Python 3.10+** (probado con 3.12.3) +- **PostgreSQL 13+** +- **Redis 6+** (opcional, para caché) - Git -### Hardware (mínimo para desarrollo) -- 8GB RAM +### Hardware Recomendado +- 8GB RAM (mínimo) - 20GB espacio en disco +- Para ML: GPU recomendada (futuro) ## 📦 Instalación @@ -36,12 +80,12 @@ cd trading-bot ### 2. Crear entorno virtual ```bash -python -m venv venv +python3 -m venv venv -# En Linux/Mac: +# Linux/Mac: source venv/bin/activate -# En Windows: +# Windows: venv\Scripts\activate ``` @@ -59,12 +103,13 @@ pip install -r requirements.txt sudo apt update sudo apt install postgresql postgresql-contrib sudo systemctl start postgresql +sudo systemctl enable postgresql ``` **macOS (con Homebrew):** ```bash -brew install postgresql -brew services start postgresql +brew install postgresql@16 +brew services start postgresql@16 ``` **Windows:** @@ -80,10 +125,18 @@ sudo -u postgres psql CREATE DATABASE trading_bot; CREATE USER trading_user WITH PASSWORD 'tu_password_seguro'; GRANT ALL PRIVILEGES ON DATABASE trading_bot TO trading_user; + +# Conectar a la base de datos +\c trading_bot + +# Dar permisos sobre el schema +GRANT ALL ON SCHEMA public TO trading_user; + +# Salir \q ``` -### 6. Instalar Redis (opcional) +### 6. Instalar Redis (opcional pero recomendado) **Ubuntu/Debian:** ```bash @@ -97,31 +150,27 @@ brew install redis brew services start redis ``` -**Windows:** -Descargar desde [redis.io](https://redis.io/download) o usar WSL - ## ⚙️ Configuración -### 1. Copiar archivo de configuración +### 1. Crear archivo de configuración ```bash -cp .env.example .env +# El archivo debe estar en config/secrets.env +# Usa este template: ``` -### 2. Editar `.env` con tus credenciales - -```bash -# Exchange (para datos públicos no se necesita API key) +```env +# Exchange (para datos públicos NO necesitas API keys) EXCHANGE_NAME=binance API_KEY= API_SECRET= -# Base de datos +# Database DB_HOST=localhost DB_PORT=5432 DB_NAME=trading_bot DB_USER=trading_user -DB_PASSWORD=tu_password_seguro +DB_PASSWORD=tu_password_aqui # Redis (opcional) REDIS_HOST=localhost @@ -133,34 +182,67 @@ ENVIRONMENT=development LOG_LEVEL=INFO ``` -### 3. Verificar configuración de settings.yaml +### 2. Configurar símbolos y timeframes -El archivo `config/settings.yaml` contiene configuraciones generales que puedes ajustar: +Edita `config/settings.yaml` para personalizar: ```yaml trading: symbols: - BTC/USDT - ETH/USDT + - BNB/USDT timeframes: - 1h - 4h + - 1d + +data: + fetch_interval: 60 + historical_days: 120 + max_retries: 3 ``` ## 🚀 Uso -### Ejecutar demo completo +### Demo rápido (verificar instalación) ```bash python main.py ``` -Este comando ejecutará el pipeline completo: -1. Conexión al exchange (Binance por defecto) -2. Descarga de datos históricos -3. Procesamiento y limpieza -4. Almacenamiento en PostgreSQL -5. Verificación de datos +Este script: +- Descarga 1 día de BTC/USDT +- Muestra el pipeline completo +- Guarda en PostgreSQL +- Muestra estadísticas + +### Descarga masiva de datos + +```bash +python download_data.py +``` + +Este script descarga: +- Múltiples símbolos configurables +- Múltiples timeframes +- Días históricos configurables +- Muestra progreso en tiempo real +- Previene duplicados automáticamente + +**Personalizar descarga:** +Edita `download_data.py` líneas ~28-45: + +```python +symbols = [ + 'BTC/USDT', + 'ETH/USDT', + # Añade más aquí +] + +timeframes = ['1h', '4h', '1d'] +days_back = 120 # Cambia aquí +``` ### Uso programático @@ -169,18 +251,19 @@ from src.data.fetcher import DataFetcher from src.data.processor import DataProcessor from src.data.storage import StorageManager -# Inicializar fetcher +# Inicializar fetcher = DataFetcher('binance') +processor = DataProcessor() +storage = StorageManager(...) -# Obtener datos -df = fetcher.fetch_historical('BTC/USDT', timeframe='1h', days=7) +# Descargar +df = fetcher.fetch_historical('BTC/USDT', timeframe='1h', days=30) # Procesar -processor = DataProcessor() df_clean = processor.clean_data(df) +df_clean = processor.calculate_returns(df_clean) # Guardar -storage = StorageManager(...) storage.save_ohlcv(df_clean) ``` @@ -188,26 +271,155 @@ storage.save_ohlcv(df_clean) ``` trading-bot/ -├── config/ # Configuración -│ ├── settings.yaml # Configuración general -│ └── .env # Variables de entorno (no subir a git) -├── src/ # Código fuente -│ ├── data/ # Módulo de datos -│ │ ├── fetcher.py # Obtención de datos -│ │ ├── processor.py # Procesamiento -│ │ └── storage.py # Almacenamiento -│ └── utils/ # Utilidades -│ └── logger.py # Sistema de logging -├── tests/ # Tests unitarios -│ └── test_data.py # Tests del módulo de datos -├── data/ # Datos locales -│ └── historical/ # Datos históricos -├── logs/ # Archivos de log -├── requirements.txt # Dependencias Python -├── main.py # Punto de entrada -└── README.md # Este archivo +├── config/ # Configuración +│ ├── settings.yaml # Configuración general +│ └── secrets.env # Credenciales (NO subir a git) +│ +├── src/ # Código fuente +│ ├── data/ # Módulo de datos +│ │ ├── __init__.py +│ │ ├── fetcher.py # Descarga desde exchanges +│ │ ├── processor.py # Limpieza y procesamiento +│ │ └── storage.py # PostgreSQL + Redis +│ │ +│ ├── backtest/ # Motor de backtesting (próximo) +│ ├── strategies/ # Estrategias de trading (próximo) +│ ├── ml/ # Machine Learning (futuro) +│ └── utils/ # Utilidades +│ └── logger.py # Sistema de logging +│ +├── tests/ # Tests unitarios +│ └── test_data.py +│ +├── data/ # Datos locales +│ ├── historical/ # Backups (futuro) +│ └── exports/ # Exportaciones (futuro) +│ +├── logs/ # Archivos de log +│ ├── trading_bot_*.log +│ └── errors_*.log +│ +├── main.py # Demo/testing +├── download_data.py # Descarga masiva +├── requirements.txt # Dependencias +├── .gitignore +└── README.md ``` +## 🗄️ Base de Datos + +### Ubicación de PostgreSQL + +```bash +# Ver ubicación de los datos +sudo -u postgres psql -c "SHOW data_directory;" +# Típicamente: /var/lib/postgresql/16/main +``` + +### Tabla OHLCV (estructura) + +```sql +CREATE TABLE ohlcv ( + id SERIAL PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + symbol VARCHAR(20) NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open FLOAT NOT NULL, + high FLOAT NOT NULL, + low FLOAT NOT NULL, + close FLOAT NOT NULL, + volume FLOAT NOT NULL, + returns FLOAT, -- Retornos simples + log_returns FLOAT, -- Retornos logarítmicos + CONSTRAINT unique_ohlcv UNIQUE (symbol, timeframe, timestamp) +); + +-- Índices para queries rápidas +CREATE INDEX idx_symbol_timeframe_timestamp ON ohlcv(symbol, timeframe, timestamp); +CREATE INDEX idx_timestamp ON ohlcv(timestamp); +``` + +### Consultas útiles + +```sql +-- Conectar a la base de datos +psql -U trading_user -d trading_bot -h localhost + +-- Ver todas las tablas +\dt + +-- Contar registros totales +SELECT COUNT(*) FROM ohlcv; + +-- Ver datos disponibles por símbolo +SELECT + symbol, + timeframe, + COUNT(*) as registros, + MIN(timestamp) as desde, + MAX(timestamp) as hasta +FROM ohlcv +GROUP BY symbol, timeframe +ORDER BY symbol, timeframe; + +-- Ver últimos 10 registros de BTC +SELECT * FROM ohlcv +WHERE symbol = 'BTC/USDT' AND timeframe = '1h' +ORDER BY timestamp DESC +LIMIT 10; + +-- Estadísticas de retornos +SELECT + symbol, + timeframe, + AVG(returns) as retorno_medio, + STDDEV(returns) as volatilidad, + MIN(returns) as peor_retorno, + MAX(returns) as mejor_retorno +FROM ohlcv +WHERE returns IS NOT NULL +GROUP BY symbol, timeframe; +``` + +### Backup de la base de datos + +```bash +# Backup completo +pg_dump -U trading_user -d trading_bot -h localhost > backup_$(date +%Y%m%d).sql + +# Backup solo tabla ohlcv +pg_dump -U trading_user -d trading_bot -h localhost -t ohlcv > backup_ohlcv.sql + +# Restaurar desde backup +psql -U trading_user -d trading_bot -h localhost < backup.sql +``` + +## 📜 Scripts Disponibles + +### `main.py` - Demo y Testing + +```bash +python main.py +``` + +**Uso:** Verificar que todo funciona correctamente +**Descarga:** 1 símbolo, 1 timeframe, pocos días +**Muestra:** Pipeline completo con estadísticas detalladas + +### `download_data.py` - Descarga Masiva + +```bash +python download_data.py +``` + +**Uso:** Llenar base de datos con datos históricos +**Configurable:** Símbolos, timeframes, días +**Características:** +- Progreso en tiempo real +- Prevención de duplicados +- Manejo de errores robusto +- Resumen final con estadísticas + ## 🧪 Testing ### Ejecutar todos los tests @@ -216,90 +428,145 @@ trading-bot/ pytest tests/ -v ``` -### Ejecutar tests con cobertura +### Tests con cobertura ```bash pytest tests/ --cov=src --cov-report=html +# Ver reporte en htmlcov/index.html ``` -### Ejecutar test específico +### Test específico ```bash pytest tests/test_data.py::TestDataProcessor::test_clean_data_removes_duplicates -v ``` -## 📊 Funcionalidades Implementadas +## 🗺️ Roadmap -### ✅ Completado (Semanas 1-2) +### ✅ Fase 1: Infraestructura de Datos (COMPLETADO) +- Sistema de descarga robusto +- Almacenamiento optimizado +- Procesamiento de datos -- [x] Sistema de logging robusto -- [x] Conexión a exchanges vía CCXT -- [x] Descarga de datos históricos -- [x] Descarga incremental (continuar desde último timestamp) -- [x] Procesamiento y limpieza de datos -- [x] Detección de gaps y outliers -- [x] Resampleo de timeframes -- [x] Cálculo de retornos -- [x] Almacenamiento en PostgreSQL -- [x] Caché con Redis -- [x] Tests unitarios -- [x] Manejo de errores y reintentos +### 🔄 Fase 2: Backtesting (PRÓXIMO - Semanas 3-4) +- Motor de backtesting +- Estrategia simple (moving average crossover) +- Métricas de performance +- Visualizaciones -## 🔜 Próximos Pasos (Semanas 3-4) +### 📅 Fase 3: Estrategias Avanzadas (Semanas 5-8) +- Indicadores técnicos +- Machine Learning básico +- Optimización de parámetros -- [ ] Engine de backtesting -- [ ] Métricas de performance (Sharpe, Sortino, Max Drawdown) -- [ ] Visualizaciones de resultados -- [ ] Estrategia simple de trading -- [ ] Simulación histórica +### 📅 Fase 4: Trading Real (Semanas 9-12) +- Paper trading +- Gestión de riesgo +- Ejecución de órdenes +- Monitoreo en tiempo real + +### 📅 Fase 5: Producción (Futuro) +- Dashboard web +- Alertas y notificaciones +- Multi-exchange +- Despliegue en servidor ## 🐛 Troubleshooting ### Error: "No se puede conectar a PostgreSQL" -**Solución:** ```bash -# Verificar que PostgreSQL está corriendo +# Verificar que está corriendo sudo systemctl status postgresql -# Verificar credenciales en .env -# Verificar que el usuario tiene permisos +# Reiniciar si es necesario +sudo systemctl restart postgresql + +# Verificar credenciales en config/secrets.env ``` -### Error: "ModuleNotFoundError: No module named 'ccxt'" +### Error: "Invalid Api-Key ID" -**Solución:** +**Solución:** Para datos públicos NO necesitas API keys. Deja vacíos `API_KEY` y `API_SECRET` en `config/secrets.env`. + +### Error: "column does not exist" + +**Solución:** Recrear la tabla: + +```sql +DROP TABLE IF EXISTS ohlcv CASCADE; +``` + +Luego ejecutar `python main.py` para recrearla. + +### Error: "duplicate key value violates unique constraint" + +**Solución:** Esto es normal y esperado. El sistema previene automáticamente duplicados. Si quieres limpiar duplicados existentes: + +```sql +DELETE FROM ohlcv a USING ohlcv b +WHERE a.id > b.id +AND a.symbol = b.symbol +AND a.timeframe = b.timeframe +AND a.timestamp = b.timestamp; +``` + +### Redis no está disponible + +**No es crítico.** El bot funciona sin Redis, solo perderás caché. Logs mostrarán: "Continuando sin caché." + +Para instalar Redis: ```bash -# Asegurarse de que el entorno virtual está activado -source venv/bin/activate # Linux/Mac -venv\Scripts\activate # Windows - -# Reinstalar dependencias -pip install -r requirements.txt +sudo apt install redis-server +sudo systemctl start redis ``` -### Error: Rate limit exceeded +### Downloads muy lentos -**Solución:** -El código ya incluye manejo de rate limiting, pero si persiste: -- Aumentar delays en `config/settings.yaml` -- Reducir el número de símbolos/timeframes -- Usar API keys para límites más altos +- Verifica tu conexión a internet +- El exchange puede tener rate limiting +- Para Binance sin API keys: ~1000 requests/min + +### La descarga se queda colgada + +- Presiona `Ctrl+C` para cancelar +- Revisa logs en `logs/trading_bot_*.log` +- Verifica que el exchange esté accesible ## 📝 Notas Importantes -⚠️ **IMPORTANTE**: Este bot es para fines educativos. No ejecutes trading real sin: -1. Backtesting exhaustivo (mínimo 3-5 años) -2. Paper trading extensivo (varios meses) -3. Gestión de riesgo robusta -4. Comprensión completa del código +### ⚠️ Advertencia Legal + +Este bot es para **fines educativos y de investigación**. El trading conlleva riesgo financiero significativo. + +**NO ejecutes trading real sin:** +1. ✅ Backtesting exhaustivo (mínimo 3-5 años de datos) +2. ✅ Paper trading extensivo (varios meses) +3. ✅ Gestión de riesgo robusta y probada +4. ✅ Comprensión completa del código y estrategias +5. ✅ Capital que puedas permitirte perder + +### 🔒 Seguridad + +- **NUNCA** subas `config/secrets.env` a git +- Usa contraseñas fuertes para PostgreSQL +- En producción, usa variables de entorno del sistema +- Limita permisos de archivos sensibles: `chmod 600 config/secrets.env` + +### 💾 Portabilidad (Futuro) + +Actualmente usa PostgreSQL (requiere instalación en cada máquina). + +**Plan futuro:** Script de exportación a SQLite para portabilidad completa: +```bash +python export_to_portable.py # Generará data/trading_bot.db +``` + +Esto permitirá copiar todo el proyecto en USB y ejecutar en cualquier PC. ## 🤝 Contribuir -Este es un proyecto de aprendizaje personal. Si encuentras bugs o tienes sugerencias: -1. Documenta el issue claramente -2. Incluye logs y pasos para reproducir -3. Propón solución si es posible +Este es un proyecto personal de aprendizaje. Sugerencias y mejoras son bienvenidas. ## 📄 Licencia @@ -307,9 +574,12 @@ MIT License - Usar bajo tu propio riesgo ## 📧 Contacto -Para dudas sobre el código o siguiente fase de desarrollo, consulta conmigo. +Para dudas sobre el código o siguientes fases de desarrollo, consulta conmigo. --- -**Versión actual:** 0.1.0 (Semanas 1-2 completadas) -**Última actualización:** Enero 2026 \ No newline at end of file +**Versión actual:** 0.2.0 (Semanas 1-2 completadas) +**Última actualización:** Enero 2026 +**Python:** 3.12.3 +**PostgreSQL:** 16+ +**Datos:** 5 símbolos, 3 timeframes, 120 días (~54k registros) \ No newline at end of file diff --git a/download_data.py b/download_data.py new file mode 100644 index 0000000..aaffab0 --- /dev/null +++ b/download_data.py @@ -0,0 +1,173 @@ +# download_data.py +""" +Script para descargar datos históricos de múltiples símbolos y timeframes +""" +import os +from dotenv import load_dotenv +from datetime import datetime +from pathlib import Path +from src.monitoring.logger import log +from src.data.fetcher import DataFetcher +from src.data.processor import DataProcessor +from src.data.storage import StorageManager + +def setup_environment(): + """Carga variables de entorno""" + env_path = Path(__file__).parent / 'config' / 'secrets.env' + load_dotenv(dotenv_path=env_path) + log.success("✓ Variables de entorno cargadas") + +def download_multiple_symbols(): + """ + Descarga datos históricos para múltiples símbolos y timeframes + """ + log.info("="*70) + log.info("📥 DESCARGA MASIVA DE DATOS HISTÓRICOS") + log.info("="*70) + + # Configuración + setup_environment() + + exchange_name = os.getenv('EXCHANGE_NAME', 'binance') + + # Símbolos a descargar (puedes añadir más) + symbols = [ + 'BTC/USDT', + 'ETH/USDT', + 'BNB/USDT', + 'SOL/USDT', + 'XRP/USDT', + ] + + # Timeframes a descargar + timeframes = [ + '1h', # 1 hora + '4h', # 4 horas + '1d', # 1 día + ] + + # Días históricos + days_back = 120 # 4 meses + + log.info(f"\n📊 Configuración:") + log.info(f" Exchange: {exchange_name}") + log.info(f" Símbolos: {len(symbols)} → {symbols}") + log.info(f" Timeframes: {timeframes}") + log.info(f" Días históricos: {days_back}") + log.info(f" Total descargas: {len(symbols) * len(timeframes)}") + + # Confirmar + print("\n" + "="*70) + response = input("¿Continuar con la descarga? (s/n): ").lower() + if response != 's': + log.warning("Descarga cancelada por el usuario") + return + + # Inicializar componentes + log.info("\n🔧 Inicializando componentes...") + + fetcher = DataFetcher( + exchange_name=exchange_name, + api_key=os.getenv('API_KEY') if os.getenv('API_KEY') else None, + api_secret=os.getenv('API_SECRET') if os.getenv('API_SECRET') else None + ) + + processor = DataProcessor() + + storage = StorageManager( + db_host=os.getenv('DB_HOST'), + db_port=int(os.getenv('DB_PORT', 5432)), + db_name=os.getenv('DB_NAME'), + db_user=os.getenv('DB_USER'), + db_password=os.getenv('DB_PASSWORD'), + redis_host=os.getenv('REDIS_HOST', 'localhost'), + redis_port=int(os.getenv('REDIS_PORT', 6379)) + ) + + # Estadísticas + total_downloads = len(symbols) * len(timeframes) + current_download = 0 + successful = 0 + failed = 0 + total_records = 0 + + # Descargar datos + log.info("\n" + "="*70) + log.info("🚀 INICIANDO DESCARGAS...") + log.info("="*70 + "\n") + + for symbol in symbols: + for timeframe in timeframes: + current_download += 1 + + log.info(f"\n{'='*70}") + log.info(f"📥 [{current_download}/{total_downloads}] {symbol} @ {timeframe}") + log.info(f"{'='*70}") + + try: + # Verificar si ya existen datos + last_timestamp = storage.get_latest_timestamp(symbol, timeframe) + + if last_timestamp: + log.info(f"⚠️ Ya existen datos hasta: {last_timestamp}") + log.info(f" Se descargarán solo datos nuevos...") + + # Descargar datos + df = fetcher.fetch_historical( + symbol=symbol, + timeframe=timeframe, + days=days_back + ) + + if df.empty: + log.warning(f"❌ No se obtuvieron datos para {symbol} @ {timeframe}") + failed += 1 + continue + + # Procesar + log.info(f"🧹 Procesando datos...") + df_clean = processor.clean_data(df) + df_clean = processor.calculate_returns(df_clean) + + # Guardar + log.info(f"💾 Guardando en base de datos...") + records_saved = storage.save_ohlcv(df_clean) + + total_records += records_saved + successful += 1 + + log.success(f"✅ {symbol} @ {timeframe} → {records_saved} registros guardados") + + except Exception as e: + log.error(f"❌ Error descargando {symbol} @ {timeframe}: {e}") + failed += 1 + continue + + # Resumen final + log.info("\n" + "="*70) + log.info("📊 RESUMEN DE DESCARGA") + log.info("="*70) + log.info(f"✅ Exitosas: {successful}/{total_downloads}") + log.info(f"❌ Fallidas: {failed}/{total_downloads}") + log.info(f"📦 Total registros guardados: {total_records:,}") + + # Ver datos disponibles + log.info("\n📋 Datos disponibles en base de datos:") + available_data = storage.get_available_data() + print(available_data.to_string(index=False)) + + # Cleanup + storage.close() + + log.info("\n" + "="*70) + log.success("✅ DESCARGA COMPLETADA") + log.info("="*70) + +if __name__ == "__main__": + try: + download_multiple_symbols() + except KeyboardInterrupt: + log.warning("\n⚠️ Descarga interrumpida por el usuario") + except Exception as e: + log.error(f"\n❌ Error fatal: {e}") + raise \ No newline at end of file diff --git a/main.py b/main.py index 1983131..e3fd126 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,10 @@ # main.py +# main.py """ Punto de entrada principal del bot de trading Demo para Semanas 1-2: Data Pipeline + Storage """ import os -from pathlib import Path from dotenv import load_dotenv from datetime import datetime, timedelta from src.monitoring.logger import log @@ -14,16 +14,12 @@ from src.data.storage import StorageManager def setup_environment(): """ - Carga variables de entorno desde config/secrets.env + Carga variables de entorno """ # Cargar desde config/secrets.env - project_root = Path(__file__).parent - env_path = project_root / 'config' / 'secrets.env' - if not env_path.exists(): - log.error(f"No se encuentra el archivo: {env_path}") - raise FileNotFoundError(f"Archivo de configuración no encontrado: {env_path}") - load_dotenv(env_path) - log.info(f"Cargando configuración desde: {env_path}") + from pathlib import Path + env_path = Path(__file__).parent / 'config' / 'secrets.env' + load_dotenv(dotenv_path=env_path) # Verificar variables requeridas required_vars = ['EXCHANGE_NAME', 'DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASSWORD'] @@ -33,7 +29,7 @@ def setup_environment(): log.error(f"Faltan variables de entorno: {missing_vars}") raise EnvironmentError(f"Variables faltantes: {missing_vars}") - log.success("✓ Variables de entorno cargadas correctamente") + log.success("Variables de entorno cargadas correctamente") def demo_data_pipeline(): """ @@ -50,15 +46,15 @@ def demo_data_pipeline(): exchange_name = os.getenv('EXCHANGE_NAME', 'binance') symbol = 'BTC/USDT' timeframe = '1h' - days_back = 7 + days_back = 1 # 2. Inicializar componentes log.info("\n[1/5] Inicializando componentes...") fetcher = DataFetcher( exchange_name=exchange_name, - api_key=os.getenv('API_KEY'), - api_secret=os.getenv('API_SECRET') + api_key=os.getenv('API_KEY') if os.getenv('API_KEY') else None, + api_secret=os.getenv('API_SECRET') if os.getenv('API_SECRET') else None ) processor = DataProcessor() diff --git a/src/data/fetcher.py b/src/data/fetcher.py index a9d86fe..a3f34d7 100644 --- a/src/data/fetcher.py +++ b/src/data/fetcher.py @@ -1,4 +1,5 @@ # src/data/fetcher.py +# src/data/fetcher.py """ Módulo para obtener datos de exchanges usando CCXT """ @@ -13,7 +14,7 @@ class DataFetcher: """ Clase para obtener datos históricos y en tiempo real de exchanges """ - + def __init__(self, exchange_name: str, api_key: str = None, api_secret: str = None): """ Inicializa la conexión con el exchange @@ -24,24 +25,39 @@ class DataFetcher: api_secret: API secret (opcional para datos públicos) """ self.exchange_name = exchange_name - + try: exchange_class = getattr(ccxt, exchange_name) - self.exchange = exchange_class({ - 'apiKey': api_key, - 'secret': api_secret, + + # Configuración base + config = { 'enableRateLimit': True, # Importante para evitar bans 'options': { 'defaultType': 'spot', # spot, future, etc } - }) - log.info(f"Conectado al exchange: {exchange_name}") + } + + # Solo añadir API keys si están presentes y no vacías + if api_key and api_secret: + config['apiKey'] = api_key + config['secret'] = api_secret + log.info(f"Conectado al exchange: {exchange_name} (con API keys)") + else: + log.info(f"Conectado al exchange: {exchange_name} (modo público - sin API keys)") + + self.exchange = exchange_class(config) + except Exception as e: log.error(f"Error conectando a {exchange_name}: {e}") raise - - def fetch_ohlcv(self, symbol: str, timeframe: str = '1h', since: Optional[datetime] = None, - limit: int = 500) -> pd.DataFrame: + + def fetch_ohlcv( + self, + symbol: str, + timeframe: str = '1h', + since: Optional[datetime] = None, + limit: int = 500 + ) -> pd.DataFrame: """ Obtiene datos OHLCV (Open, High, Low, Close, Volume) @@ -55,20 +71,20 @@ class DataFetcher: DataFrame con los datos OHLCV """ try: - # Convertir datetime a timestamp en ms + # Convertir datetime a timestamp en milisegundos since_ms = None if since: since_ms = int(since.timestamp() * 1000) - + log.info(f"Obteniendo datos OHLCV: {symbol} {timeframe}") - + ohlcv = self.exchange.fetch_ohlcv( symbol, timeframe=timeframe, since=since_ms, limit=limit ) - + # Convertir a DataFrame df = pd.DataFrame( ohlcv, @@ -85,13 +101,18 @@ class DataFetcher: log.success(f"Obtenidos {len(df)} registros de {symbol}") return df - + except Exception as e: log.error(f"Error obteniendo OHLCV para {symbol}: {e}") raise - - def fetch_historical(self, symbol: str, timeframe: str = '1h', days: int = 30, - max_retries: int = 3) -> pd.DataFrame: + + def fetch_historical( + self, + symbol: str, + timeframe: str = '1h', + days: int = 30, + max_retries: int = 3 + ) -> pd.DataFrame: """ Obtiene datos históricos completos (puede requerir múltiples llamadas) @@ -109,7 +130,11 @@ class DataFetcher: log.info(f"Iniciando descarga histórica: {symbol} desde {since.date()}") + iteration = 0 while True: + iteration += 1 + log.debug(f"Iteración {iteration}: Obteniendo datos desde {since}") + retry_count = 0 success = False @@ -120,7 +145,7 @@ class DataFetcher: if df.empty: log.warning(f"No hay más datos disponibles para {symbol}") success = True - break + break # Salir del while interno all_data.append(df) @@ -131,7 +156,7 @@ class DataFetcher: # Verificar si ya llegamos al presente if since >= datetime.now(): success = True - break + break # Salir del while interno success = True time.sleep(self.exchange.rateLimit / 1000) # Respetar rate limit @@ -143,10 +168,10 @@ class DataFetcher: if not success: log.error(f"Falló después de {max_retries} intentos") - break + break # Salir del while externo - if since >= datetime.now(): - break + if since >= datetime.now() or df.empty: + break # Salir del while externo si no hay más datos if not all_data: log.error("No se pudo obtener ningún dato histórico") diff --git a/src/data/storage.py b/src/data/storage.py index 565e466..c871172 100644 --- a/src/data/storage.py +++ b/src/data/storage.py @@ -3,7 +3,7 @@ Módulo para almacenamiento persistente de datos en PostgreSQL y caché en Redis """ import pandas as pd -from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer, Index +from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer, Index, text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime @@ -29,12 +29,19 @@ class OHLCV(Base): low = Column(Float, nullable=False) close = Column(Float, nullable=False) volume = Column(Float, nullable=False) + returns = Column(Float, nullable=True) # Retornos simples + log_returns = Column(Float, nullable=True) # Retornos logarítmicos # Índices compuestos para queries rápidas __table_args__ = ( Index('idx_symbol_timeframe_timestamp', 'symbol', 'timeframe', 'timestamp'), Index('idx_timestamp', 'timestamp'), + # CONSTRAINT único: no permitir duplicados + # Una combinación de symbol + timeframe + timestamp debe ser única + {'sqlite_autoincrement': True} ) + + # Añadir constraint único manualmente en __init__ de StorageManager class StorageManager: """ @@ -69,6 +76,20 @@ class StorageManager: # Crear tablas si no existen Base.metadata.create_all(self.engine) + # Añadir constraint único si no existe (para evitar duplicados) + try: + with self.engine.connect() as conn: + conn.execute(text(""" + ALTER TABLE ohlcv + ADD CONSTRAINT unique_ohlcv + UNIQUE (symbol, timeframe, timestamp) + """)) + conn.commit() + log.info("Constraint único añadido a la tabla ohlcv") + except Exception as e: + # El constraint ya existe o hubo error (no crítico) + log.debug(f"Constraint único ya existe o no se pudo añadir: {e}") + # Crear sesión Session = sessionmaker(bind=self.engine) self.session = Session() @@ -117,22 +138,53 @@ class StorageManager: if df_to_save.columns[0] != 'timestamp': df_to_save.rename(columns={df_to_save.columns[0]: 'timestamp'}, inplace=True) + # Mantener todas las columnas relevantes + allowed_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'symbol', 'timeframe', 'returns', 'log_returns'] + df_to_save = df_to_save[[col for col in allowed_columns if col in df_to_save.columns]] + # Insertar en lotes para mejor performance records_saved = 0 + records_skipped = 0 + for i in range(0, len(df_to_save), batch_size): batch = df_to_save.iloc[i:i+batch_size] - # Usar to_sql con if_exists='append' y method='multi' - batch.to_sql( - 'ohlcv', - self.engine, - if_exists='append', - index=False, - method='multi' - ) - - records_saved += len(batch) - log.debug(f"Guardados {records_saved}/{len(df_to_save)} registros") + try: + # Usar to_sql con if_exists='append' y method='multi' + batch.to_sql( + 'ohlcv', + self.engine, + if_exists='append', + index=False, + method='multi' + ) + records_saved += len(batch) + log.debug(f"Guardados {records_saved}/{len(df_to_save)} registros") + + except Exception as e: + # Si hay error de duplicados, intentar uno por uno + if 'unique' in str(e).lower() or 'duplicate' in str(e).lower(): + log.warning(f"Duplicados detectados en batch, insertando uno por uno...") + + for _, row in batch.iterrows(): + try: + row.to_frame().T.to_sql( + 'ohlcv', + self.engine, + if_exists='append', + index=False + ) + records_saved += 1 + except Exception: + # Este registro ya existe, saltarlo + records_skipped += 1 + continue + else: + # Otro tipo de error, re-lanzar + raise e + + if records_skipped > 0: + log.info(f"Saltados {records_skipped} registros duplicados") log.success(f"Guardados {records_saved} registros exitosamente") return records_saved