commit dcffd9dfad0abd60fc7a7ef6899bc5bb129cbef9 Author: DaM Date: Mon Jan 26 18:57:42 2026 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d356df --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# Archivos de configuración con credenciales +config/secrets.env + +# Carpetas de bases de datos +data_history/ + +# Carpetas de entorno virtual +venv/ + +# Archivos de Python compilados +__pycache__/ + +# Logs +logs/ + +# Archivos temporales \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..458630e --- /dev/null +++ b/README.md @@ -0,0 +1,315 @@ +# 🤖 Trading Bot - Semanas 1-2: Data Pipeline + +Bot de trading algorítmico desarrollado desde cero. Esta es la primera fase enfocada en el pipeline de datos. + +## 📋 Tabla de Contenidos + +- [Requisitos](#requisitos) +- [Instalación](#instalación) +- [Configuración](#configuración) +- [Uso](#uso) +- [Estructura del Proyecto](#estructura-del-proyecto) +- [Testing](#testing) +- [Próximos Pasos](#próximos-pasos) + +## 🔧 Requisitos + +### Software +- Python 3.10 o superior +- PostgreSQL 13 o superior +- Redis 6 o superior (opcional, para caché) +- Git + +### Hardware (mínimo para desarrollo) +- 8GB RAM +- 20GB espacio en disco + +## 📦 Instalación + +### 1. Clonar el repositorio + +```bash +git clone +cd trading-bot +``` + +### 2. Crear entorno virtual + +```bash +python -m venv venv + +# En Linux/Mac: +source venv/bin/activate + +# En Windows: +venv\Scripts\activate +``` + +### 3. Instalar dependencias + +```bash +pip install --upgrade pip +pip install -r requirements.txt +``` + +### 4. Instalar PostgreSQL + +**Ubuntu/Debian:** +```bash +sudo apt update +sudo apt install postgresql postgresql-contrib +sudo systemctl start postgresql +``` + +**macOS (con Homebrew):** +```bash +brew install postgresql +brew services start postgresql +``` + +**Windows:** +Descargar instalador desde [postgresql.org](https://www.postgresql.org/download/windows/) + +### 5. Configurar base de datos + +```bash +# Conectar a PostgreSQL +sudo -u postgres psql + +# Crear base de datos y usuario +CREATE DATABASE trading_bot; +CREATE USER trading_user WITH PASSWORD 'tu_password_seguro'; +GRANT ALL PRIVILEGES ON DATABASE trading_bot TO trading_user; +\q +``` + +### 6. Instalar Redis (opcional) + +**Ubuntu/Debian:** +```bash +sudo apt install redis-server +sudo systemctl start redis +``` + +**macOS:** +```bash +brew install redis +brew services start redis +``` + +**Windows:** +Descargar desde [redis.io](https://redis.io/download) o usar WSL + +## ⚙️ Configuración + +### 1. Copiar archivo de configuración + +```bash +cp .env.example .env +``` + +### 2. Editar `.env` con tus credenciales + +```bash +# Exchange (para datos públicos no se necesita API key) +EXCHANGE_NAME=binance +API_KEY= +API_SECRET= + +# Base de datos +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=trading_bot +DB_USER=trading_user +DB_PASSWORD=tu_password_seguro + +# Redis (opcional) +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 + +# General +ENVIRONMENT=development +LOG_LEVEL=INFO +``` + +### 3. Verificar configuración de settings.yaml + +El archivo `config/settings.yaml` contiene configuraciones generales que puedes ajustar: + +```yaml +trading: + symbols: + - BTC/USDT + - ETH/USDT + timeframes: + - 1h + - 4h +``` + +## 🚀 Uso + +### Ejecutar demo completo + +```bash +python main.py +``` + +Este comando ejecutará el pipeline completo: +1. Conexión al exchange (Binance por defecto) +2. Descarga de datos históricos +3. Procesamiento y limpieza +4. Almacenamiento en PostgreSQL +5. Verificación de datos + +### Uso programático + +```python +from src.data.fetcher import DataFetcher +from src.data.processor import DataProcessor +from src.data.storage import StorageManager + +# Inicializar fetcher +fetcher = DataFetcher('binance') + +# Obtener datos +df = fetcher.fetch_historical('BTC/USDT', timeframe='1h', days=7) + +# Procesar +processor = DataProcessor() +df_clean = processor.clean_data(df) + +# Guardar +storage = StorageManager(...) +storage.save_ohlcv(df_clean) +``` + +## 📁 Estructura del Proyecto + +``` +trading-bot/ +├── config/ # Configuración +│ ├── settings.yaml # Configuración general +│ └── .env # Variables de entorno (no subir a git) +├── src/ # Código fuente +│ ├── data/ # Módulo de datos +│ │ ├── fetcher.py # Obtención de datos +│ │ ├── processor.py # Procesamiento +│ │ └── storage.py # Almacenamiento +│ └── utils/ # Utilidades +│ └── logger.py # Sistema de logging +├── tests/ # Tests unitarios +│ └── test_data.py # Tests del módulo de datos +├── data/ # Datos locales +│ └── historical/ # Datos históricos +├── logs/ # Archivos de log +├── requirements.txt # Dependencias Python +├── main.py # Punto de entrada +└── README.md # Este archivo +``` + +## 🧪 Testing + +### Ejecutar todos los tests + +```bash +pytest tests/ -v +``` + +### Ejecutar tests con cobertura + +```bash +pytest tests/ --cov=src --cov-report=html +``` + +### Ejecutar test específico + +```bash +pytest tests/test_data.py::TestDataProcessor::test_clean_data_removes_duplicates -v +``` + +## 📊 Funcionalidades Implementadas + +### ✅ Completado (Semanas 1-2) + +- [x] Sistema de logging robusto +- [x] Conexión a exchanges vía CCXT +- [x] Descarga de datos históricos +- [x] Descarga incremental (continuar desde último timestamp) +- [x] Procesamiento y limpieza de datos +- [x] Detección de gaps y outliers +- [x] Resampleo de timeframes +- [x] Cálculo de retornos +- [x] Almacenamiento en PostgreSQL +- [x] Caché con Redis +- [x] Tests unitarios +- [x] Manejo de errores y reintentos + +## 🔜 Próximos Pasos (Semanas 3-4) + +- [ ] Engine de backtesting +- [ ] Métricas de performance (Sharpe, Sortino, Max Drawdown) +- [ ] Visualizaciones de resultados +- [ ] Estrategia simple de trading +- [ ] Simulación histórica + +## 🐛 Troubleshooting + +### Error: "No se puede conectar a PostgreSQL" + +**Solución:** +```bash +# Verificar que PostgreSQL está corriendo +sudo systemctl status postgresql + +# Verificar credenciales en .env +# Verificar que el usuario tiene permisos +``` + +### Error: "ModuleNotFoundError: No module named 'ccxt'" + +**Solución:** +```bash +# Asegurarse de que el entorno virtual está activado +source venv/bin/activate # Linux/Mac +venv\Scripts\activate # Windows + +# Reinstalar dependencias +pip install -r requirements.txt +``` + +### Error: Rate limit exceeded + +**Solución:** +El código ya incluye manejo de rate limiting, pero si persiste: +- Aumentar delays en `config/settings.yaml` +- Reducir el número de símbolos/timeframes +- Usar API keys para límites más altos + +## 📝 Notas Importantes + +⚠️ **IMPORTANTE**: Este bot es para fines educativos. No ejecutes trading real sin: +1. Backtesting exhaustivo (mínimo 3-5 años) +2. Paper trading extensivo (varios meses) +3. Gestión de riesgo robusta +4. Comprensión completa del código + +## 🤝 Contribuir + +Este es un proyecto de aprendizaje personal. Si encuentras bugs o tienes sugerencias: +1. Documenta el issue claramente +2. Incluye logs y pasos para reproducir +3. Propón solución si es posible + +## 📄 Licencia + +MIT License - Usar bajo tu propio riesgo + +## 📧 Contacto + +Para dudas sobre el código o siguiente fase de desarrollo, consulta conmigo. + +--- + +**Versión actual:** 0.1.0 (Semanas 1-2 completadas) +**Última actualización:** Enero 2026 \ No newline at end of file diff --git a/config/secrets.env.example b/config/secrets.env.example new file mode 100644 index 0000000..be4a5d1 --- /dev/null +++ b/config/secrets.env.example @@ -0,0 +1,20 @@ +# Exchange API Keys (ejemplo con Binance) +EXCHANGE_NAME=binance +API_KEY=your_api_key_here +API_SECRET=your_api_secret_here + +# Database +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=trading_bot +DB_USER=postgres +DB_PASSWORD=your_password + +# Redis +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 + +# General +ENVIRONMENT=development +LOG_LEVEL=INFO \ No newline at end of file diff --git a/config/settings.yaml b/config/settings.yaml new file mode 100644 index 0000000..dbaf41b --- /dev/null +++ b/config/settings.yaml @@ -0,0 +1,28 @@ +trading: + symbols: + - BTC/USDT + - ETH/USDT + timeframes: + - 1m + - 5m + - 15m + - 1h + +data: + fetch_interval: 60 # segundos + historical_days: 365 + max_retries: 3 + retry_delay: 5 + +database: + pool_size: 10 + max_overflow: 20 + echo: false + +redis: + ttl: 3600 # time to live en segundos + +logging: + format: "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}" + rotation: "500 MB" + retention: "30 days" \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..1983131 --- /dev/null +++ b/main.py @@ -0,0 +1,169 @@ +# main.py +""" +Punto de entrada principal del bot de trading +Demo para Semanas 1-2: Data Pipeline + Storage +""" +import os +from pathlib import Path +from dotenv import load_dotenv +from datetime import datetime, timedelta +from src.monitoring.logger import log +from src.data.fetcher import DataFetcher +from src.data.processor import DataProcessor +from src.data.storage import StorageManager + +def setup_environment(): + """ + Carga variables de entorno desde config/secrets.env + """ + # Cargar desde config/secrets.env + project_root = Path(__file__).parent + env_path = project_root / 'config' / 'secrets.env' + if not env_path.exists(): + log.error(f"No se encuentra el archivo: {env_path}") + raise FileNotFoundError(f"Archivo de configuración no encontrado: {env_path}") + load_dotenv(env_path) + log.info(f"Cargando configuración desde: {env_path}") + + # Verificar variables requeridas + required_vars = ['EXCHANGE_NAME', 'DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASSWORD'] + missing_vars = [var for var in required_vars if not os.getenv(var)] + + if missing_vars: + log.error(f"Faltan variables de entorno: {missing_vars}") + raise EnvironmentError(f"Variables faltantes: {missing_vars}") + + log.success("✓ Variables de entorno cargadas correctamente") + +def demo_data_pipeline(): + """ + Demo completo del pipeline de datos + """ + log.info("="*60) + log.info("DEMO: Data Pipeline + Storage") + log.info("="*60) + + # 1. Setup + setup_environment() + + # Configuración + exchange_name = os.getenv('EXCHANGE_NAME', 'binance') + symbol = 'BTC/USDT' + timeframe = '1h' + days_back = 7 + + # 2. Inicializar componentes + log.info("\n[1/5] Inicializando componentes...") + + fetcher = DataFetcher( + exchange_name=exchange_name, + api_key=os.getenv('API_KEY'), + api_secret=os.getenv('API_SECRET') + ) + + processor = DataProcessor() + + storage = StorageManager( + db_host=os.getenv('DB_HOST'), + db_port=int(os.getenv('DB_PORT', 5432)), + db_name=os.getenv('DB_NAME'), + db_user=os.getenv('DB_USER'), + db_password=os.getenv('DB_PASSWORD'), + redis_host=os.getenv('REDIS_HOST', 'localhost'), + redis_port=int(os.getenv('REDIS_PORT', 6379)) + ) + + # 3. Obtener datos + log.info(f"\n[2/5] Obteniendo datos históricos de {symbol}...") + + df = fetcher.fetch_historical( + symbol=symbol, + timeframe=timeframe, + days=days_back + ) + + log.info(f"Datos obtenidos: {len(df)} velas") + log.info(f"Rango: {df.index[0]} a {df.index[-1]}") + log.info(f"\nPrimeras filas:\n{df.head()}") + + # 4. Procesar datos + log.info("\n[3/5] Procesando datos...") + + # Validar + is_valid = processor.validate_ohlcv(df) + log.info(f"Datos válidos: {is_valid}") + + # Limpiar + df_clean = processor.clean_data(df) + + # Detectar gaps + gaps = processor.detect_gaps(df_clean, timeframe) + if not gaps.empty: + log.warning(f"Gaps detectados:\n{gaps}") + + # Calcular retornos + df_clean = processor.calculate_returns(df_clean) + + log.info(f"\nEstadísticas de retornos:") + log.info(f"Retorno medio: {df_clean['returns'].mean():.4%}") + log.info(f"Volatilidad: {df_clean['returns'].std():.4%}") + log.info(f"Retorno total: {df_clean['returns'].sum():.4%}") + + # 5. Guardar en base de datos + log.info("\n[4/5] Guardando en base de datos...") + + records_saved = storage.save_ohlcv(df_clean) + log.success(f"Guardados {records_saved} registros") + + # 6. Verificar almacenamiento + log.info("\n[5/5] Verificando almacenamiento...") + + # Obtener último timestamp + last_timestamp = storage.get_latest_timestamp(symbol, timeframe) + log.info(f"Último timestamp en DB: {last_timestamp}") + + # Cargar datos de vuelta + df_loaded = storage.load_ohlcv( + symbol=symbol, + timeframe=timeframe, + start_date=datetime.now() - timedelta(days=2) + ) + log.info(f"Datos cargados de DB: {len(df_loaded)} registros") + + # Resumen de datos disponibles + available_data = storage.get_available_data() + log.info(f"\nDatos disponibles en base de datos:\n{available_data}") + + # 7. Demo adicional: resampleo + log.info("\n[EXTRA] Demo de resampleo de timeframe...") + + df_4h = processor.resample_timeframe(df_clean, '4h') + log.info(f"Datos resampled a 4h: {len(df_4h)} velas") + log.info(f"\nPrimeras filas 4h:\n{df_4h.head()}") + + # Guardar también el timeframe de 4h + storage.save_ohlcv(df_4h) + + # 8. Obtener precio actual + log.info("\n[EXTRA] Obteniendo precio actual...") + ticker = fetcher.fetch_ticker(symbol) + log.info(f"Precio actual de {symbol}: ${ticker['last']:,.2f}") + log.info(f"24h High: ${ticker['high']:,.2f}") + log.info(f"24h Low: ${ticker['low']:,.2f}") + log.info(f"24h Volume: {ticker['baseVolume']:,.2f}") + + # 9. Cleanup + storage.close() + + log.info("\n" + "="*60) + log.success("DEMO COMPLETADO EXITOSAMENTE") + log.info("="*60) + + return df_clean + +if __name__ == "__main__": + try: + df = demo_data_pipeline() + except Exception as e: + log.error(f"Error en demo: {e}") + raise \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5e65ca3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,38 @@ +aiodns==4.0.0 +aiohappyeyeballs==2.6.1 +aiohttp==3.13.3 +aiosignal==1.4.0 +attrs==25.4.0 +ccxt==4.2.25 +certifi==2026.1.4 +cffi==2.0.0 +charset-normalizer==3.4.4 +cryptography==46.0.3 +frozenlist==1.8.0 +greenlet==3.3.0 +idna==3.11 +iniconfig==2.3.0 +loguru==0.7.2 +multidict==6.7.0 +numpy==1.26.4 +packaging==26.0 +pandas==2.1.4 +pluggy==1.6.0 +propcache==0.4.1 +psycopg2-binary==2.9.9 +pycares==5.0.1 +pycparser==3.0 +pytest==7.4.3 +python-dateutil==2.9.0.post0 +python-dotenv==1.0.0 +pytz==2025.2 +PyYAML==6.0.1 +redis==5.0.1 +requests==2.32.5 +setuptools==80.10.1 +six==1.17.0 +SQLAlchemy==2.0.23 +typing_extensions==4.15.0 +tzdata==2025.3 +urllib3==2.6.3 +yarl==1.22.0 diff --git a/src/backtest/engine.py b/src/backtest/engine.py new file mode 100644 index 0000000..0550934 --- /dev/null +++ b/src/backtest/engine.py @@ -0,0 +1,2 @@ +#Start Coding + diff --git a/src/backtest/metrics.py b/src/backtest/metrics.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/backtest/metrics.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/data/fetcher.py b/src/data/fetcher.py new file mode 100644 index 0000000..a9d86fe --- /dev/null +++ b/src/data/fetcher.py @@ -0,0 +1,212 @@ +# src/data/fetcher.py +""" +Módulo para obtener datos de exchanges usando CCXT +""" +import ccxt +import pandas as pd +from datetime import datetime, timedelta +from typing import List, Optional, Dict +import time +from ..monitoring.logger import log + +class DataFetcher: + """ + Clase para obtener datos históricos y en tiempo real de exchanges + """ + + def __init__(self, exchange_name: str, api_key: str = None, api_secret: str = None): + """ + Inicializa la conexión con el exchange + + Args: + exchange_name: Nombre del exchange (binance, kraken, etc) + api_key: API key (opcional para datos públicos) + api_secret: API secret (opcional para datos públicos) + """ + self.exchange_name = exchange_name + + try: + exchange_class = getattr(ccxt, exchange_name) + self.exchange = exchange_class({ + 'apiKey': api_key, + 'secret': api_secret, + 'enableRateLimit': True, # Importante para evitar bans + 'options': { + 'defaultType': 'spot', # spot, future, etc + } + }) + log.info(f"Conectado al exchange: {exchange_name}") + except Exception as e: + log.error(f"Error conectando a {exchange_name}: {e}") + raise + + def fetch_ohlcv(self, symbol: str, timeframe: str = '1h', since: Optional[datetime] = None, + limit: int = 500) -> pd.DataFrame: + """ + Obtiene datos OHLCV (Open, High, Low, Close, Volume) + + Args: + symbol: Par de trading (ej: 'BTC/USDT') + timeframe: Intervalo de tiempo ('1m', '5m', '1h', '1d') + since: Fecha desde la que obtener datos + limit: Número máximo de velas a obtener + + Returns: + DataFrame con los datos OHLCV + """ + try: + # Convertir datetime a timestamp en ms + since_ms = None + if since: + since_ms = int(since.timestamp() * 1000) + + log.info(f"Obteniendo datos OHLCV: {symbol} {timeframe}") + + ohlcv = self.exchange.fetch_ohlcv( + symbol, + timeframe=timeframe, + since=since_ms, + limit=limit + ) + + # Convertir a DataFrame + df = pd.DataFrame( + ohlcv, + columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'] + ) + + # Convertir timestamp a datetime + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') + df.set_index('timestamp', inplace=True) + + # Añadir metadata + df['symbol'] = symbol + df['timeframe'] = timeframe + + log.success(f"Obtenidos {len(df)} registros de {symbol}") + return df + + except Exception as e: + log.error(f"Error obteniendo OHLCV para {symbol}: {e}") + raise + + def fetch_historical(self, symbol: str, timeframe: str = '1h', days: int = 30, + max_retries: int = 3) -> pd.DataFrame: + """ + Obtiene datos históricos completos (puede requerir múltiples llamadas) + + Args: + symbol: Par de trading + timeframe: Intervalo de tiempo + days: Días hacia atrás + max_retries: Intentos máximos por request + + Returns: + DataFrame con todos los datos históricos + """ + all_data = [] + since = datetime.now() - timedelta(days=days) + + log.info(f"Iniciando descarga histórica: {symbol} desde {since.date()}") + + while True: + retry_count = 0 + success = False + + while retry_count < max_retries and not success: + try: + df = self.fetch_ohlcv(symbol, timeframe, since, limit=1000) + + if df.empty: + log.warning(f"No hay más datos disponibles para {symbol}") + success = True + break + + all_data.append(df) + + # Actualizar 'since' al último timestamp + 1 + last_timestamp = df.index[-1] + since = last_timestamp + pd.Timedelta(seconds=1) + + # Verificar si ya llegamos al presente + if since >= datetime.now(): + success = True + break + + success = True + time.sleep(self.exchange.rateLimit / 1000) # Respetar rate limit + + except Exception as e: + retry_count += 1 + log.warning(f"Intento {retry_count}/{max_retries} falló: {e}") + time.sleep(5 * retry_count) # Backoff exponencial + + if not success: + log.error(f"Falló después de {max_retries} intentos") + break + + if since >= datetime.now(): + break + + if not all_data: + log.error("No se pudo obtener ningún dato histórico") + return pd.DataFrame() + + # Combinar todos los DataFrames + final_df = pd.concat(all_data).drop_duplicates() + final_df.sort_index(inplace=True) + + log.success(f"Descarga completa: {len(final_df)} velas de {symbol}") + return final_df + + def fetch_ticker(self, symbol: str) -> Dict: + """ + Obtiene el precio actual y información del ticker + + Args: + symbol: Par de trading + + Returns: + Diccionario con información del ticker + """ + try: + ticker = self.exchange.fetch_ticker(symbol) + log.debug(f"Ticker de {symbol}: {ticker['last']}") + return ticker + except Exception as e: + log.error(f"Error obteniendo ticker de {symbol}: {e}") + raise + + def fetch_order_book(self, symbol: str, limit: int = 20) -> Dict: + """ + Obtiene el libro de órdenes (order book) + + Args: + symbol: Par de trading + limit: Profundidad del order book + + Returns: + Diccionario con bids y asks + """ + try: + order_book = self.exchange.fetch_order_book(symbol, limit) + return order_book + except Exception as e: + log.error(f"Error obteniendo order book de {symbol}: {e}") + raise + + def get_available_symbols(self) -> List[str]: + """ + Obtiene lista de símbolos disponibles en el exchange + + Returns: + Lista de símbolos + """ + try: + markets = self.exchange.load_markets() + symbols = list(markets.keys()) + log.info(f"Símbolos disponibles: {len(symbols)}") + return symbols + except Exception as e: + log.error(f"Error obteniendo símbolos: {e}") + raise \ No newline at end of file diff --git a/src/data/processor.py b/src/data/processor.py new file mode 100644 index 0000000..05328cf --- /dev/null +++ b/src/data/processor.py @@ -0,0 +1,269 @@ +# src/data/processor.py +""" +Módulo para limpiar, validar y procesar datos de mercado +""" +import pandas as pd +import numpy as np +from typing import Optional +from ..monitoring.logger import log + +class DataProcessor: + """ + Clase para procesar y limpiar datos de mercado + """ + + @staticmethod + def validate_ohlcv(df: pd.DataFrame) -> bool: + """ + Valida que un DataFrame tenga estructura OHLCV válida + + Args: + df: DataFrame a validar + + Returns: + True si es válido, False si no + """ + required_columns = ['open', 'high', 'low', 'close', 'volume'] + + # Verificar columnas requeridas + if not all(col in df.columns for col in required_columns): + log.error(f"Faltan columnas requeridas. Presentes: {df.columns.tolist()}") + return False + + # Verificar que el índice sea datetime + if not isinstance(df.index, pd.DatetimeIndex): + log.error("El índice debe ser DatetimeIndex") + return False + + # Verificar valores numéricos + for col in required_columns: + if not pd.api.types.is_numeric_dtype(df[col]): + log.error(f"Columna {col} no es numérica") + return False + + # Verificar relaciones lógicas: high >= low, etc + invalid_rows = (df['high'] < df['low']) | (df['high'] < df['open']) | (df['high'] < df['close']) + if invalid_rows.any(): + log.warning(f"Encontradas {invalid_rows.sum()} filas con valores ilógicos") + return False + + log.debug("Validación OHLCV exitosa") + return True + + @staticmethod + def clean_data(df: pd.DataFrame, remove_duplicates: bool = True) -> pd.DataFrame: + """ + Limpia datos: duplicados, NaN, outliers + + Args: + df: DataFrame a limpiar + remove_duplicates: Si eliminar duplicados + + Returns: + DataFrame limpio + """ + log.info(f"Limpiando datos. Filas iniciales: {len(df)}") + df_clean = df.copy() + + # Eliminar duplicados en el índice + if remove_duplicates: + before = len(df_clean) + df_clean = df_clean[~df_clean.index.duplicated(keep='first')] + removed = before - len(df_clean) + if removed > 0: + log.info(f"Eliminados {removed} duplicados") + + # Ordenar por índice + df_clean.sort_index(inplace=True) + + # Manejar valores faltantes + nan_count = df_clean.isnull().sum().sum() + if nan_count > 0: + log.warning(f"Encontrados {nan_count} valores NaN") + + # Forward fill para datos de serie temporal + df_clean.fillna(method='ffill', inplace=True) + + # Si aún quedan NaN al inicio, usar backward fill + df_clean.fillna(method='bfill', inplace=True) + + # Detectar y manejar outliers extremos (más de 100x el precio medio) + for col in ['open', 'high', 'low', 'close']: + if col in df_clean.columns: + median = df_clean[col].median() + outliers = (df_clean[col] > median * 100) | (df_clean[col] < median / 100) + outlier_count = outliers.sum() + + if outlier_count > 0: + log.warning(f"Detectados {outlier_count} outliers en {col}") + # Reemplazar outliers con el valor anterior válido + df_clean.loc[outliers, col] = np.nan + df_clean[col].fillna(method='ffill', inplace=True) + + # Verificar volumen negativo + if 'volume' in df_clean.columns: + negative_vol = df_clean['volume'] < 0 + if negative_vol.any(): + log.warning(f"Encontrados {negative_vol.sum()} volúmenes negativos") + df_clean.loc[negative_vol, 'volume'] = 0 + + log.success(f"Limpieza completa. Filas finales: {len(df_clean)}") + return df_clean + + @staticmethod + def resample_timeframe( + df: pd.DataFrame, + target_timeframe: str, + original_timeframe: Optional[str] = None + ) -> pd.DataFrame: + """ + Resamplea datos a un timeframe diferente + + Args: + df: DataFrame con datos OHLCV + target_timeframe: Timeframe destino ('5m', '15m', '1h', '4h', '1d') + original_timeframe: Timeframe original (opcional, para validación) + + Returns: + DataFrame resampled + """ + log.info(f"Resampling a {target_timeframe}") + + # Diccionario de conversión de timeframe a pandas offset + timeframe_map = { + '1m': '1T', + '5m': '5T', + '15m': '15T', + '30m': '30T', + '1h': '1H', + '4h': '4H', + '1d': '1D', + '1w': '1W' + } + + if target_timeframe not in timeframe_map: + raise ValueError(f"Timeframe {target_timeframe} no soportado") + + offset = timeframe_map[target_timeframe] + + # Resamplear usando reglas OHLC estándar + resampled = df.resample(offset).agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }) + + # Eliminar filas con NaN (periodos sin datos) + resampled.dropna(inplace=True) + + # Mantener metadata si existe + if 'symbol' in df.columns: + resampled['symbol'] = df['symbol'].iloc[0] + resampled['timeframe'] = target_timeframe + + log.success(f"Resampling completo: {len(df)} -> {len(resampled)} velas") + return resampled + + @staticmethod + def calculate_returns(df: pd.DataFrame, period: int = 1) -> pd.DataFrame: + """ + Calcula retornos simples y logarítmicos + + Args: + df: DataFrame con datos OHLCV + period: Periodo para calcular retornos + + Returns: + DataFrame con columnas de retornos añadidas + """ + df_returns = df.copy() + + # Retornos simples + df_returns['returns'] = df_returns['close'].pct_change(period) + + # Retornos logarítmicos (mejores para análisis estadístico) + df_returns['log_returns'] = np.log(df_returns['close'] / df_returns['close'].shift(period)) + + log.debug(f"Retornos calculados para periodo {period}") + return df_returns + + @staticmethod + def detect_gaps(df: pd.DataFrame, timeframe: str) -> pd.DataFrame: + """ + Detecta gaps en los datos (periodos faltantes) + + Args: + df: DataFrame con índice temporal + timeframe: Timeframe esperado + + Returns: + DataFrame con información de gaps + """ + timeframe_delta = { + '1m': pd.Timedelta(minutes=1), + '5m': pd.Timedelta(minutes=5), + '15m': pd.Timedelta(minutes=15), + '1h': pd.Timedelta(hours=1), + '4h': pd.Timedelta(hours=4), + '1d': pd.Timedelta(days=1) + } + + if timeframe not in timeframe_delta: + log.warning(f"Timeframe {timeframe} no reconocido para detección de gaps") + return pd.DataFrame() + + expected_delta = timeframe_delta[timeframe] + time_diffs = df.index.to_series().diff() + + # Gaps son diferencias mayores al timeframe esperado + gaps = time_diffs[time_diffs > expected_delta * 1.5] + + if len(gaps) > 0: + log.warning(f"Detectados {len(gaps)} gaps en los datos") + gap_info = pd.DataFrame({ + 'gap_start': gaps.index, + 'gap_duration': gaps.values + }) + return gap_info + + log.debug("No se detectaron gaps en los datos") + return pd.DataFrame() + + @staticmethod + def normalize_data(df: pd.DataFrame, method: str = 'minmax') -> pd.DataFrame: + """ + Normaliza datos numéricos + + Args: + df: DataFrame a normalizar + method: Método de normalización ('minmax', 'zscore') + + Returns: + DataFrame normalizado + """ + df_norm = df.copy() + numeric_cols = df_norm.select_dtypes(include=[np.number]).columns + + if method == 'minmax': + # Min-Max normalization [0, 1] + for col in numeric_cols: + min_val = df_norm[col].min() + max_val = df_norm[col].max() + if max_val > min_val: + df_norm[col] = (df_norm[col] - min_val) / (max_val - min_val) + + elif method == 'zscore': + # Z-score normalization (mean=0, std=1) + for col in numeric_cols: + mean = df_norm[col].mean() + std = df_norm[col].std() + if std > 0: + df_norm[col] = (df_norm[col] - mean) / std + + else: + raise ValueError(f"Método {method} no soportado") + + log.debug(f"Datos normalizados usando {method}") + return df_norm \ No newline at end of file diff --git a/src/data/storage.py b/src/data/storage.py new file mode 100644 index 0000000..565e466 --- /dev/null +++ b/src/data/storage.py @@ -0,0 +1,338 @@ +# src/data/storage.py +""" +Módulo para almacenamiento persistente de datos en PostgreSQL y caché en Redis +""" +import pandas as pd +from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer, Index +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from datetime import datetime +from typing import Optional, List +import redis +import json +from ..monitoring.logger import log + +Base = declarative_base() + +class OHLCV(Base): + """ + Modelo de tabla para datos OHLCV + """ + __tablename__ = 'ohlcv' + + id = Column(Integer, primary_key=True, autoincrement=True) + timestamp = Column(DateTime, nullable=False) + symbol = Column(String(20), nullable=False) + timeframe = Column(String(10), nullable=False) + open = Column(Float, nullable=False) + high = Column(Float, nullable=False) + low = Column(Float, nullable=False) + close = Column(Float, nullable=False) + volume = Column(Float, nullable=False) + + # Índices compuestos para queries rápidas + __table_args__ = ( + Index('idx_symbol_timeframe_timestamp', 'symbol', 'timeframe', 'timestamp'), + Index('idx_timestamp', 'timestamp'), + ) + +class StorageManager: + """ + Gestor de almacenamiento con PostgreSQL y Redis + """ + + def __init__( + self, + db_host: str, + db_port: int, + db_name: str, + db_user: str, + db_password: str, + redis_host: str = 'localhost', + redis_port: int = 6379, + redis_db: int = 0 + ): + """ + Inicializa conexiones a PostgreSQL y Redis + """ + # PostgreSQL connection + db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" + + try: + self.engine = create_engine( + db_url, + pool_size=10, + max_overflow=20, + echo=False + ) + + # Crear tablas si no existen + Base.metadata.create_all(self.engine) + + # Crear sesión + Session = sessionmaker(bind=self.engine) + self.session = Session() + + log.success("Conectado a PostgreSQL") + except Exception as e: + log.error(f"Error conectando a PostgreSQL: {e}") + raise + + # Redis connection (para caché) + try: + self.redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + self.redis_client.ping() + log.success("Conectado a Redis") + except Exception as e: + log.warning(f"No se pudo conectar a Redis: {e}. Continuando sin caché.") + self.redis_client = None + + def save_ohlcv(self, df: pd.DataFrame, batch_size: int = 1000) -> int: + """ + Guarda datos OHLCV en la base de datos + + Args: + df: DataFrame con datos OHLCV + batch_size: Tamaño de lote para inserción + + Returns: + Número de registros guardados + """ + if df.empty: + log.warning("DataFrame vacío, nada que guardar") + return 0 + + log.info(f"Guardando {len(df)} registros en base de datos") + + try: + # Preparar datos para inserción + df_to_save = df.reset_index() + + # Renombrar columna de índice a timestamp si es necesario + if df_to_save.columns[0] != 'timestamp': + df_to_save.rename(columns={df_to_save.columns[0]: 'timestamp'}, inplace=True) + + # Insertar en lotes para mejor performance + records_saved = 0 + for i in range(0, len(df_to_save), batch_size): + batch = df_to_save.iloc[i:i+batch_size] + + # Usar to_sql con if_exists='append' y method='multi' + batch.to_sql( + 'ohlcv', + self.engine, + if_exists='append', + index=False, + method='multi' + ) + + records_saved += len(batch) + log.debug(f"Guardados {records_saved}/{len(df_to_save)} registros") + + log.success(f"Guardados {records_saved} registros exitosamente") + return records_saved + + except Exception as e: + log.error(f"Error guardando datos: {e}") + self.session.rollback() + raise + + def load_ohlcv( + self, + symbol: str, + timeframe: str, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + use_cache: bool = True + ) -> pd.DataFrame: + """ + Carga datos OHLCV de la base de datos + + Args: + symbol: Símbolo del par + timeframe: Timeframe + start_date: Fecha inicio (opcional) + end_date: Fecha fin (opcional) + use_cache: Si usar caché de Redis + + Returns: + DataFrame con datos OHLCV + """ + # Generar cache key + cache_key = f"ohlcv:{symbol}:{timeframe}:{start_date}:{end_date}" + + # Intentar obtener de caché + if use_cache and self.redis_client: + try: + cached_data = self.redis_client.get(cache_key) + if cached_data: + log.debug(f"Datos obtenidos de caché: {cache_key}") + df = pd.read_json(cached_data) + df.set_index('timestamp', inplace=True) + return df + except Exception as e: + log.warning(f"Error leyendo caché: {e}") + + # Construir query + query = f""" + SELECT timestamp, open, high, low, close, volume, symbol, timeframe + FROM ohlcv + WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' + """ + + if start_date: + query += f" AND timestamp >= '{start_date}'" + if end_date: + query += f" AND timestamp <= '{end_date}'" + + query += " ORDER BY timestamp ASC" + + try: + df = pd.read_sql(query, self.engine) + + if df.empty: + log.warning(f"No se encontraron datos para {symbol} {timeframe}") + return pd.DataFrame() + + df['timestamp'] = pd.to_datetime(df['timestamp']) + df.set_index('timestamp', inplace=True) + + log.info(f"Cargados {len(df)} registros de {symbol} {timeframe}") + + # Guardar en caché + if self.redis_client: + try: + df_json = df.reset_index().to_json() + self.redis_client.setex(cache_key, 3600, df_json) # TTL 1 hora + log.debug(f"Datos guardados en caché: {cache_key}") + except Exception as e: + log.warning(f"Error guardando en caché: {e}") + + return df + + except Exception as e: + log.error(f"Error cargando datos: {e}") + raise + + def get_latest_timestamp(self, symbol: str, timeframe: str) -> Optional[datetime]: + """ + Obtiene el último timestamp disponible para un símbolo/timeframe + + Args: + symbol: Símbolo del par + timeframe: Timeframe + + Returns: + Último timestamp o None si no hay datos + """ + query = f""" + SELECT MAX(timestamp) as last_timestamp + FROM ohlcv + WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' + """ + + try: + result = pd.read_sql(query, self.engine) + last_timestamp = result['last_timestamp'].iloc[0] + + if pd.isna(last_timestamp): + log.debug(f"No hay datos previos para {symbol} {timeframe}") + return None + + return last_timestamp + + except Exception as e: + log.error(f"Error obteniendo último timestamp: {e}") + return None + + def delete_ohlcv( + self, + symbol: str, + timeframe: str, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> int: + """ + Elimina datos OHLCV + + Args: + symbol: Símbolo del par + timeframe: Timeframe + start_date: Fecha inicio (opcional) + end_date: Fecha fin (opcional) + + Returns: + Número de registros eliminados + """ + query = f""" + DELETE FROM ohlcv + WHERE symbol = '{symbol}' AND timeframe = '{timeframe}' + """ + + if start_date: + query += f" AND timestamp >= '{start_date}'" + if end_date: + query += f" AND timestamp <= '{end_date}'" + + try: + result = self.engine.execute(query) + deleted_count = result.rowcount + log.info(f"Eliminados {deleted_count} registros") + + # Invalidar caché + if self.redis_client: + cache_pattern = f"ohlcv:{symbol}:{timeframe}:*" + keys = self.redis_client.keys(cache_pattern) + if keys: + self.redis_client.delete(*keys) + + return deleted_count + + except Exception as e: + log.error(f"Error eliminando datos: {e}") + raise + + def get_available_data(self) -> pd.DataFrame: + """ + Obtiene resumen de datos disponibles en la base de datos + + Returns: + DataFrame con información de símbolos y timeframes disponibles + """ + query = """ + SELECT + symbol, + timeframe, + MIN(timestamp) as first_date, + MAX(timestamp) as last_date, + COUNT(*) as record_count + FROM ohlcv + GROUP BY symbol, timeframe + ORDER BY symbol, timeframe + """ + + try: + df = pd.read_sql(query, self.engine) + log.info(f"Información de {len(df)} conjuntos de datos") + return df + except Exception as e: + log.error(f"Error obteniendo información de datos: {e}") + raise + + def close(self): + """ + Cierra conexiones + """ + try: + self.session.close() + self.engine.dispose() + if self.redis_client: + self.redis_client.close() + log.info("Conexiones cerradas") + except Exception as e: + log.error(f"Error cerrando conexiones: {e}") \ No newline at end of file diff --git a/src/execution/broker_api.py b/src/execution/broker_api.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/execution/broker_api.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/execution/order_manager.py b/src/execution/order_manager.py new file mode 100644 index 0000000..9fdc8d6 --- /dev/null +++ b/src/execution/order_manager.py @@ -0,0 +1 @@ +#start Coding diff --git a/src/features/engineering.py b/src/features/engineering.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/features/engineering.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/features/indicators.py b/src/features/indicators.py new file mode 100644 index 0000000..9fdc8d6 --- /dev/null +++ b/src/features/indicators.py @@ -0,0 +1 @@ +#start Coding diff --git a/src/monitoring/alerts.py b/src/monitoring/alerts.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/monitoring/alerts.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/monitoring/logger.py b/src/monitoring/logger.py new file mode 100644 index 0000000..383a31f --- /dev/null +++ b/src/monitoring/logger.py @@ -0,0 +1,51 @@ +# src/monitoring/logger.py +""" +Sistema de logging centralizado para todo el bot +""" +from loguru import logger +import sys +from pathlib import Path + +def setup_logger(log_level: str = 'INFO'): + """ + Configura el sistema de logging con rotación y formato + """ + # Remover handler por defecto + logger.remove() + + # Console handler con colores + logger.add( + sys.stdout, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function} - {message}", + level=log_level, + colorize=True, + ) + + # File handler con rotación + log_path = Path("logs") + log_path.mkdir(exist_ok=True) + + logger.add( + log_path / "trading_bot_{time:YYYY-MM-DD}.log", + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + level=log_level, + rotation="500 MB", + retention="30 days", + compression="zip" + ) + + # Error file separado + logger.add( + log_path / "errors_{time:YYYY-MM-DD}.log", + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + level="ERROR", + rotation="100 MB", + retention="90 days", + compression="zip" + ) + + logger.info("Logger inicializado correctamente") + return logger + +# Instancia global +log = setup_logger() \ No newline at end of file diff --git a/src/risk/manager.py b/src/risk/manager.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/risk/manager.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/risk/position_sizing.py b/src/risk/position_sizing.py new file mode 100644 index 0000000..860ef34 --- /dev/null +++ b/src/risk/position_sizing.py @@ -0,0 +1 @@ +#Start Coding diff --git a/src/strategy/base.py b/src/strategy/base.py new file mode 100644 index 0000000..a096b6f --- /dev/null +++ b/src/strategy/base.py @@ -0,0 +1,2 @@ +#start Coding + diff --git a/src/strategy/ml_model.py b/src/strategy/ml_model.py new file mode 100644 index 0000000..fcfd449 --- /dev/null +++ b/src/strategy/ml_model.py @@ -0,0 +1 @@ +#start coding diff --git a/src/strategy/signals.py b/src/strategy/signals.py new file mode 100644 index 0000000..f411fa9 --- /dev/null +++ b/src/strategy/signals.py @@ -0,0 +1,2 @@ +#start coding + diff --git a/tests/test_data.py b/tests/test_data.py new file mode 100644 index 0000000..070ba84 --- /dev/null +++ b/tests/test_data.py @@ -0,0 +1,168 @@ +# tests/test_data.py +""" +Tests unitarios para el módulo de datos +""" +import pytest +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from src.data.processor import DataProcessor + +class TestDataProcessor: + """ + Tests para DataProcessor + """ + + @pytest.fixture + def sample_ohlcv_data(self): + """ + Genera datos OHLCV de ejemplo para tests + """ + dates = pd.date_range(start='2024-01-01', periods=100, freq='1H') + np.random.seed(42) + + base_price = 50000 + df = pd.DataFrame({ + 'open': base_price + np.random.randn(100) * 100, + 'high': base_price + np.random.randn(100) * 100 + 50, + 'low': base_price + np.random.randn(100) * 100 - 50, + 'close': base_price + np.random.randn(100) * 100, + 'volume': np.random.randint(1000, 10000, 100).astype(float), + }, index=dates) + + # Asegurar que high >= low + df['high'] = df[['open', 'close', 'high']].max(axis=1) + df['low'] = df[['open', 'close', 'low']].min(axis=1) + + df['symbol'] = 'BTC/USDT' + df['timeframe'] = '1h' + + return df + + def test_validate_ohlcv_valid(self, sample_ohlcv_data): + """ + Test validación de datos OHLCV correctos + """ + processor = DataProcessor() + assert processor.validate_ohlcv(sample_ohlcv_data) == True + + def test_validate_ohlcv_missing_columns(self, sample_ohlcv_data): + """ + Test validación con columnas faltantes + """ + processor = DataProcessor() + df_invalid = sample_ohlcv_data.drop(columns=['close']) + assert processor.validate_ohlcv(df_invalid) == False + + def test_clean_data_removes_duplicates(self, sample_ohlcv_data): + """ + Test limpieza elimina duplicados + """ + processor = DataProcessor() + + # Añadir duplicados + df_with_dupes = pd.concat([sample_ohlcv_data, sample_ohlcv_data.iloc[:5]]) + + df_clean = processor.clean_data(df_with_dupes) + + # Verificar que no hay duplicados en el índice + assert df_clean.index.duplicated().sum() == 0 + + def test_clean_data_handles_nan(self, sample_ohlcv_data): + """ + Test limpieza maneja valores NaN + """ + processor = DataProcessor() + + # Introducir NaN + df_with_nan = sample_ohlcv_data.copy() + df_with_nan.loc[df_with_nan.index[10:15], 'close'] = np.nan + + df_clean = processor.clean_data(df_with_nan) + + # Verificar que no quedan NaN + assert df_clean.isnull().sum().sum() == 0 + + def test_resample_timeframe(self, sample_ohlcv_data): + """ + Test resampleo a timeframe mayor + """ + processor = DataProcessor() + + # Resamplear de 1h a 4h + df_4h = processor.resample_timeframe(sample_ohlcv_data, '4h') + + # Verificar que hay menos velas + assert len(df_4h) < len(sample_ohlcv_data) + + # Verificar que el timeframe se actualizó + assert df_4h['timeframe'].iloc[0] == '4h' + + # Verificar lógica OHLC + assert (df_4h['high'] >= df_4h['low']).all() + assert (df_4h['high'] >= df_4h['open']).all() + assert (df_4h['high'] >= df_4h['close']).all() + + def test_calculate_returns(self, sample_ohlcv_data): + """ + Test cálculo de retornos + """ + processor = DataProcessor() + + df_returns = processor.calculate_returns(sample_ohlcv_data) + + # Verificar que se añadieron columnas de retornos + assert 'returns' in df_returns.columns + assert 'log_returns' in df_returns.columns + + # Verificar que el primer valor es NaN (no hay retorno previo) + assert pd.isna(df_returns['returns'].iloc[0]) + + def test_detect_gaps(self, sample_ohlcv_data): + """ + Test detección de gaps + """ + processor = DataProcessor() + + # Crear datos con gap artificial + df_with_gap = sample_ohlcv_data.iloc[:50].copy() + df_after_gap = sample_ohlcv_data.iloc[60:].copy() + df_with_gap = pd.concat([df_with_gap, df_after_gap]) + + gaps = processor.detect_gaps(df_with_gap, '1h') + + # Debería detectar al menos un gap + assert len(gaps) > 0 + + def test_normalize_minmax(self, sample_ohlcv_data): + """ + Test normalización min-max + """ + processor = DataProcessor() + + df_norm = processor.normalize_data(sample_ohlcv_data, method='minmax') + + # Verificar que valores están entre 0 y 1 + numeric_cols = df_norm.select_dtypes(include=[np.number]).columns + for col in numeric_cols: + assert df_norm[col].min() >= 0 + assert df_norm[col].max() <= 1 + + def test_normalize_zscore(self, sample_ohlcv_data): + """ + Test normalización z-score + """ + processor = DataProcessor() + + df_norm = processor.normalize_data(sample_ohlcv_data, method='zscore') + + # Verificar que la media es cercana a 0 y std cercana a 1 + numeric_cols = df_norm.select_dtypes(include=[np.number]).columns + for col in numeric_cols: + mean = df_norm[col].mean() + std = df_norm[col].std() + assert abs(mean) < 0.1 # Cercano a 0 + assert abs(std - 1) < 0.1 # Cercano a 1 + +# Para ejecutar tests: +# pytest tests/test_data.py -v \ No newline at end of file