# src/calibration/strategies_inspector.py
from __future__ import annotations

from typing import Any, Dict, List

import numpy as np
import pandas as pd

from src.data.storage import StorageManager
from src.utils.logger import log
from src.core.walk_forward import WalkForwardValidator
from src.risk.stops.fixed_stop import FixedStop
from src.risk.stops.trailing_stop import TrailingStop
from src.risk.stops.atr_stop import ATRStop
from src.risk.sizing.percent_risk import PercentRiskSizer

# --------------------------------------------------
# Strategy registry (with parameter metadata)
# --------------------------------------------------
from src.strategies.moving_average import MovingAverageCrossover
from src.strategies.rsi_strategy import RSIStrategy
from src.strategies.buy_and_hold import BuyAndHold

# Maps a strategy id to its implementing class and the exact set of
# parameter names a param grid for that strategy must provide.
STRATEGY_REGISTRY = {
    "moving_average": {
        "class": MovingAverageCrossover,
        "params": ["fast_period", "slow_period"],
    },
    "rsi": {
        "class": RSIStrategy,
        "params": ["rsi_period", "overbought", "oversold"],
    },
    "buy_and_hold": {
        "class": BuyAndHold,
        "params": [],
    },
}


# --------------------------------------------------
# Helpers
# --------------------------------------------------
def list_available_strategies() -> List[Dict[str, Any]]:
    """Return the full metadata of every registered strategy for the UI.

    Each item carries ``strategy_id``, ``name`` (the class name), ``params``
    (the registry's parameter-name list) and an empty ``tags`` list.
    """
    return [
        {
            "strategy_id": sid,
            "name": entry["class"].__name__,
            "params": entry["params"],
            "tags": [],  # can be filled in later
        }
        for sid, entry in STRATEGY_REGISTRY.items()
    ]


def _build_stop_loss(stop_schema) -> object | None:
    """Build the stop-loss object selected by ``stop_schema.type``.

    Supported types are ``"fixed"``, ``"trailing"`` and ``"atr"``.

    Raises:
        ValueError: if ``stop_schema.type`` is none of the supported values.
    """
    if stop_schema.type == "fixed":
        return FixedStop(stop_fraction=float(stop_schema.stop_fraction))
    if stop_schema.type == "trailing":
        return TrailingStop(stop_fraction=float(stop_schema.stop_fraction))
    if stop_schema.type == "atr":
        return ATRStop(
            atr_period=int(stop_schema.atr_period),
            multiplier=float(stop_schema.atr_multiplier),
        )
    raise ValueError(f"Unknown stop type: {stop_schema.type}")


def _build_position_sizer(risk_schema) -> PercentRiskSizer:
    """Build a ``PercentRiskSizer`` from ``risk_schema.risk_fraction``."""
    return PercentRiskSizer(risk_fraction=float(risk_schema.risk_fraction))


def _cap_units_by_max_position_fraction(
    units: float,
    capital: float,
    entry_price: float,
    max_position_fraction: float,
) -> float:
    """Limit ``units`` so the position costs at most a fraction of capital.

    The cap is ``capital * max_position_fraction / entry_price`` units; the
    smaller of ``units`` and that cap is returned.

    Raises:
        ValueError: if ``entry_price`` is zero or negative. Without this
            guard a zero price surfaced as a bare ``ZeroDivisionError`` and a
            negative price silently produced a negative unit count.
    """
    if entry_price <= 0:
        raise ValueError(f"entry_price must be positive, got {entry_price}")
    max_units = (capital * max_position_fraction) / entry_price
    return float(min(units, max_units))


def _accumulate_equity(initial: float, returns_pct: List[float]) -> List[float]:
    """Compound percentage returns into an equity curve.

    The result starts with ``initial`` and has one extra point per return,
    so its length is ``len(returns_pct) + 1``.
    """
    eq = [float(initial)]
    cur = float(initial)
    for r in returns_pct:
        cur *= (1.0 + float(r) / 100.0)
        eq.append(float(cur))
    return eq


def _failed_result(strategy_id: Any, message: str, account_equity: Any) -> Dict[str, Any]:
    """Build the per-strategy result entry used for every failure path.

    ``oos_final_equity`` is always coerced to ``float`` so that failed and
    successful entries share the same value type.
    """
    return {
        "strategy_id": strategy_id,
        "status": "fail",
        "message": message,
        "n_windows": 0,
        "oos_final_equity": float(account_equity),
        "oos_total_return_pct": 0.0,
        "oos_max_dd_worst_pct": 0.0,
        "degradation_sharpe": None,
        "windows": [],
    }


