Add walk-forward validation with optimizer, OOS evaluation and visualizer

This commit is contained in:
DaM
2026-01-28 23:40:12 +01:00
parent e15074c0a7
commit af7b862f60
11 changed files with 910 additions and 10 deletions

View File

@@ -273,6 +273,113 @@ Win Rate: 45%
Profit Factor: 1.1
```
## 🔁 Walk-Forward Validation (Out-of-Sample)
### 📌 ¿Qué es Walk-Forward Validation?
El *walk-forward validation* es una técnica avanzada de validación que simula cómo se comportaría una estrategia en condiciones reales:
- Los parámetros se **optimizan solo en datos pasados (TRAIN)**
- La estrategia se **ejecuta en datos futuros no vistos (TEST)**
- El proceso se repite de forma deslizante a lo largo del tiempo
Esto evita:
- Look-ahead bias
- Overfitting clásico
- Optimismo artificial en backtests
Es el estándar en *quant research* profesional.
---
### 🧠 Metodología aplicada en este proyecto
Para cada ventana temporal:
1. **TRAIN**
- Periodo fijo de entrenamiento
- Optimización por grid search
- Selección de parámetros según métrica objetivo (Sharpe Ratio)
2. **TEST (Out-of-Sample)**
- Backtest con los mejores parámetros del TRAIN
- Sin reoptimización
- Métricas registradas de forma independiente
3. **Desplazamiento**
- La ventana avanza en el tiempo
- Se repite el proceso hasta agotar los datos
---
### ⏱️ Configuración utilizada
- **Activo:** BTC/USDT
- **Timeframe:** 1h
- **Ventana TRAIN:** 365 días
- **Ventana TEST:** 90 días
- **Step:** 90 días
- **Capital inicial:** $10,000
- **Comisión:** 0.1%
- **Slippage:** 0.05%
**Estrategia base:**
- Moving Average Crossover
- Filtro de tendencia con ADX
---
### 📊 Resultados por ventana (TEST Out-of-Sample)
| Window | Return % | Sharpe | Max DD % | Trades | Parámetros |
|------|----------|--------|----------|--------|------------|
| 1 | +38.00 | 0.75 | -11.93 | 3 | MA(15/50) + ADX 25 |
| 2 | +3.62 | 0.10 | -23.67 | 2 | MA(15/50) + ADX 30 |
| 3 | +8.54 | 0.22 | -10.28 | 2 | MA(15/50) + ADX 30 |
| 4 | 0.00 | 0.00 | 0.00 | 0 | Sin trades |
| 5 | +9.71 | 0.26 | -10.57 | 2 | MA(15/50) + ADX 30 |
| 6 | 0.00 | 0.00 | 0.00 | 0 | Sin trades |
| 7 | -2.25 | -0.06 | -12.42 | 2 | MA(15/50) + ADX 30 |
| 8 | -2.27 | -0.13 | -5.46 | 2 | MA(15/30) + ADX 30 |
---
### 📈 Métricas agregadas (Out-of-Sample)
- **Ventanas evaluadas:** 8
- **Retorno medio:** +6.92%
- **Sharpe medio:** 0.14
- **Max Drawdown medio:** -9.29%
---
### 🧩 Interpretación de resultados
- La estrategia **no está sobreoptimizada**
- Existen ventanas sin operaciones → el sistema sabe **no operar**
- Las pérdidas están **controladas**
- No hay colapsos en mercados adversos
- El rendimiento depende claramente del régimen de mercado
Este comportamiento es consistente con una estrategia:
- Tendencial
- Conservadora
- Apta para mejoras vía *position sizing* y *portfolio diversification*
---
### ✅ Decisiones tomadas tras Walk-Forward
1. **NO modificar la lógica de entrada**
2. **NO optimizar más los parámetros base**
3. Mantener el filtro ADX como componente estructural
4. Avanzar hacia mejoras de:
- Position sizing
- Stops dinámicos
- Portfolio multi-asset
El walk-forward valida que la señal base es **estable y explotable**, aunque no espectacular por sí sola.
## 🔄 Próximos Pasos
Después del backtesting:

View File

@@ -6,7 +6,7 @@ from .engine import BacktestEngine
from .strategy import Strategy, Signal
from .trade import Trade, TradeType, TradeStatus, Position
from .optimizer import ParameterOptimizer
from .visualizer import BacktestVisualizer
from .visualizers.visualizer import BacktestVisualizer
from .metrics import (
calculate_sharpe_ratio,
calculate_sortino_ratio,

View File

@@ -219,7 +219,7 @@ class ParameterOptimizer:
return top_params
def save_results(self, filename: str = 'backtest_results/optimization_results.csv'):
def save_results(self, filename: str = 'backtest_results/optimizer/optimization_results.csv'):
"""
Guarda resultados en CSV

View File

View File

@@ -7,7 +7,7 @@ import matplotlib.dates as mdates
import pandas as pd
import numpy as np
from typing import Dict, Optional
from ..utils.logger import log
from ...utils.logger import log
# Configurar estilo
plt.style.use('seaborn-v0_8-darkgrid')
@@ -291,7 +291,7 @@ class BacktestVisualizer:
plt.close()
def generate_all_plots(self, output_dir: str = 'backtest_results'):
def generate_all_plots(self, output_dir: str = 'backtest_results/visualizer'):
"""
Genera todos los gráficos y los guarda
"""

View File

@@ -0,0 +1,325 @@
# src/backtest/visualizers/walk_forward_visualizer.py
from pathlib import Path
import ast
import json
import re
import pandas as pd
import matplotlib.pyplot as plt
from src.utils.logger import log
class WalkForwardVisualizer:
    """
    Visualizer for Walk-Forward Validation results.

    Expected input (loaded back from CSVs):
        - summary_df: walkforward_summary.csv
        - windows_df: walkforward_windows.csv (includes a 'params' column)
    """
    def __init__(
        self,
        summary_df: pd.DataFrame,
        windows_df: pd.DataFrame,
        name: str = "WalkForward",
        output_dir: Path | str = "backtest_results/walkforward/plots",
    ):
        """
        Args:
            summary_df: One row per walk-forward configuration (aggregates).
            windows_df: One row per TEST window; must carry 'wf_name' for
                the per-config plots and 'params' for parameter stability.
            name: Label used in plot titles.
            output_dir: Directory where PNGs are written (created if missing).
        """
        # Copy the inputs so the caller's DataFrames are never mutated.
        self.summary = summary_df.copy()
        self.windows = windows_df.copy()
        self.name = name
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # CSV round-trips turn param dicts into strings; normalize them now.
        self._parse_params_column()
        log.info(f"📊 WalkForwardVisualizer inicializado → {self.output_dir}")
    # ------------------------------------------------------------------
    # 🔧 ROBUST PARSING OF THE PARAMS COLUMN
    # ------------------------------------------------------------------
    @staticmethod
    def _coerce_params_string_to_dict(s: str):
        """
        Convert a params string (as read back from CSV) into a dict.

        Supports:
            - "{'fast_period': 15, ...}"
            - "{'fast_period': np.int64(15), 'use_adx': np.True_, ...}"
            - JSON with double quotes

        Returns:
            The parsed dict, or None when the value cannot be interpreted.
        """
        if s is None:
            return None
        if not isinstance(s, str):
            return None
        raw = s.strip()
        if raw == "" or raw.lower() in {"none", "nan", "null"}:
            return None
        # Strip typical numpy scalar reprs so the literal parsers can cope.
        cleaned = raw
        # np.int64(15) -> 15
        cleaned = re.sub(r"np\.int64\((\-?\d+)\)", r"\1", cleaned)
        # np.float64(1.23) -> 1.23
        cleaned = re.sub(r"np\.float64\(([-+0-9eE\.]+)\)", r"\1", cleaned)
        # np.True_ / np.False_ -> True/False
        cleaned = cleaned.replace("np.True_", "True").replace("np.False_", "False")
        # np.bool_(True) -> True
        cleaned = re.sub(r"np\.bool_\((True|False)\)", r"\1", cleaned)
        # 1) try literal_eval (Python dict repr)
        try:
            d = ast.literal_eval(cleaned)
            if isinstance(d, dict):
                return d
        except Exception:
            pass
        # 2) try json (in case it was serialized with double quotes)
        try:
            d = json.loads(cleaned)
            if isinstance(d, dict):
                return d
        except Exception:
            pass
        # 3) last resort: swap single quotes for double quotes and retry json.
        # NOTE(review): this heuristic breaks if any value contains an
        # apostrophe — acceptable here since params are numbers/bools.
        try:
            maybe_json = cleaned.replace("'", '"')
            d = json.loads(maybe_json)
            if isinstance(d, dict):
                return d
        except Exception:
            return None
        return None
    def _parse_params_column(self):
        """
        Coerce the 'params' column to dicts in place (no-op if absent).
        """
        if "params" not in self.windows.columns:
            log.warning("⚠️ windows_df no tiene columna 'params'. Se omite parseo.")
            return
        # Values already parsed as dicts pass through untouched.
        self.windows["params"] = self.windows["params"].apply(
            lambda x: x if isinstance(x, dict) else self._coerce_params_string_to_dict(x)
        )
    # ------------------------------------------------------------------
    # 📊 PLOT 1: AVERAGE METRICS PER CONFIGURATION
    # ------------------------------------------------------------------
    def plot_avg_metrics(self) -> Path:
        """
        Bar chart of avg return and avg Sharpe per WF configuration.

        Returns:
            Path of the saved PNG.
        """
        df = self.summary.copy()
        fig, ax = plt.subplots(figsize=(10, 6))
        x = range(len(df))
        ax.bar(x, df["avg_return_pct"], label="Avg Return (%)")
        # NOTE(review): Sharpe is stacked on top of Return % via `bottom=`,
        # but the two metrics are in different units, so the combined bar
        # height is not meaningful — consider grouped bars instead.
        ax.bar(
            x,
            df["avg_sharpe"],
            bottom=df["avg_return_pct"],
            label="Avg Sharpe",
        )
        ax.set_xticks(x)
        ax.set_xticklabels(df["wf_name"])
        ax.set_title(f"Walk-Forward Performance Summary\n{self.name}")
        ax.legend()
        ax.grid(alpha=0.3)
        path = self.output_dir / "wf_avg_metrics.png"
        fig.tight_layout()
        fig.savefig(path, dpi=150)
        plt.close(fig)
        log.success(f"💾 Plot guardado: {path}")
        return path
    # ------------------------------------------------------------------
    # 📈 PLOT 2: RETURNS PER WINDOW (OOS)
    # ------------------------------------------------------------------
    def plot_returns_by_window(self) -> Path:
        """
        Line chart of out-of-sample return per TEST window, one line per
        WF configuration.

        Returns:
            Path of the saved PNG.

        Raises:
            ValueError: If windows_df lacks the 'wf_name' column.
        """
        df = self.windows.copy()
        if "wf_name" not in df.columns:
            raise ValueError("windows_df debe contener la columna 'wf_name'")
        # Chronological order within each configuration.
        df["test_start"] = pd.to_datetime(df["test_start"])
        df = df.sort_values(["wf_name", "test_start"])
        fig, ax = plt.subplots(figsize=(12, 6))
        for wf_name, g in df.groupby("wf_name"):
            ax.plot(
                g["test_start"],
                g["return_pct"],
                marker="o",
                label=wf_name,
            )
        # Zero line separates winning from losing windows.
        ax.axhline(0, color="black", linestyle="--", linewidth=1)
        ax.set_title(f"Walk-Forward OOS Returns by Window\n{self.name}")
        ax.set_xlabel("Test period start")
        ax.set_ylabel("Return (%)")
        ax.legend()
        ax.grid(alpha=0.3)
        path = self.output_dir / "wf_returns_by_window.png"
        fig.tight_layout()
        fig.savefig(path, dpi=150)
        plt.close(fig)
        log.success(f"💾 Plot guardado: {path}")
        return path
    # ------------------------------------------------------------------
    # 📉 PLOT 3: DRAWDOWN PER WINDOW (OOS)
    # ------------------------------------------------------------------
    def plot_drawdown_by_window(self) -> Path:
        """
        Line chart of out-of-sample max drawdown per TEST window, one line
        per WF configuration.

        Returns:
            Path of the saved PNG.

        Raises:
            ValueError: If windows_df lacks any required column.
        """
        df = self.windows.copy()
        required = {"wf_name", "test_start", "max_dd_pct"}
        missing = required - set(df.columns)
        if missing:
            raise ValueError(f"windows_df no contiene columnas requeridas: {missing}")
        df["test_start"] = pd.to_datetime(df["test_start"])
        df = df.sort_values(["wf_name", "test_start"])
        fig, ax = plt.subplots(figsize=(12, 6))
        for wf_name, g in df.groupby("wf_name"):
            ax.plot(
                g["test_start"],
                g["max_dd_pct"],
                marker="o",
                label=wf_name,
            )
        ax.axhline(0, linestyle="--", linewidth=1)
        ax.set_title(f"Walk-Forward OOS Max Drawdown by Window\n{self.name}")
        ax.set_xlabel("Test period start")
        ax.set_ylabel("Max Drawdown (%)")
        ax.legend()
        ax.grid(alpha=0.3)
        path = self.output_dir / "wf_drawdown_by_window.png"
        fig.tight_layout()
        fig.savefig(path, dpi=150)
        plt.close(fig)
        log.success(f"💾 Plot guardado: {path}")
        return path
    # ------------------------------------------------------------------
    # 📊 PLOT 4: RETURN DISTRIBUTION
    # ------------------------------------------------------------------
    def plot_return_distribution(self, bins: int = 20, overlay: bool = True) -> Path:
        """
        Histogram of OOS window returns.

        Args:
            bins: Number of histogram bins.
            overlay: When True, all configurations share one set of axes
                (semi-transparent); when False, one subplot per config.

        Returns:
            Path of the saved PNG.
        """
        df = self.windows.copy().dropna(subset=["return_pct"])
        path = self.output_dir / "wf_return_distribution.png"
        if overlay:
            # Overlay mode: returns early after saving the combined figure.
            fig, ax = plt.subplots(figsize=(12, 6))
            for wf_name, g in df.groupby("wf_name"):
                ax.hist(g["return_pct"], bins=bins, alpha=0.5, label=wf_name)
            ax.axvline(0, linestyle="--", linewidth=1)
            ax.set_title(f"Walk-Forward OOS Return Distribution\n{self.name}")
            ax.set_xlabel("Return (%)")
            ax.set_ylabel("Frequency")
            ax.legend()
            fig.tight_layout()
            fig.savefig(path, dpi=150)
            plt.close(fig)
            log.success(f"💾 Plot guardado: {path}")
            return path
        # Non-overlay mode: one stacked subplot per configuration.
        wf_names = sorted(df["wf_name"].unique())
        n = len(wf_names)
        fig, axes = plt.subplots(n, 1, figsize=(12, 4 * n), sharex=True)
        if n == 1:
            # subplots() returns a bare Axes when n == 1; normalize to a list.
            axes = [axes]
        for ax, wf_name in zip(axes, wf_names):
            g = df[df["wf_name"] == wf_name]
            ax.hist(g["return_pct"], bins=bins)
            ax.axvline(0, linestyle="--", linewidth=1)
            ax.set_title(wf_name)
        axes[-1].set_xlabel("Return (%)")
        fig.suptitle(f"Walk-Forward OOS Return Distribution\n{self.name}", y=0.98)
        fig.tight_layout()
        fig.savefig(path, dpi=150)
        plt.close(fig)
        log.success(f"💾 Plot guardado: {path}")
        return path
    # ------------------------------------------------------------------
    # 📈 PLOT 5: PARAMETER STABILITY
    # ------------------------------------------------------------------
    def plot_parameter_stability(self, param_name: str) -> Path:
        """
        Plot the optimized value of one parameter across TEST windows to
        show how stable the grid-search selection is over time.

        Args:
            param_name: Key to extract from each window's params dict.

        Returns:
            Path of the saved PNG.

        Raises:
            ValueError: If 'params' is missing or the key cannot be
                extracted from any row.
        """
        df = self.windows.copy()
        if "params" not in df.columns:
            raise ValueError("windows_df no tiene columna 'params'")
        # Re-coerce defensively in case the column was not parsed yet.
        df["params"] = df["params"].apply(
            lambda x: x if isinstance(x, dict) else self._coerce_params_string_to_dict(x)
        )
        df[param_name] = df["params"].apply(
            lambda p: p.get(param_name) if isinstance(p, dict) else None
        )
        df = df.dropna(subset=[param_name])
        if df.empty:
            # Helpful debug output: show what the CSV actually contains.
            sample = self.windows["params"].dropna().head(5).tolist()
            raise ValueError(
                f"No se pudo extraer '{param_name}' desde params.\n"
                f"Ejemplo de params (primeros 5 no-null): {sample}"
            )
        df["test_start"] = pd.to_datetime(df["test_start"])
        df = df.sort_values(["wf_name", "test_start"])
        fig, ax = plt.subplots(figsize=(12, 6))
        for wf_name, g in df.groupby("wf_name"):
            ax.plot(g["test_start"], g[param_name], marker="o", label=wf_name)
        ax.set_title(f"WF Parameter Stability: {param_name}\n{self.name}")
        ax.set_xlabel("Test period start")
        ax.set_ylabel(param_name)
        ax.legend()
        ax.grid(alpha=0.3)
        path = self.output_dir / f"wf_param_stability_{param_name}.png"
        fig.tight_layout()
        fig.savefig(path, dpi=150)
        plt.close(fig)
        log.success(f"💾 Plot guardado: {path}")
        return path

View File

@@ -0,0 +1,255 @@
# src/backtest/walk_forward.py
import pandas as pd
from typing import List, Dict, Optional
from src.backtest.optimizer import ParameterOptimizer
from src.backtest.engine import BacktestEngine
from ..utils.logger import log
class WalkForwardValidator:
    """
    Walk-forward validation orchestrator.

    Responsibilities:
        - split the timeline into sliding TRAIN / TEST windows
        - optimize strategy parameters on each TRAIN slice (grid search)
        - evaluate the selected parameters out-of-sample on the TEST slice
    """
    def __init__(
        self,
        strategy_class,
        param_grid: dict,
        data: pd.DataFrame,
        train_window: pd.Timedelta,
        test_window: pd.Timedelta,
        step_size: Optional[pd.Timedelta] = None,
        initial_capital: float = 10_000,
        commission: float = 0.001,
        slippage: float = 0.0005,
        position_size: float = 0.95,
        optimizer_metric: str = "sharpe_ratio",
        verbose: bool = True,
    ):
        """
        Args:
            strategy_class: Strategy class instantiated with the optimized params.
            param_grid: Grid-search space forwarded to ParameterOptimizer.
            data: OHLCV DataFrame indexed by a DatetimeIndex.
            train_window: Length of each in-sample (TRAIN) period.
            test_window: Length of each out-of-sample (TEST) period.
            step_size: Advance of the window per iteration; defaults to
                test_window (i.e. non-overlapping TEST periods).
            initial_capital: Starting capital for every backtest.
            commission: Per-trade commission rate.
            slippage: Per-trade slippage rate.
            position_size: Fraction of capital allocated per position.
            optimizer_metric: Column of the optimization results used to
                select the best parameters (e.g. "sharpe_ratio").
            verbose: Emit per-window log messages when True.

        Raises:
            ValueError: If data is not indexed by a sorted DatetimeIndex.
        """
        self.strategy_class = strategy_class
        self.param_grid = param_grid
        self.data = data.sort_index()
        self.train_window = train_window
        self.test_window = test_window
        # Default step: one TEST window → consecutive OOS periods don't overlap.
        self.step_size = step_size or test_window
        self.initial_capital = initial_capital
        self.commission = commission
        self.slippage = slippage
        self.position_size = position_size
        self.optimizer_metric = optimizer_metric
        self.verbose = verbose
        # Basic input validation (explicit raises so they survive `python -O`).
        if not isinstance(self.data.index, pd.DatetimeIndex):
            raise ValueError("data debe tener un DatetimeIndex")
        if not self.data.index.is_monotonic_increasing:
            raise ValueError("data.index debe estar ordenado cronológicamente")
    # ------------------------------------------------------------------
    # 🔹 TRAIN / TEST WINDOW GENERATION
    # ------------------------------------------------------------------
    def _generate_windows(self) -> List[Dict]:
        """
        Generate sliding TRAIN / TEST windows for walk-forward validation.

        Returns:
            List of dicts, each with:
                - window_id
                - train_start / train_end
                - test_start / test_end

        Raises:
            RuntimeError: If no valid window could be produced.
        """
        windows: List[Dict] = []
        data_start = self.data.index.min()
        data_end = self.data.index.max()
        train_start = data_start
        window_id = 1
        while True:
            train_end = train_start + self.train_window
            test_start = train_end
            test_end = test_start + self.test_window
            # Stop condition: not enough data left for a full TEST period.
            if test_end > data_end:
                if self.verbose:
                    log.info(
                        f"⛔ Walk-forward detenido: "
                        f"test_end ({test_end}) > data_end ({data_end})"
                    )
                break
            # Slice only to validate window sizes; run() re-slices later.
            # NOTE: .loc datetime slicing is end-inclusive, so adjacent
            # windows share one boundary bar.
            train_data = self.data.loc[train_start:train_end]
            test_data = self.data.loc[test_start:test_end]
            # Minimal sanity checks: skip degenerate windows but keep sliding.
            if len(train_data) < 10:
                log.warning(
                    f"⚠️ Ventana {window_id} ignorada: "
                    f"muy pocos datos en TRAIN ({len(train_data)})"
                )
                train_start += self.step_size
                continue
            if len(test_data) < 5:
                log.warning(
                    f"⚠️ Ventana {window_id} ignorada: "
                    f"muy pocos datos en TEST ({len(test_data)})"
                )
                train_start += self.step_size
                continue
            window = {
                "window_id": window_id,
                "train_start": train_start,
                "train_end": train_end,
                "test_start": test_start,
                "test_end": test_end,
            }
            windows.append(window)
            if self.verbose:
                # FIX: the original f-string concatenated both dates with no
                # separator ("{start.date()}{end.date()}") — unreadable logs.
                log.info(
                    f"🪟 WF #{window_id} | "
                    f"TRAIN: {train_start.date()} → {train_end.date()} | "
                    f"TEST: {test_start.date()} → {test_end.date()}"
                )
            # Advance the sliding window.
            train_start += self.step_size
            window_id += 1
        if not windows:
            raise RuntimeError("No se generó ninguna ventana válida de walk-forward")
        return windows
    # ------------------------------------------------------------------
    # 🔹 WALK-FORWARD EXECUTION
    # ------------------------------------------------------------------
    def run(self):
        """
        Run the full walk-forward:
            - optimize on TRAIN
            - evaluate on TEST (out-of-sample, no re-optimization)
            - return a structured result for analysis and visualization

        Returns:
            dict with keys:
                - "meta": run configuration and data range
                - "windows": flat pd.DataFrame, one row per TEST window
                - "raw_results": full per-window payloads for drill-down
        """
        self.windows = self._generate_windows()
        rows = []          # flat rows (for the summary DataFrame)
        raw_results = []   # complete results (debug / drill-down)
        log.info(f"🚀 Iniciando Walk-Forward con {len(self.windows)} ventanas")
        for w in self.windows:
            wid = w["window_id"]
            log.info(f"▶️ WF #{wid} en ejecución")
            train_data = self.data.loc[w["train_start"]:w["train_end"]]
            test_data = self.data.loc[w["test_start"]:w["test_end"]]
            if train_data.empty or test_data.empty:
                log.warning(f"WF #{wid} ignorado (datos insuficientes)")
                continue
            # 1) Optimize on TRAIN (in-sample grid search).
            optimizer = ParameterOptimizer(
                strategy_class=self.strategy_class,
                data=train_data,
                initial_capital=self.initial_capital,
                commission=self.commission,
                slippage=self.slippage,
                position_size=self.position_size,
            )
            opt_df = optimizer.optimize(self.param_grid)
            if opt_df.empty:
                log.warning(f"WF #{wid} sin resultados de optimización")
                continue
            best_params = optimizer.get_best_params(metric=self.optimizer_metric)
            best_train_metric = opt_df[self.optimizer_metric].max()
            # 2) Backtest on TEST (out-of-sample, parameters frozen).
            strategy = self.strategy_class(**best_params)
            engine = BacktestEngine(
                strategy=strategy,
                initial_capital=self.initial_capital,
                commission=self.commission,
                slippage=self.slippage,
                position_size=self.position_size,
            )
            test_results = engine.run(test_data)
            # 3) Flat row (TEST metrics only).
            rows.append({
                "window": wid,
                "train_start": w["train_start"],
                "train_end": w["train_end"],
                "test_start": w["test_start"],
                "test_end": w["test_end"],
                "return_pct": test_results["total_return_pct"],
                "sharpe": test_results["sharpe_ratio"],
                "max_dd_pct": test_results["max_drawdown_pct"],
                "trades": test_results["total_trades"],
                "params": best_params,
            })
            # 4) Full result (optional drill-down payload).
            raw_results.append({
                "window_id": wid,
                "best_params": best_params,
                "train_metric": best_train_metric,
                "test_results": test_results,
            })
            log.success(
                f"✅ WF #{wid} | "
                f"Return: {test_results['total_return_pct']:.2f}% | "
                f"Sharpe: {test_results['sharpe_ratio']:.2f}"
            )
        # 5) Final structured result.
        # (The original trailing `assert` block was removed: it checked
        # properties that hold by construction of the dict below, and
        # asserts are stripped under `python -O` anyway.)
        wf_result = {
            "meta": {
                "strategy": self.strategy_class.__name__,
                "train_window": self.train_window,
                "test_window": self.test_window,
                "step_size": self.step_size,
                "optimizer_metric": self.optimizer_metric,
                "n_windows": len(rows),
                "data_start": self.data.index.min(),
                "data_end": self.data.index.max(),
            },
            "windows": pd.DataFrame(rows),
            "raw_results": raw_results,
        }
        log.success("🏁 Walk-Forward completado")
        return wf_result

View File

@@ -1,16 +1,19 @@
# dam_test.py
"""
Script para probar el optimizador de parámetros
Script para probar cositas
"""
import os
import sys
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
# Añadir raíz del proyecto al path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.data.storage import StorageManager
from src.backtest.walk_forward import WalkForwardValidator
from src.strategies import MovingAverageCrossover
def setup_environment():
"""Carga variables de entorno"""
@@ -38,9 +41,19 @@ def dam_test():
use_cache=False
)
print(data.columns)
wf = WalkForwardValidator(
strategy_class=MovingAverageCrossover,
param_grid={},
data=data,
train_window=pd.Timedelta(days=365),
test_window=pd.Timedelta(days=90),
)
print(data[['close', 'adx']].tail(10))
windows = wf._generate_windows()
print(f"Ventanas generadas: {len(windows)}")
for w in windows[:3]:
print(w)
if __name__ == "__main__":
dam_test()

View File

@@ -15,7 +15,7 @@ from src.utils.logger import log
from src.data.storage import StorageManager
from src.strategies import MovingAverageCrossover
from src.backtest import BacktestEngine
from src.backtest.visualizer import BacktestVisualizer
from src.backtest.visualizers.visualizer import BacktestVisualizer
def setup_environment():
"""Carga variables de entorno"""
@@ -83,9 +83,9 @@ def test_visualizer():
viz = BacktestVisualizer(results, data)
# Generar todos los gráficos
viz.generate_all_plots('backtest_results')
viz.generate_all_plots()
log.info("\n💡 Los gráficos se guardaron en: backtest_results/")
log.info("\n💡 Los gráficos se guardaron en: backtest_results/visualizer")
log.info(" Archivos generados:")
log.info(" - equity_curve.png")
log.info(" - drawdown.png")

160
tests/test_walkforwad.py Normal file
View File

@@ -0,0 +1,160 @@
# tests/test_walkforward.py
"""
Script para probar Walk-Forward Validation
Guarda resultados en CSV para análisis posterior
"""
import os
import sys
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
# Añadir raíz del proyecto al path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.utils.logger import log
from src.data.storage import StorageManager
from src.strategies import MovingAverageCrossover
from src.backtest.walk_forward import WalkForwardValidator
def setup_environment():
    """Load environment variables from config/secrets.env."""
    project_root = Path(__file__).parent.parent
    load_dotenv(dotenv_path=project_root / "config" / "secrets.env")
    log.success("✓ Variables de entorno cargadas")
def test_walkforward():
    """
    Walk-Forward Validation test across multiple window configurations.

    For each configuration, runs the full optimize-on-TRAIN /
    evaluate-on-TEST cycle, aggregates per-window OOS metrics, and
    persists everything to CSV for later analysis and visualization.
    """
    log.info("=" * 70)
    log.info("🪜 TEST: WALK-FORWARD VALIDATION (MULTI CONFIG)")
    log.info("=" * 70)
    # --------------------------------------------------
    # Setup
    # --------------------------------------------------
    setup_environment()
    storage = StorageManager(
        db_host=os.getenv("DB_HOST"),
        db_port=int(os.getenv("DB_PORT", 5432)),
        db_name=os.getenv("DB_NAME"),
        db_user=os.getenv("DB_USER"),
        db_password=os.getenv("DB_PASSWORD"),
    )
    # FIX: wrap the run in try/finally so the DB connection is always
    # released — the original leaked it on any exception mid-run.
    try:
        log.info("\n📥 Cargando datos...")
        data = storage.load_ohlcv(
            symbol="BTC/USDT",
            timeframe="1h",
            start_date=None,
            end_date=None,
            use_cache=False,
        )
        log.success(f"✓ Datos cargados: {len(data)} velas")
        # --------------------------------------------------
        # Strategy parameter grid (shared by every WF config)
        # --------------------------------------------------
        param_grid = {
            "fast_period": [10, 15],
            "slow_period": [30, 50],
            "ma_type": ["sma"],
            "use_adx": [True],
            "adx_threshold": [20, 25, 30],
        }
        # --------------------------------------------------
        # Walk-Forward configurations to compare
        # --------------------------------------------------
        wf_configs = [
            {"name": "WF_12_3", "train_days": 365, "test_days": 90},
            {"name": "WF_24_3", "train_days": 365 * 2, "test_days": 90},
            {"name": "WF_24_6", "train_days": 365 * 2, "test_days": 180},
        ]
        all_windows = []
        summary_rows = []
        # --------------------------------------------------
        # Run Walk-Forward for each configuration
        # --------------------------------------------------
        for cfg in wf_configs:
            log.info("\n" + "=" * 70)
            log.info(f"🧪 EJECUTANDO {cfg['name']}")
            log.info("=" * 70)
            wf = WalkForwardValidator(
                strategy_class=MovingAverageCrossover,
                param_grid=param_grid,
                data=data,
                train_window=pd.Timedelta(days=cfg["train_days"]),
                test_window=pd.Timedelta(days=cfg["test_days"]),
                initial_capital=10_000,
                commission=0.001,
                slippage=0.0005,
                position_size=0.95,
                optimizer_metric="sharpe_ratio",
            )
            wf_result = wf.run()
            # -------------------------------
            # Basic sanity checks
            # -------------------------------
            assert isinstance(wf_result, dict), "wf_result debe ser dict"
            assert "windows" in wf_result, "wf_result debe contener 'windows'"
            # Tag every window row with its configuration name.
            df_windows = wf_result["windows"].copy()
            df_windows["wf_name"] = cfg["name"]
            all_windows.append(df_windows)
            # -------------------------------
            # Aggregated metrics per WF config
            # -------------------------------
            summary_rows.append({
                "wf_name": cfg["name"],
                "train_days": cfg["train_days"],
                "test_days": cfg["test_days"],
                "windows": len(df_windows),
                "avg_return_pct": df_windows["return_pct"].mean(),
                "avg_sharpe": df_windows["sharpe"].mean(),
                "avg_max_dd_pct": df_windows["max_dd_pct"].mean(),
                "avg_trades": df_windows["trades"].mean(),
            })
        # --------------------------------------------------
        # Consolidate results
        # --------------------------------------------------
        df_all_windows = pd.concat(all_windows, ignore_index=True)
        df_summary = pd.DataFrame(summary_rows)
        # --------------------------------------------------
        # Persist CSVs
        # --------------------------------------------------
        output_dir = Path("backtest_results/walkforward")
        output_dir.mkdir(parents=True, exist_ok=True)
        windows_path = output_dir / "walkforward_windows.csv"
        summary_path = output_dir / "walkforward_summary.csv"
        df_all_windows.to_csv(windows_path, index=False)
        df_summary.to_csv(summary_path, index=False)
        log.success(f"💾 CSV ventanas guardado: {windows_path}")
        log.success(f"💾 CSV resumen guardado: {summary_path}")
        print("\n📊 RESUMEN WALK-FORWARD:")
        print(df_summary.to_string(index=False))
    finally:
        storage.close()
    log.success("\n✅ TEST WALK-FORWARD COMPLETADO")


if __name__ == "__main__":
    test_walkforward()

View File

@@ -0,0 +1,40 @@
# tests/test_wf_visualizer.py
import sys
from pathlib import Path
import pandas as pd
# Añadir raíz del proyecto al path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.backtest.visualizers.walk_forward_visualizer import WalkForwardVisualizer
def test_wf_visualizer():
    """Exercise WalkForwardVisualizer against the previously generated CSVs."""
    results_dir = Path("backtest_results/walkforward")
    viz = WalkForwardVisualizer(
        summary_df=pd.read_csv(results_dir / "walkforward_summary.csv"),
        windows_df=pd.read_csv(results_dir / "walkforward_windows.csv"),
        name="BTC/USDT MA + ADX"
    )
    # Generate every plot type once.
    viz.plot_avg_metrics()
    viz.plot_returns_by_window()
    viz.plot_drawdown_by_window()
    viz.plot_return_distribution()
    for param in ("fast_period", "slow_period", "adx_threshold"):
        viz.plot_parameter_stability(param)


if __name__ == "__main__":
    test_wf_visualizer()