Files
Trading-Bot/src/calibration/strategies_inspector.py
2026-03-09 07:59:49 +01:00

625 lines
23 KiB
Python

# src/calibration/strategies_inspector.py
from __future__ import annotations
from typing import Any, Dict, List
import numpy as np
import pandas as pd
from src.data.storage import StorageManager
from src.utils.logger import log
from src.core.walk_forward import WalkForwardValidator
from src.core.market_regime import TrendScoreConfig, compute_regimes_for_windows
from src.risk.stops.fixed_stop import FixedStop
from src.risk.stops.trailing_stop import TrailingStop
from src.risk.stops.atr_stop import ATRStop
from src.risk.sizing.percent_risk import PercentRiskSizer
# --------------------------------------------------
# Strategy registry (with parameter metadata)
# --------------------------------------------------
from src.strategies.registry import STRATEGY_REGISTRY
from src.strategies.ma_crossover import MovingAverageCrossover
from src.strategies.rsi_reversion import RSIStrategy
# --------------------------------------------------
# Helpers
# --------------------------------------------------
def list_available_strategies() -> List[Dict[str, Any]]:
    """Describe every registered strategy that publishes a parameter schema.

    The entries are intended for UI consumption. Each strategy class's
    ``parameters_schema()`` is used as the source of truth; registry classes
    that do not have that attribute are skipped.

    Returns:
        One dict per strategy with the keys ``strategy_id``, ``name``,
        ``params`` (list of schema keys), ``parameters_schema`` (the schema
        itself) and ``tags`` (always an empty list).
    """
    catalog: List[Dict[str, Any]] = []
    for registered_id, cls in STRATEGY_REGISTRY.items():
        if hasattr(cls, "parameters_schema"):
            param_schema = cls.parameters_schema()
            catalog.append(
                dict(
                    strategy_id=registered_id,
                    name=cls.__name__,
                    params=list(param_schema.keys()),
                    # The complete schema is shipped next to the bare key list.
                    parameters_schema=param_schema,
                    tags=[],
                )
            )
    return catalog
def _build_stop_loss(stop_schema) -> object | None:
if stop_schema.type == "fixed":
return FixedStop(stop_fraction=float(stop_schema.stop_fraction))
if stop_schema.type == "trailing":
return TrailingStop(trailing_fraction=float(stop_schema.stop_fraction))
if stop_schema.type == "atr":
return ATRStop(
atr_period=int(stop_schema.atr_period),
multiplier=float(stop_schema.atr_multiplier),
)
raise ValueError(f"Unknown stop type: {stop_schema.type}")
def _build_position_sizer(risk_schema) -> PercentRiskSizer:
    """Create the percent-risk sizer configured by ``risk_schema.risk_fraction``."""
    fraction = float(risk_schema.risk_fraction)
    return PercentRiskSizer(risk_fraction=fraction)