def _capped_sizer_class(base_cls: type, risk_schema) -> type:
    """Create a sizer class that wraps another sizer and caps its output.

    The returned class subclasses ``base_cls``. Its instances hold the
    wrapped sizer in ``inner``, delegate ``calculate_size`` to it and then
    cap the result with ``risk_schema.max_position_fraction``, which is read
    on every call.
    """

    class _CappedSizer(base_cls):
        def __init__(self, inner):
            # The base-class __init__ is not invoked; the wrapper only keeps
            # a reference to the sizer it delegates to.
            self.inner = inner

        def calculate_size(
            self,
            *,
            capital,
            entry_price,
            stop_price=None,
            max_capital=None,
            volatility=None,
        ):
            u = self.inner.calculate_size(
                capital=capital,
                entry_price=entry_price,
                stop_price=stop_price,
                max_capital=max_capital,
                volatility=volatility,
            )
            return _cap_units_by_max_position_fraction(
                units=float(u),
                capital=float(capital),
                entry_price=float(entry_price),
                max_position_fraction=float(risk_schema.max_position_fraction),
            )

    return _CappedSizer


# --------------------------------------------------
# Main
# --------------------------------------------------
def inspect_strategies_config(
    *,
    storage: StorageManager,
    payload,
    data_quality: Dict[str, Any],
    include_series: bool,
    progress_callback=None,
) -> Dict[str, Any]:
    """Run a walk-forward validation for every strategy selected in ``payload``.

    Args:
        storage: Source of OHLCV data; ``load_ohlcv`` is called with the
            payload's symbol and timeframe.
        payload: Request object. This function reads ``symbol``,
            ``timeframe``, ``stop``, ``risk``, ``wf``, ``strategies``,
            ``account_equity``, ``commission``, ``slippage`` and
            ``optimization`` from it.
        data_quality: Result of the earlier data-quality step; its ``status``
            and ``message`` are echoed under ``checks["data_quality"]`` and a
            ``"fail"`` status aborts the validation.
        include_series: When true, per-strategy window returns and the
            compounded equity curve are returned under ``series``.
        progress_callback: Passed through to ``WalkForwardValidator``.

    Returns:
        A dict with ``valid``, ``status`` (``"ok"``, ``"warning"`` or
        ``"fail"``), ``checks``, ``message`` and ``results`` (one entry per
        selected strategy), plus ``series`` when requested.
    """
    df = storage.load_ohlcv(symbol=payload.symbol, timeframe=payload.timeframe)
    if df is None or df.empty:
        no_data: Dict[str, Any] = {
            "valid": False,
            "status": "fail",
            "checks": {},
            "message": "No OHLCV data",
            "results": [],
        }
        if include_series:
            # Keep the response shape consistent with the other return paths
            # so callers that asked for series always find the key.
            no_data["series"] = {}
        return no_data

    checks: Dict[str, Any] = {}
    checks["data_quality"] = {
        "status": data_quality.get("status", "unknown"),
        "message": data_quality.get("message", ""),
    }
    if data_quality.get("status") == "fail":
        return {
            "valid": False,
            "status": "fail",
            "checks": checks,
            "message": "Step 1 data quality is FAIL. Strategies cannot be validated.",
            "results": [],
            "series": {} if include_series else None,
        }

    stop_loss = _build_stop_loss(payload.stop)
    base_sizer = _build_position_sizer(payload.risk)
    # The wrapper class does not depend on the strategy, so build it once.
    capped_sizer_cls = _capped_sizer_class(type(base_sizer), payload.risk)

    train_td = pd.Timedelta(days=int(payload.wf.train_days))
    test_td = pd.Timedelta(days=int(payload.wf.test_days))
    # A missing/zero step falls back to the test window length.
    step_td = pd.Timedelta(days=int(payload.wf.step_days or payload.wf.test_days))

    overall_status = "ok"
    results: List[Dict[str, Any]] = []
    series: Dict[str, Any] = {"strategies": {}} if include_series else {}

    for sel in payload.strategies:
        sid = sel.strategy_id
        entry = STRATEGY_REGISTRY.get(sid)
        if entry is None:
            results.append(
                _failed_result(sid, f"Unknown strategy_id: {sid}", payload.account_equity)
            )
            overall_status = "fail"
            continue

        strategy_class = entry["class"]
        valid_params = set(entry["params"])
        grid_params = set(sel.param_grid.keys())

        # Strict parameter validation: the grid keys must match exactly.
        if grid_params != valid_params:
            msg = f"Param grid keys {grid_params} do not match expected {valid_params}"
            results.append(_failed_result(sid, msg, payload.account_equity))
            overall_status = "fail"
            continue

        # Fresh wrapper instance per strategy around the shared base sizer.
        capped_sizer = capped_sizer_cls(base_sizer)

        log.info(f"🧠 Step3 | WF run | strategy={sid}")
        try:
            wf = WalkForwardValidator(
                strategy_class=strategy_class,
                param_grid=sel.param_grid,
                data=df,
                train_window=train_td,
                test_window=test_td,
                step_size=step_td,
                initial_capital=float(payload.account_equity),
                commission=float(payload.commission),
                slippage=float(payload.slippage),
                optimizer_metric=str(payload.optimization.optimizer_metric),
                position_sizer=capped_sizer,
                stop_loss=stop_loss,
                max_combinations=int(payload.optimization.max_combinations),
                progress_callback=progress_callback,
            )
            wf_res = wf.run()
            win_df: pd.DataFrame = wf_res["windows"]

            if win_df is None or win_df.empty:
                status = "fail"
                msg = "WF produced no valid windows"
                overall_status = "fail"
                windows_out = []
                oos_returns = []
            else:
                trades = win_df["trades"].astype(int).tolist()
                too_few = sum(
                    t < int(payload.optimization.min_trades_test) for t in trades
                )
                if too_few > 0:
                    status = "warning"
                    msg = f"{too_few} windows below min_trades_test"
                    # Never downgrade an earlier "fail" to "warning".
                    if overall_status == "ok":
                        overall_status = "warning"
                else:
                    status = "ok"
                    msg = "WF OK"

                windows_out = []
                for _, r in win_df.iterrows():
                    windows_out.append({
                        "window": int(r["window"]),
                        "train_start": str(r["train_start"]),
                        "train_end": str(r["train_end"]),
                        "test_start": str(r["test_start"]),
                        "test_end": str(r["test_end"]),
                        "return_pct": float(r["return_pct"]),
                        "sharpe": float(r["sharpe"]),
                        "max_dd_pct": float(r["max_dd_pct"]),
                        "trades": int(r["trades"]),
                        "params": dict(r["params"]) if isinstance(r["params"], dict) else r["params"],
                    })
                oos_returns = win_df["return_pct"].astype(float).tolist()

            # Compound the per-window out-of-sample returns.
            eq_curve = _accumulate_equity(float(payload.account_equity), oos_returns)
            oos_final = float(eq_curve[-1]) if eq_curve else float(payload.account_equity)
            oos_total_return = (oos_final / float(payload.account_equity) - 1.0) * 100.0
            # NOTE(review): taking the minimum as "worst" presumes drawdowns
            # are expressed as non-positive percentages - confirm.
            oos_max_dd = (
                float(np.min(win_df["max_dd_pct"]))
                if (win_df is not None and not win_df.empty)
                else 0.0
            )

            results.append({
                "strategy_id": sid,
                "status": status,
                "message": msg,
                "n_windows": int(len(windows_out)),
                "oos_final_equity": oos_final,
                "oos_total_return_pct": float(oos_total_return),
                "oos_max_dd_worst_pct": float(oos_max_dd),
                "degradation_sharpe": None,
                "windows": windows_out,
            })

            if include_series:
                series["strategies"][sid] = {
                    "window_returns_pct": oos_returns,
                    "window_equity": eq_curve,
                }
        except Exception as e:
            # Per-strategy boundary: one failing strategy must not abort the
            # inspection of the remaining ones.
            log.error(f"❌ Step3 WF error | strategy={sid} | {e}")
            results.append(
                _failed_result(sid, f"Exception: {e}", payload.account_equity)
            )
            overall_status = "fail"

    valid = overall_status != "fail"
    human_msg = {
        "ok": "Strategies validation OK",
        "warning": "Strategies validation has warnings",
        "fail": "Strategies validation FAILED",
    }[overall_status]

    out = {
        "valid": valid,
        "status": overall_status,
        "checks": checks,
        "message": human_msg,
        "results": results,
    }
    if include_series:
        out["series"] = series
    return out