feat: finalize portfolio system and quantitative validation- Finalized MA_Crossover(30,100) and TrendFiltered_MA(30,100,ADX=15)

- Implemented portfolio engine with risk-based allocation (50/50)
- Added equity-based metrics for system-level evaluation
- Validated portfolio against standalone strategies
- Reduced max drawdown and volatility at system level
- Quantitative decision closed before paper trading phase
This commit is contained in:
DaM
2026-02-02 14:38:05 +01:00
parent c569170fcc
commit f85c522f22
53 changed files with 2389 additions and 104 deletions

View File

@@ -2,7 +2,7 @@
"""
Módulo de backtesting
"""
from .engine import BacktestEngine
from .engine import Engine
from .strategy import Strategy, Signal
from .trade import Trade, TradeType, TradeStatus, Position
from .optimizer import ParameterOptimizer
@@ -16,7 +16,7 @@ from .metrics import (
)
__all__ = [
'BacktestEngine',
'Engine',
'Strategy',
'Signal',
'Trade',

View File

@@ -12,7 +12,7 @@ from .trade import Trade, TradeType, TradeStatus, Position
from ..risk.sizing.base import PositionSizer
from ..risk.stops.base import StopLoss
class BacktestEngine:
class Engine:
"""
Motor de backtesting que simula la ejecución de una estrategia
"""
@@ -185,14 +185,30 @@ class BacktestEngine:
Abre una nueva posición, delegando size al PositionSizer si existe
"""
current_bar = self.data.iloc[idx]
current_price = current_bar['close']
current_price = current_bar["close"]
current_time = current_bar.name
# Aplicar slippage (en compra, pagamos más)
# Aplicar slippage (en compra pagamos más)
execution_price = current_price * (1 + self.slippage)
# --------------------------------------------------
# 1) Calcular units (size) vía sizer o fallback legacy
# 🔴 1) Calcular STOP antes del size (si existe)
# --------------------------------------------------
stop_price = None
if self.stop_loss is not None:
try:
stop_price = self.stop_loss.get_stop_price(
data=self.data,
idx=idx,
entry_price=execution_price,
trade_type=trade_type,
)
except Exception as e:
log.warning(f"[{current_time}] Error calculando stop: {e}")
return
# --------------------------------------------------
# ✅ 2) Calcular units (size)
# --------------------------------------------------
if self.position_sizer is not None:
try:
@@ -200,79 +216,87 @@ class BacktestEngine:
self.position_sizer.calculate_size(
capital=self.cash,
entry_price=float(execution_price),
stop_price=None, # stops aún no integrados
volatility=None # vol/ATR aún no integrado aquí
stop_price=stop_price,
max_capital=self.cash,
volatility=None,
)
)
except Exception as e:
log.warning(f"[{current_time}] PositionSizer rechazó la entrada: {e}")
return
if not np.isfinite(units) or units <= 0:
log.warning(f"[{current_time}] PositionSizer devolvió units inválidos: {units}")
log.warning(f"[{current_time}] Units inválidas: {units}")
return
position_value = units * execution_price
else:
# Fallback actual: usar fracción del cash
position_value = self.cash * self.position_size_fraction
units = position_value / execution_price
# Fallback legacy
units = (self.cash * self.position_size_fraction) / execution_price
# --------------------------------------------------
# ✅ 2) Comisión basada en el nominal invertido
# ✅ 3) CLIP SIZE si no hay suficiente cash
# --------------------------------------------------
position_value = units * execution_price
commission_cost = position_value * self.commission
total_cost = position_value + commission_cost
# Verificar que tenemos suficiente cash
if self.cash < position_value + commission_cost:
log.warning(
f"[{current_time}] Cash insuficiente para abrir posición "
f"(cash=${self.cash:.2f}, needed=${position_value + commission_cost:.2f})"
if total_cost > self.cash:
max_affordable_units = self.cash / (
execution_price * (1 + self.commission)
)
return
if max_affordable_units <= 0:
log.warning(
f"[{current_time}] Cash insuficiente incluso para size mínimo"
)
return
units = max_affordable_units
position_value = units * execution_price
commission_cost = position_value * self.commission
# --------------------------------------------------
# ✅ 3) Crear trade + posición
# ✅ 4) Crear trade
# --------------------------------------------------
trade = Trade(
symbol=current_bar.get('symbol', 'UNKNOWN'),
symbol=current_bar.get("symbol", "UNKNOWN"),
trade_type=trade_type,
entry_price=execution_price,
entry_time=current_time,
size=units,
entry_commission=commission_cost,
entry_reason="Strategy signal"
entry_reason="Strategy signal",
stop_price_at_entry=stop_price,
capital_at_entry=self.cash,
)
# Actualizar cash
self.cash -= (position_value + commission_cost)
# Crear posición
# --------------------------------------------------
# ✅ 5) Crear posición
# --------------------------------------------------
self.current_position = Position(
symbol=trade.symbol,
trade_type=trade_type,
average_price=execution_price,
total_size=units,
trades=[trade]
trades=[trade],
)
# 🔴 FIJAR STOP INICIAL
if self.stop_loss is not None:
stop_price = self.stop_loss.get_stop_price(
data=self.data,
idx=idx,
entry_price=execution_price,
trade_type=trade_type,
)
# Fijar stop inicial (si existe)
if stop_price is not None:
self.current_position.set_stop(stop_price)
self.trades.append(trade)
log.debug(f"[{current_time}] OPEN {trade_type.value}: "
f"Price: ${execution_price:.2f}, Units: {units:.6f}, "
f"Value: ${position_value:.2f}, Fee: ${commission_cost:.2f}")
log.debug(
f"[{current_time}] OPEN {trade_type.value}: "
f"Price=${execution_price:.2f} | Units={units:.6f} | "
f"Value=${position_value:.2f} | Fee=${commission_cost:.2f}"
)
def _close_position(self, idx: int, reason: str):
"""
Cierra la posición actual

View File

@@ -1,4 +1,4 @@
# src/backtest/metrics.py
# src/core/metrics.py
"""
Métricas avanzadas de performance para backtesting
"""

View File

@@ -7,7 +7,7 @@ import pandas as pd
from typing import Dict, List, Any, Type
from itertools import product
from ..utils.logger import log
from .engine import BacktestEngine
from .engine import Engine
from .strategy import Strategy
class ParameterOptimizer:
@@ -82,7 +82,7 @@ class ParameterOptimizer:
strategy = self.strategy_class(**params)
# Ejecutar backtest
engine = BacktestEngine(
engine = Engine(
strategy=strategy,
initial_capital=self.initial_capital,
commission=self.commission,

View File

@@ -30,6 +30,8 @@ class Trade:
exit_price: Optional[float] = None
exit_time: Optional[datetime] = None
status: TradeStatus = field(default=TradeStatus.OPEN)
stop_price_at_entry: Optional[float] = None
capital_at_entry: Optional[float] = None
# Costes
entry_commission: float = 0.0

View File

@@ -1,8 +1,8 @@
# src/backtest/walk_forward.py
import pandas as pd
from typing import List, Dict, Optional
from src.backtest.optimizer import ParameterOptimizer
from src.backtest.engine import BacktestEngine
from src.core.optimizer import ParameterOptimizer
from src.core.engine import Engine
from ..utils.logger import log
class WalkForwardValidator:
@@ -188,7 +188,7 @@ class WalkForwardValidator:
# 2⃣ Backtest TEST (OOS)
strategy = self.strategy_class(**best_params)
engine = BacktestEngine(
engine = Engine(
strategy=strategy,
initial_capital=self.initial_capital,
commission=self.commission,

0
src/metrics/__init__.py Normal file
View File

View File

@@ -0,0 +1,131 @@
# src/metrics/equity_metrics.py
"""
Métricas basadas exclusivamente en equity curve.
Pensadas para portfolio, paper trading y UI.
"""
import numpy as np
import pandas as pd
from typing import List, Optional, Dict
# --------------------------------------------------
# Helpers
# --------------------------------------------------
def _to_series(
equity_curve: List[float],
timestamps: Optional[List[pd.Timestamp]] = None,
) -> pd.Series:
if timestamps is None:
return pd.Series(equity_curve)
return pd.Series(equity_curve, index=pd.to_datetime(timestamps))
# --------------------------------------------------
# Core metrics
# --------------------------------------------------
def calculate_cagr(
equity_curve: List[float],
timestamps: List[pd.Timestamp],
) -> float:
"""
CAGR real usando timestamps.
"""
equity = _to_series(equity_curve, timestamps)
if len(equity) < 2:
return 0.0
start, end = equity.iloc[0], equity.iloc[-1]
years = (equity.index[-1] - equity.index[0]).days / 365.25
if years <= 0:
return 0.0
return (end / start) ** (1 / years) - 1
def calculate_drawdown_series(equity: pd.Series) -> pd.Series:
running_max = equity.cummax()
return (equity - running_max) / running_max
def calculate_max_drawdown(
equity_curve: List[float],
timestamps: Optional[List[pd.Timestamp]] = None,
) -> float:
equity = _to_series(equity_curve, timestamps)
dd = calculate_drawdown_series(equity)
return dd.min()
def calculate_time_in_drawdown(
equity_curve: List[float],
timestamps: List[pd.Timestamp],
) -> float:
"""
% del tiempo que el sistema está bajo su máximo histórico.
"""
equity = _to_series(equity_curve, timestamps)
dd = calculate_drawdown_series(equity)
return (dd < 0).mean()
def calculate_ulcer_index(
equity_curve: List[float],
timestamps: Optional[List[pd.Timestamp]] = None,
) -> float:
"""
Ulcer Index: profundidad + duración del drawdown.
"""
equity = _to_series(equity_curve, timestamps)
dd = calculate_drawdown_series(equity)
return np.sqrt(np.mean(np.square(dd * 100)))
def calculate_equity_volatility(
equity_curve: List[float],
timestamps: Optional[List[pd.Timestamp]] = None,
annualize: bool = True,
) -> float:
equity = _to_series(equity_curve, timestamps)
returns = equity.pct_change().dropna()
if returns.empty:
return 0.0
vol = returns.std()
return vol * np.sqrt(252) if annualize else vol
def calculate_calmar_ratio(
equity_curve: List[float],
timestamps: List[pd.Timestamp],
) -> float:
cagr = calculate_cagr(equity_curve, timestamps)
max_dd = abs(calculate_max_drawdown(equity_curve, timestamps))
if max_dd == 0:
return 0.0
return cagr / max_dd
# --------------------------------------------------
# Aggregator
# --------------------------------------------------
def compute_equity_metrics(
equity_curve: List[float],
timestamps: List[pd.Timestamp],
) -> Dict[str, float]:
"""
Métricas clave para comparar sistemas y portfolios.
"""
return {
"cagr": calculate_cagr(equity_curve, timestamps),
"max_drawdown": calculate_max_drawdown(equity_curve, timestamps),
"calmar_ratio": calculate_calmar_ratio(equity_curve, timestamps),
"volatility": calculate_equity_volatility(equity_curve, timestamps),
"time_in_drawdown": calculate_time_in_drawdown(equity_curve, timestamps),
"ulcer_index": calculate_ulcer_index(equity_curve, timestamps),
}

View File

View File

@@ -0,0 +1,17 @@
# src/portfolio/allocation.py
from dataclasses import dataclass
from typing import Dict
@dataclass
class Allocation:
"""
Define cómo se reparte el riesgo entre estrategias.
Los pesos deben sumar 1.0
"""
weights: Dict[str, float]
def validate(self):
total = sum(self.weights.values())
if abs(total - 1.0) > 1e-6:
raise ValueError(f"Allocation weights must sum to 1. Got {total}")

View File

@@ -0,0 +1,67 @@
# src/portfolio/portfolio_engine.py
from typing import Dict
import pandas as pd
from src.core.engine import Engine
from src.portfolio.allocation import Allocation
from src.portfolio.portfolio_result import PortfolioResult
class PortfolioEngine:
"""
Ejecuta múltiples engines en paralelo y combina resultados
alineando las curvas por timestamp.
"""
def __init__(
self,
engines: Dict[str, Engine],
allocation: Allocation,
initial_capital: float,
):
allocation.validate()
self.engines = engines
self.allocation = allocation
self.initial_capital = initial_capital
def run(self, data):
results = {}
equity_series = []
for name, engine in self.engines.items():
res = engine.run(data)
results[name] = res
weight = self.allocation.weights[name]
# --- construir serie con timestamps ---
eq = pd.Series(
res["equity_curve"],
index=pd.to_datetime(res["timestamps"]),
name=name,
)
# --- aplicar peso ---
eq_weighted = eq * weight
equity_series.append(eq_weighted)
# --------------------------------------------------
# Alinear todas las curvas por timestamp
# --------------------------------------------------
df = pd.concat(equity_series, axis=1)
# Forward fill para periodos sin trades
df = df.ffill()
# Si alguna empieza más tarde, asumimos capital inicial ponderado
df = df.fillna(self.initial_capital * 0.0)
# Equity total del portfolio
portfolio_equity = df.sum(axis=1)
return PortfolioResult(
equity_curve=portfolio_equity.tolist(),
final_capital=float(portfolio_equity.iloc[-1]),
components=results,
)

View File

@@ -0,0 +1,10 @@
# src/portfolio/portfolio_result.py
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class PortfolioResult:
equity_curve: List[float]
final_capital: float
components: Dict[str, dict]

View File

@@ -12,9 +12,11 @@ class PositionSizer(ABC):
@abstractmethod
def calculate_size(
self,
*,
capital: float,
entry_price: float,
stop_price: Optional[float] = None,
max_capital: float | None = None,
volatility: Optional[float] = None,
) -> float:
"""

View File

@@ -1,11 +1,11 @@
# src/risk/sizing/percent_risk.py
from .base import PositionSizer
class PercentRiskSizer(PositionSizer):
"""
Position sizing basado en % de riesgo por trade.
Position sizing basado en % de riesgo por trade,
limitado por capital disponible.
"""
def __init__(self, risk_fraction: float):
@@ -15,22 +15,30 @@ class PercentRiskSizer(PositionSizer):
def calculate_size(
self,
*,
capital: float,
entry_price: float,
stop_price: float | None = None
stop_price: float | None = None,
max_capital: float | None = None,
volatility=None,
) -> float:
if stop_price is None:
raise ValueError("PercentRiskSizer requiere stop_price")
risk_amount = capital * self.risk_fraction
distance = abs(entry_price - stop_price)
if distance < 0:
raise ValueError("Distancia entry-stop inválida")
if distance == 0:
if distance <= 0:
return 0.0
position_size = risk_amount / distance
return position_size
# 1⃣ Riesgo máximo permitido
risk_amount = capital * self.risk_fraction
units_by_risk = risk_amount / distance
# 2⃣ Límite por capital disponible
if max_capital is not None:
max_units_by_cash = max_capital / entry_price
units = min(units_by_risk, max_units_by_cash)
else:
units = units_by_risk
return max(units, 0.0)

View File

@@ -2,7 +2,7 @@
import pandas as pd
import numpy as np
from src.risk.stops.base import StopLoss
from src.backtest.trade import TradeType
from src.core.trade import TradeType
class ATRStop(StopLoss):
@@ -36,7 +36,7 @@ class ATRStop(StopLoss):
axis=1,
).max(axis=1)
atr = tr.rolling(self.atr_period).mean()
atr = tr.ewm(alpha=1/self.atr_period, adjust=False).mean()
return atr
def get_stop_price(

View File

@@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
import pandas as pd
from src.backtest.trade import TradeType
from src.core.trade import TradeType
class StopLoss(ABC):
@@ -37,3 +37,16 @@ class StopLoss(ABC):
stop_price (float)
"""
pass
def update_stop(
self,
*,
data: pd.DataFrame,
idx: int,
position
):
"""
Por defecto: stop NO dinámico.
Devuelve None → no mover stop.
"""
return None

View File

@@ -1,7 +1,7 @@
# src/risk/stops/fixed_stop.py
import pandas as pd
from src.risk.stops.base import StopLoss
from src.backtest.trade import TradeType
from src.core.trade import TradeType
class FixedStop(StopLoss):
"""

View File

@@ -1,7 +1,7 @@
# src/risk/stops/trailing_stop.py
import pandas as pd
from src.risk.stops.base import StopLoss
from src.backtest.trade import TradeType, Position
from src.core.trade import TradeType, Position
class TrailingStop(StopLoss):

View File

@@ -1,5 +1,45 @@
# src/strategies/base.py
"""
Estrategias base para herencia compleja
TODO: Implementar en fases futuras
"""
from abc import ABC, abstractmethod
import pandas as pd
from src.core.strategy import Signal
class Strategy(ABC):
"""
Clase base para todas las estrategias.
Flujo:
- Engine llama a set_data(data)
- set_data → init_indicators
- Engine llama a generate_signal(idx)
"""
def __init__(self, name: str, params: dict | None = None):
self.name = name
self.params = params or {}
self.data: pd.DataFrame | None = None
def set_data(self, data: pd.DataFrame):
"""
Inyecta el DataFrame y calcula indicadores.
"""
self.data = self.init_indicators(data.copy())
@abstractmethod
def init_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Calcula y añade indicadores al DataFrame.
"""
pass
@abstractmethod
def generate_signal(self, idx: int) -> Signal:
"""
Devuelve BUY / SELL / HOLD para el índice idx.
"""
pass
def __repr__(self):
params = ", ".join(f"{k}={v}" for k, v in self.params.items())
return f"{self.name}({params})"

View File

@@ -0,0 +1,64 @@
# src/strategies/breakout.py
import pandas as pd
from src.strategies.base import Strategy
from src.core.strategy import Signal
class DonchianBreakout(Strategy):
"""
Estrategia de ruptura de canales Donchian
Señales:
- BUY: El precio rompe el máximo de los últimos N periodos
- SELL: El precio rompe el mínimo de los últimos N periodos
- HOLD: En cualquier otro caso
Parámetros:
lookback: Ventana de cálculo del canal
Valores por defecto:
lookback = 20
≈ 1 día en timeframe 1h
Parámetro clásico del sistema Turtle
Notas:
- Es una estrategia de momentum puro
- No intenta comprar barato, compra fortaleza
- Filtra ruido al exigir ruptura real
"""
def __init__(self, lookback: int = 20):
params = {
"lookback": lookback
}
super().__init__(name="DonchianBreakout", params=params)
self.lookback = lookback
# ------------------------------------------------------------------
def init_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
data["donchian_high"] = data["high"].rolling(self.lookback).max()
data["donchian_low"] = data["low"].rolling(self.lookback).min()
return data
def generate_signal(self, idx: int) -> Signal:
if idx < self.lookback:
return Signal.HOLD
high = self.data["high"]
low = self.data["low"]
close = self.data["close"]
max_high = high.iloc[idx - self.lookback : idx].max()
min_low = low.iloc[idx - self.lookback : idx].min()
price = close.iloc[idx]
if price > max_high:
return Signal.BUY
elif price < min_low:
return Signal.SELL
return Signal.HOLD

View File

@@ -3,7 +3,7 @@
Estrategia Buy and Hold
"""
import pandas as pd
from ..backtest.strategy import Strategy, Signal
from ..core.strategy import Strategy, Signal
class BuyAndHold(Strategy):
"""

View File

@@ -0,0 +1,95 @@
# src/strategies/mean_reversion.py
import pandas as pd
import numpy as np
from src.strategies.base import Strategy
from src.core.strategy import Signal
class RSIMeanReversion(Strategy):
"""
Estrategia de reversión a la media basada en RSI.
Idea:
- Compra cuando el mercado está sobrevendido
- Vende cuando el precio rebota hacia la media
Señales:
- BUY: RSI cruza por debajo de oversold
- SELL: RSI cruza por encima de overbought
- HOLD: en cualquier otro caso
Parámetros:
period: periodo del RSI
oversold: nivel de sobreventa
overbought: nivel de sobrecompra
Valores típicos:
period = 14
oversold = 30
overbought = 70
"""
def __init__(
self,
period: int = 14,
oversold: float = 30.0,
overbought: float = 70.0,
):
super().__init__(
name="RSI_MeanReversion",
params={
"period": period,
"oversold": oversold,
"overbought": overbought,
},
)
self.period = period
self.oversold = oversold
self.overbought = overbought
# --------------------------------------------------
def init_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Calcula el RSI clásico (Wilder).
Añade:
- data["rsi"]
"""
delta = data["close"].diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(alpha=1 / self.period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1 / self.period, adjust=False).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
data["rsi"] = rsi
return data
# --------------------------------------------------
def generate_signal(self, idx: int) -> Signal:
"""
Genera señales de trading basadas en cruces del RSI.
"""
if idx == 0:
return Signal.HOLD
rsi_prev = self.data["rsi"].iloc[idx - 1]
rsi_curr = self.data["rsi"].iloc[idx]
# BUY → cruce hacia abajo de oversold
if rsi_prev > self.oversold and rsi_curr <= self.oversold:
return Signal.BUY
# SELL → cruce hacia arriba de overbought
if rsi_prev < self.overbought and rsi_curr >= self.overbought:
return Signal.SELL
return Signal.HOLD

View File

@@ -3,7 +3,7 @@
Estrategia de cruce de medias móviles con filtro ADX opcional
"""
import pandas as pd
from ..backtest.strategy import Strategy, Signal, calculate_sma, calculate_ema
from ..core.strategy import Strategy, Signal, calculate_sma, calculate_ema
class MovingAverageCrossover(Strategy):
@@ -21,13 +21,18 @@ class MovingAverageCrossover(Strategy):
ma_type: 'sma' o 'ema'
use_adx: Activar filtro ADX
adx_threshold: Umbral mínimo de ADX
Valores por defecto:
20/50 EMA → clásico en crypto 1h - 4h
EMA reacciona mejor que SMA
Sin ADX todavía → primero evaluamos la señal “pura”
"""
def __init__(
self,
fast_period: int = 10,
slow_period: int = 30,
ma_type: str = 'sma',
fast_period: int = 20,
slow_period: int = 50,
ma_type: str = 'ema',
use_adx: bool = False,
adx_threshold: float = 20.0
):

View File

View File

View File

@@ -0,0 +1,25 @@
from itertools import product
from src.strategies.moving_average import MovingAverageCrossover
class MACrossoverOptimization:
name = "MA_Crossover"
@staticmethod
def parameter_grid():
fast = [10, 15, 20, 25, 30]
slow = [40, 50, 60, 80, 100]
min_gap = 15
for f, s in product(fast, slow):
if s - f >= min_gap:
yield {
"fast_period": f,
"slow_period": s,
"ma_type": "ema",
"use_adx": False,
}
@staticmethod
def build_strategy(params):
return MovingAverageCrossover(**params)

View File

@@ -0,0 +1,26 @@
from itertools import product
from src.strategies.trend_filtered import TrendFilteredMACrossover
class TrendFilteredMAOptimization:
name = "TrendFiltered_MA"
@staticmethod
def parameter_grid():
fast = [10, 15, 20, 25, 30]
slow = [40, 50, 60, 80, 100]
adx = [15, 20, 25, 30]
min_gap = 15
for f, s, a in product(fast, slow, adx):
if s - f >= min_gap:
yield {
"fast_period": f,
"slow_period": s,
"ma_type": "ema",
"adx_threshold": a,
}
@staticmethod
def build_strategy(params):
return TrendFilteredMACrossover(**params)

View File

@@ -3,7 +3,7 @@
Estrategia basada en RSI
"""
import pandas as pd
from ..backtest.strategy import Strategy, Signal, calculate_rsi
from ..core.strategy import Strategy, Signal, calculate_rsi
class RSIStrategy(Strategy):
"""

View File

@@ -0,0 +1,131 @@
# src/strategies/trend_filtered.py
import pandas as pd
import numpy as np
from src.strategies.base import Strategy
from src.core.strategy import Signal
class TrendFilteredMACrossover(Strategy):
"""
Estrategia de cruce de medias con filtro de tendencia.
Señales:
- BUY:
* Cruce alcista de medias
* Precio por encima de MA lenta
* ADX >= threshold
- SELL:
* Cruce bajista de medias
- HOLD:
* En cualquier otro caso
Objetivo:
- Evitar whipsaws en mercado lateral
- Operar solo cuando hay estructura de tendencia
Parámetros por defecto:
fast_period=20
slow_period=50
ma_type='ema'
adx_period=14
adx_threshold=20
"""
def __init__(
self,
fast_period: int = 20,
slow_period: int = 50,
ma_type: str = "ema",
adx_period: int = 14,
adx_threshold: float = 20.0,
):
params = {
"fast_period": fast_period,
"slow_period": slow_period,
"ma_type": ma_type,
"adx_period": adx_period,
"adx_threshold": adx_threshold,
}
super().__init__(name="TrendFilteredMACrossover", params=params)
self.fast_period = fast_period
self.slow_period = slow_period
self.ma_type = ma_type
self.adx_period = adx_period
self.adx_threshold = adx_threshold
# --------------------------------------------------
def init_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
# Medias móviles
if self.ma_type == "ema":
data["ma_fast"] = data["close"].ewm(
span=self.fast_period, adjust=False
).mean()
data["ma_slow"] = data["close"].ewm(
span=self.slow_period, adjust=False
).mean()
else:
data["ma_fast"] = data["close"].rolling(self.fast_period).mean()
data["ma_slow"] = data["close"].rolling(self.slow_period).mean()
# ADX
high = data["high"]
low = data["low"]
close = data["close"]
plus_dm = high.diff()
minus_dm = low.diff().abs()
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
tr = pd.concat(
[
high - low,
(high - close.shift()).abs(),
(low - close.shift()).abs(),
],
axis=1,
).max(axis=1)
atr = tr.ewm(alpha=1 / self.adx_period, adjust=False).mean()
plus_di = 100 * (
plus_dm.ewm(alpha=1 / self.adx_period, adjust=False).mean() / atr
)
minus_di = 100 * (
minus_dm.ewm(alpha=1 / self.adx_period, adjust=False).mean() / atr
)
dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100
data["adx"] = dx.ewm(alpha=1 / self.adx_period, adjust=False).mean()
return data
# --------------------------------------------------
def generate_signal(self, idx: int) -> Signal:
if idx == 0:
return Signal.HOLD
row = self.data.iloc[idx]
prev = self.data.iloc[idx - 1]
# Cruces
cross_up = prev.ma_fast <= prev.ma_slow and row.ma_fast > row.ma_slow
cross_down = prev.ma_fast >= prev.ma_slow and row.ma_fast < row.ma_slow
# Filtro de tendencia
trend_ok = (
row.close > row.ma_slow and row.adx >= self.adx_threshold
)
if cross_up and trend_ok:
return Signal.BUY
if cross_down:
return Signal.SELL
return Signal.HOLD