def _cap_units_by_max_position_fraction(units: float, capital: float, entry_price: float, max_position_fraction: float) -> float:
max_units = (capital * max_position_fraction) / entry_price
return float(min(units, max_units))
def _accumulate_equity(initial: float, returns_pct: List[float]) -> List[float]:
eq = [float(initial)]
cur = float(initial)
for r in returns_pct:
cur *= (1.0 + float(r) / 100.0)
eq.append(float(cur))
return eq
def _compute_wf_diagnostics(
*,
window_returns_pct: List[float],
window_trades: List[int],
window_equity: List[float],
test_days: int,
rolling_window: int = 3,
hist_bins: int = 10,
) -> Dict[str, Any]:
"""Compute backend-ready diagnostics for Step 3 visualization.
Notes:
- Operates at *WF window* granularity (not daily).
- All series are aligned to windows (len == n_windows) except equity/drawdown
which include initial point (len == n_windows + 1).
"""
returns = np.asarray(window_returns_pct, dtype=float)
n = int(len(returns))
mean_r = float(np.mean(returns)) if n else 0.0
std_r = float(np.std(returns, ddof=0)) if n else 0.0
pos_rate = float(np.mean(returns > 0.0)) if n else 0.0
# Linear trend of window returns over time (per window index)
if n >= 2:
x = np.arange(n, dtype=float)
slope = float(np.polyfit(x, returns, 1)[0])
else:
slope = 0.0
# Rolling Sharpe-like over windows
k = int(max(1, rolling_window))
roll = [None] * n
if n and k > 1:
scale = float(np.sqrt(k))
for i in range(k - 1, n):
seg = returns[i - k + 1 : i + 1]
m = float(np.mean(seg))
s = float(np.std(seg, ddof=0))
roll[i] = float((m / s) * scale) if s > 0 else 0.0
elif n and k == 1:
roll = [float(r) for r in returns.tolist()]
# Histogram of window returns
if n >= 2:
bins = int(max(3, min(hist_bins, n)))
counts, edges = np.histogram(returns, bins=bins)
hist_counts = [int(c) for c in counts.tolist()]
hist_edges = [float(e) for e in edges.tolist()]
elif n == 1:
hist_counts = [1]
hist_edges = [float(returns[0] - 1.0), float(returns[0] + 1.0)]
else:
hist_counts = []
hist_edges = []
# Drawdown series from equity
eq = [float(x) for x in (window_equity or [])]
dd = []
peak = None
for v in eq:
if peak is None or v > peak:
peak = v
dd.append(float((v / peak - 1.0) * 100.0) if peak and peak > 0 else 0.0)
# Trades density
td = int(test_days) if int(test_days) > 0 else 1
t_int = [int(t) for t in (window_trades or [])]
trades_per_day = [float(t) / float(td) for t in t_int]
return {
"stability": {
"n_windows": n,
"mean_return_pct": mean_r,
"std_return_pct": std_r,
"positive_window_rate": pos_rate,
"return_slope_per_window": slope,
},
"rolling": {
"rolling_window": k,
"rolling_sharpe_like": roll,
},
"distribution": {
"hist_bin_edges": hist_edges,
"hist_counts": hist_counts,
},
"drawdown": {
"equity": eq,
"drawdown_pct": dd,
},
"trades": {
"trades_per_window": t_int,
"trades_per_day": trades_per_day,
},
}
def _compute_regime_performance(
*,
window_returns_pct: List[float],
window_trades: List[int],
regime_windows: List[Dict[str, Any]],
) -> Dict[str, Any]:
def _stats(indices: List[int]) -> Dict[str, Any]:
rets = [float(window_returns_pct[i]) for i in indices if i < len(window_returns_pct)]
trds = [int(window_trades[i]) for i in indices if i < len(window_trades)]
return {
"n_windows": int(len(indices)),
"mean_return_pct": float(np.mean(rets)) if rets else 0.0,
"positive_window_rate": float(np.mean(np.asarray(rets) > 0.0)) if rets else 0.0,
"avg_trades": float(np.mean(trds)) if trds else 0.0,
}
out_group: Dict[str, Any] = {}
for regime in ("bull", "sideways", "bear"):
idx = [i for i, rw in enumerate(regime_windows) if rw.get("regime") == regime]
out_group[regime] = _stats(idx)
out_detail: Dict[str, Any] = {}
for regime in ("bull_strong", "bull_moderate", "sideways", "bear_moderate", "bear_strong"):
idx = [i for i, rw in enumerate(regime_windows) if rw.get("regime_detail") == regime]
out_detail[regime] = _stats(idx)
return {
"group": out_group,
"detail": out_detail,
}
# --------------------------------------------------
# Main
# --------------------------------------------------
def inspect_strategies_config(
    *,
    storage: StorageManager,
    payload,
    data_quality: Dict[str, Any],
    include_series: bool,
    progress_callback=None,
    df: pd.DataFrame | None = None,
) -> Dict[str, Any]:
    """Validate the selected strategies with fixed-parameter walk-forward runs.

    Flow:
        1. Load OHLCV data (unless ``df`` is supplied) and bail out if empty.
        2. Refuse to validate when the Step 1 data-quality status is "fail".
        3. Build the shared stop-loss / sizer objects and compute market
           regimes once for the walk-forward windows (shared by all
           strategies, since they use the same WF configuration).
        4. For each selected strategy: check it exists in the registry, check
           its parameter keys match ``parameters_schema()`` exactly, run the
           walk-forward validator and collect per-window results, diagnostics
           and (optionally) series.

    Args:
        storage: Used only to load OHLCV when ``df`` is None.
        payload: Request object; read attributes include ``symbol``,
            ``timeframe``, ``wf``, ``risk``, ``stop``, ``global_rules``,
            ``commission``, ``slippage``, ``account_equity`` and
            ``strategies``.
        data_quality: Step 1 result; only ``status`` and ``message`` are read.
        include_series: When true, a ``series`` section is added to the output.
        progress_callback: Forwarded to each per-strategy validator.
        df: Optional pre-loaded OHLCV frame.

    Returns:
        A dict with ``valid``, ``status`` ("ok" / "warning" / "fail"),
        ``checks``, ``message`` and ``results``; plus ``config`` and
        ``regimes`` (and ``series`` when requested) once data is available.
        Per-strategy errors are reported as "fail" entries in ``results``
        rather than raised.
    """
    if df is None:
        df = storage.load_ohlcv(symbol=payload.symbol, timeframe=payload.timeframe)
    if df is None or df.empty:
        return {
            "valid": False,
            "status": "fail",
            "checks": {},
            "message": "No OHLCV data",
            "results": [],
        }

    checks: Dict[str, Any] = {
        "data_quality": {
            "status": data_quality.get("status", "unknown"),
            "message": data_quality.get("message", ""),
        },
    }
    if data_quality.get("status") == "fail":
        out = {
            "valid": False,
            "status": "fail",
            "checks": checks,
            "message": "Step 1 data quality is FAIL. Strategies cannot be validated.",
            "results": [],
            "config": _inspection_config_snapshot(payload),
        }
        if include_series:
            out["series"] = {"strategies": {}}
        return out

    stop_loss = _build_stop_loss(payload.stop)
    base_sizer = _build_position_sizer(payload.risk)
    test_days = int(payload.wf.test_days)
    train_td = pd.Timedelta(days=int(payload.wf.train_days))
    test_td = pd.Timedelta(days=test_days)
    step_td = pd.Timedelta(days=int(payload.wf.step_days or payload.wf.test_days))
    account_equity = float(payload.account_equity)

    # Regime analysis is market-level (shared by all strategies for the same
    # WF config). A probe validator is built only to obtain the window layout;
    # it falls back to MovingAverageCrossover when no usable strategy is given.
    regime_cfg = TrendScoreConfig()
    if payload.strategies:
        first_sel = payload.strategies[0]
        probe_class = STRATEGY_REGISTRY.get(first_sel.strategy_id, MovingAverageCrossover)
        probe_params = dict(first_sel.parameters or {})
    else:
        probe_class = MovingAverageCrossover
        probe_params = {}
    wf_probe = WalkForwardValidator(
        strategy_class=probe_class,
        param_grid=None,
        fixed_params=probe_params,
        data=df,
        train_window=train_td,
        test_window=test_td,
        step_size=step_td,
        initial_capital=account_equity,
        commission=float(payload.commission),
        slippage=float(payload.slippage),
        position_sizer=base_sizer,
        stop_loss=stop_loss,
        verbose=False,
    )
    wf_windows = wf_probe._generate_windows()
    regime_bundle = compute_regimes_for_windows(df, wf_windows, config=regime_cfg)
    regime_by_window = {int(r["window"]): r for r in regime_bundle["by_window"]}

    # The capping wrapper is stateless, so one instance serves every strategy.
    capped_sizer = _CappedSizer(base_sizer, payload.risk.max_position_fraction)

    overall_status = "ok"
    results: List[Dict[str, Any]] = []
    series: Dict[str, Any] = {"strategies": {}} if include_series else {}
    log.info(f"🔥 Strategies received: {len(payload.strategies)}")

    for sel in payload.strategies:
        sid = sel.strategy_id
        strategy_class = STRATEGY_REGISTRY.get(sid)
        log.info(f"🧠 Step3 | Processing strategy: {sid}")

        if strategy_class is None:
            results.append(_failed_strategy_result(
                strategy_id=sid,
                message=f"Unknown strategy_id: {sid}",
                series_error="Unknown strategy_id (not in registry)",
                account_equity=account_equity,
            ))
            overall_status = "fail"
            log.error(f"❌ Strategy not found in registry: {sid}")
            continue

        # Registry classes are not guaranteed to expose a schema; report that
        # as a per-strategy failure instead of aborting the whole inspection.
        if not hasattr(strategy_class, "parameters_schema"):
            msg = f"Strategy {sid} does not expose parameters_schema()"
            results.append(_failed_strategy_result(
                strategy_id=sid,
                message=msg,
                series_error=msg,
                account_equity=account_equity,
            ))
            overall_status = "fail"
            continue

        # Strict parameter validation: keys must match the schema exactly.
        # A missing/None parameter mapping is treated as empty, consistently
        # with how the probe parameters are read above.
        fixed_params = dict(sel.parameters or {})
        valid_params = set(strategy_class.parameters_schema().keys())
        range_params = set(fixed_params.keys())
        if range_params != valid_params:
            msg = f"Parameter keys {range_params} do not match expected {valid_params}"
            results.append(_failed_strategy_result(
                strategy_id=sid,
                message=msg,
                series_error=msg,
                account_equity=account_equity,
            ))
            overall_status = "fail"
            continue

        log.info(f"🧠 Step3 | WF run | strategy={sid}")
        # Boundary handler: any error while running or summarising one
        # strategy is converted into a "fail" result so the others still run.
        try:
            # Validation mode: parameters are fixed, no grid search.
            wf = WalkForwardValidator(
                strategy_class=strategy_class,
                param_grid=None,
                fixed_params=fixed_params,
                data=df,
                train_window=train_td,
                test_window=test_td,
                step_size=step_td,
                initial_capital=account_equity,
                commission=float(payload.commission),
                slippage=float(payload.slippage),
                position_sizer=capped_sizer,
                stop_loss=stop_loss,
                progress_callback=progress_callback,
            )
            wf_res = wf.run()
            win_df: pd.DataFrame = wf_res["windows"]

            if win_df is None or win_df.empty:
                # Always record a result (never skip silently), and provide a
                # minimal baseline series so the UI has something to render.
                baseline_diag = _compute_wf_diagnostics(
                    window_returns_pct=[],
                    window_trades=[],
                    window_equity=[account_equity],
                    test_days=test_days,
                )
                results.append({
                    "strategy_id": sid,
                    "status": "warning",
                    "message": "No closed trades in OOS",
                    "warnings": ["Walk-forward produced no closed trades."],
                    "series_available": bool(include_series),
                    "series_error": None if include_series else "WF produced no closed trades / empty windows",
                    "n_windows": 0,
                    "oos_final_equity": account_equity,
                    "oos_total_return_pct": 0.0,
                    "oos_max_dd_worst_pct": 0.0,
                    "degradation_sharpe": None,
                    "diagnostics": baseline_diag,
                    "windows": [],
                })
                if include_series:
                    series["strategies"][sid] = {
                        "window_returns_pct": [],
                        "window_equity": [account_equity],
                        "window_trades": [],
                        "diagnostics": baseline_diag,
                    }
                if overall_status == "ok":
                    overall_status = "warning"
                continue

            # Explicit validation of the WF columns (no silent fallbacks).
            required_cols = {"return_pct", "max_dd_pct", "trades", "window", "train_start", "train_end", "test_start", "test_end", "sharpe", "params"}
            missing = required_cols - set(win_df.columns)
            if missing:
                raise ValueError(f"WF windows missing required columns: {sorted(missing)}")

            trades = win_df["trades"].astype(int).tolist()
            warnings_list: List[str] = []
            too_few = sum(t < int(payload.wf.min_trades_test) for t in trades)
            if too_few > 0:
                warnings_list.append(
                    f"{too_few} test windows have fewer than {payload.wf.min_trades_test} trades"
                )

            if warnings_list:
                status = "warning"
                msg = "Validation completed with warnings"
                if overall_status == "ok":
                    overall_status = "warning"
            else:
                status = "ok"
                msg = "WF OK"

            windows_out: List[Dict[str, Any]] = []
            regime_windows: List[Dict[str, Any]] = []
            for _, r in win_df.iterrows():
                window_id = int(r["window"])
                regime_meta = regime_by_window.get(window_id, {})
                windows_out.append({
                    "window": window_id,
                    "train_start": str(r["train_start"]),
                    "train_end": str(r["train_end"]),
                    "test_start": str(r["test_start"]),
                    "test_end": str(r["test_end"]),
                    "return_pct": float(r["return_pct"]),
                    "sharpe": float(r["sharpe"]),
                    "max_dd_pct": float(r["max_dd_pct"]),
                    "trades": int(r["trades"]),
                    "params": dict(r["params"]) if isinstance(r["params"], dict) else r["params"],
                    "regime": regime_meta.get("regime"),
                    "regime_detail": regime_meta.get("regime_detail"),
                    "bull_strong_pct": float(regime_meta.get("bull_strong_pct", 0.0)),
                    "bull_moderate_pct": float(regime_meta.get("bull_moderate_pct", 0.0)),
                    "sideways_detail_pct": float(regime_meta.get("sideways_detail_pct", 0.0)),
                    "bear_moderate_pct": float(regime_meta.get("bear_moderate_pct", 0.0)),
                    "bear_strong_pct": float(regime_meta.get("bear_strong_pct", 0.0)),
                })
                # Windows without regime data are reported as neutral sideways.
                regime_windows.append(regime_by_window.get(
                    window_id,
                    {"window": window_id, "regime": "sideways", "bull_pct": 0.0, "sideways_pct": 0.0, "bear_pct": 0.0},
                ))

            oos_returns = win_df["return_pct"].astype(float).tolist()
            eq_curve = _accumulate_equity(account_equity, oos_returns)
            oos_final = float(eq_curve[-1]) if eq_curve else account_equity
            oos_total_return = (oos_final / account_equity - 1.0) * 100.0
            # Worst window = minimum of the max_dd_pct column.
            oos_max_dd = float(np.min(win_df["max_dd_pct"]))

            diagnostics = _compute_wf_diagnostics(
                window_returns_pct=oos_returns,
                window_trades=trades,
                window_equity=eq_curve,
                test_days=test_days,
                rolling_window=3,
                hist_bins=10,
            )
            diagnostics["regimes"] = {
                "config": regime_bundle["config"],
                "by_window": regime_windows,
                "performance": _compute_regime_performance(
                    window_returns_pct=oos_returns,
                    window_trades=trades,
                    regime_windows=regime_windows,
                ),
            }
            # Keep worst-window DD also at top-level for backwards compatibility.
            diagnostics["drawdown"]["worst_window_dd_pct"] = float(oos_max_dd)

            results.append({
                "strategy_id": sid,
                "status": status,
                "message": msg,
                "warnings": warnings_list,
                "series_available": bool(include_series),
                "series_error": None,
                "n_windows": int(len(windows_out)),
                "oos_final_equity": oos_final,
                "oos_total_return_pct": float(oos_total_return),
                "oos_max_dd_worst_pct": float(oos_max_dd),
                "degradation_sharpe": None,
                "diagnostics": diagnostics,
                "windows": windows_out,
            })
            if include_series:
                series["strategies"][sid] = {
                    "window_returns_pct": oos_returns,
                    "window_equity": eq_curve,
                    "window_trades": win_df["trades"].tolist(),
                    "window_regimes": diagnostics.get("regimes", {}).get("by_window", []),
                    "diagnostics": diagnostics,
                }
        except Exception as e:
            log.error(f"❌ Step3 WF error | strategy={sid} | {e}")
            results.append(_failed_strategy_result(
                strategy_id=sid,
                message=f"Exception: {e}",
                series_error=f"{type(e).__name__}: {e}",
                account_equity=account_equity,
            ))
            overall_status = "fail"

    human_msg = {
        "ok": "Strategies validation OK",
        "warning": "Strategies validation has warnings",
        "fail": "Strategies validation FAILED",
    }[overall_status]
    out = {
        "valid": overall_status != "fail",
        "status": overall_status,
        "checks": checks,
        "message": human_msg,
        "results": results,
        "config": _inspection_config_snapshot(payload),
        "regimes": {
            "config": regime_bundle["config"],
            "by_window": regime_bundle["by_window"],
        },
    }
    if include_series:
        out["series"] = series
    return out


def _inspection_config_snapshot(payload) -> Dict[str, Any]:
    """Echo the request configuration (WF, risk, stop, rules, costs) in the response."""
    return {
        "wf": payload.wf.model_dump(),
        "risk": payload.risk.model_dump(),
        "stop": payload.stop.model_dump(),
        "global_rules": payload.global_rules.model_dump(),
        "commission": payload.commission,
        "slippage": payload.slippage,
    }


def _failed_strategy_result(*, strategy_id, message: str, series_error: str, account_equity: float) -> Dict[str, Any]:
    """Build the uniform "fail" result entry for a strategy that could not be validated."""
    return {
        "strategy_id": strategy_id,
        "status": "fail",
        "message": message,
        "warnings": [],
        "series_available": False,
        "series_error": series_error,
        "n_windows": 0,
        # Equity is left untouched: nothing was traded.
        "oos_final_equity": float(account_equity),
        "oos_total_return_pct": 0.0,
        "oos_max_dd_worst_pct": 0.0,
        "degradation_sharpe": None,
        "windows": [],
    }


class _CappedSizer:
    """Sizer wrapper that caps the inner sizer's units by a max position fraction."""

    def __init__(self, inner, max_position_fraction: float):
        self.inner = inner
        self.max_position_fraction = max_position_fraction

    def calculate_size(self, *, capital, entry_price, stop_price=None, max_capital=None, volatility=None):
        """Delegate to the inner sizer, then cap the result to the allowed exposure."""
        units = self.inner.calculate_size(
            capital=capital,
            entry_price=entry_price,
            stop_price=stop_price,
            max_capital=max_capital,
            volatility=volatility,
        )
        return _cap_units_by_max_position_fraction(
            units=float(units),
            capital=float(capital),
            entry_price=float(entry_price),
            max_position_fraction=float(self.max_position_fraction),
        )