# scripts/research/wf_optimize_strategies.py import os import sys from pathlib import Path from dotenv import load_dotenv from datetime import timedelta import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from src.core.engine import Engine from src.data.storage import StorageManager from src.risk.sizing.percent_risk import PercentRiskSizer from src.risk.stops.trailing_stop import TrailingStop from src.strategies.optimization.opt_moving_average import MACrossoverOptimization from src.strategies.optimization.opt_trend_filtered import TrendFilteredMAOptimization from src.utils.logger import log # -------------------------------------------------- # CONFIG # -------------------------------------------------- SYMBOL = "BTC/USDT" TIMEFRAME = "1h" TRAIN_DAYS = 120 TEST_DAYS = 30 STEP_DAYS = 30 INITIAL_CAPITAL = 10_000 RISK = PercentRiskSizer(0.01) STOP = TrailingStop(0.02) COMMISSION = 0.001 SLIPPAGE = 0.0005 OUTPUT_DIR = Path(__file__).parent / "output/wf_optimize" OUTPUT_DIR.mkdir(parents=True, exist_ok=True) TOP_N_PRINT = 15 OPTIMIZERS = [ MACrossoverOptimization, TrendFilteredMAOptimization, ] # -------------------------------------------------- def setup_env(): env_path = Path(__file__).parent.parent.parent / "config" / "secrets.env" load_dotenv(env_path) def load_data(): setup_env() storage = StorageManager( db_host=os.getenv("DB_HOST"), db_port=int(os.getenv("DB_PORT", 5432)), db_name=os.getenv("DB_NAME"), db_user=os.getenv("DB_USER"), db_password=os.getenv("DB_PASSWORD"), ) data = storage.load_ohlcv( symbol=SYMBOL, timeframe=TIMEFRAME, use_cache=True, ) storage.close() if data.empty: raise RuntimeError("No data loaded") return data def run_wf(data: pd.DataFrame, strategy_factory) -> dict: capital = INITIAL_CAPITAL window_returns = [] worst_dd = 0.0 windows = 0 start = data.index[0] while True: train_end = start + timedelta(days=TRAIN_DAYS) test_end = train_end + timedelta(days=TEST_DAYS) if test_end > data.index[-1]: break test_data = data.loc[train_end:test_end] engine = Engine( strategy=strategy_factory(), initial_capital=capital, position_sizer=RISK, stop_loss=STOP, commission=COMMISSION, slippage=SLIPPAGE, ) res = engine.run(test_data) windows += 1 window_returns.append(res["total_return_pct"]) worst_dd = min(worst_dd, res["max_drawdown_pct"]) capital *= (1 + res["total_return_pct"] / 100) start += timedelta(days=STEP_DAYS) wr = np.array(window_returns, dtype=float) return { "windows": windows, "final_capital": round(capital, 2), "return_mean": round(wr.mean(), 4), "return_median": round(np.median(wr), 4), "max_dd_worst": round(worst_dd, 4), "win_rate": round((wr > 0).mean(), 4), } def score_row(row): dd_penalty = abs(row["max_dd_worst"]) / 100.0 return ( (row["final_capital"] / INITIAL_CAPITAL) + row["win_rate"] * 0.5 - dd_penalty * 1.5 ) # -------------------------------------------------- def run(): data = load_data() rows = [] for optimizer in OPTIMIZERS: log.info(f"๐Ÿงช Optimizing {optimizer.name}") for params in optimizer.parameter_grid(): def factory(p=params): return optimizer.build_strategy(p) metrics = run_wf(data, factory) row = { "strategy": optimizer.name, **params, **metrics, } row["score"] = score_row(row) rows.append(row) df = pd.DataFrame(rows) df = df.sort_values("score", ascending=False) # -------------------------------------------------- # SAVE CSV # -------------------------------------------------- csv_path = OUTPUT_DIR / "wf_optimization_results.csv" df.to_csv(csv_path, index=False) log.info(f"โœ… Saved CSV: {csv_path}") # -------------------------------------------------- # PRINT TOP CONFIGS # -------------------------------------------------- print("\n" + "=" * 100) print("๐Ÿ† TOP CONFIGS (by score)") print("=" * 100) for strat in df["strategy"].unique(): top = df[df["strategy"] == strat].head(TOP_N_PRINT) print(f"\n--- {strat} ---") print( top[ [ "fast_period", "slow_period", "adx_threshold", "final_capital", "return_mean", "win_rate", "max_dd_worst", "score", ] ].to_string(index=False) ) # -------------------------------------------------- # ๐Ÿ“Š VISUALIZATIONS # -------------------------------------------------- sns.set(style="whitegrid") # A) Scatter: Score vs DD plt.figure(figsize=(10, 6)) sns.scatterplot( data=df, x="max_dd_worst", y="score", hue="strategy", size="final_capital", sizes=(40, 200), alpha=0.7, ) plt.title("Score vs Max Drawdown") plt.tight_layout() plt.savefig(OUTPUT_DIR / "scatter_score_vs_dd.png") plt.close() # B) Boxplot: Score by Strategy plt.figure(figsize=(8, 5)) sns.boxplot(data=df, x="strategy", y="score") plt.title("Score Distribution by Strategy") plt.tight_layout() plt.savefig(OUTPUT_DIR / "boxplot_score_by_strategy.png") plt.close() # C) Heatmaps for strat in df["strategy"].unique(): sub = df[df["strategy"] == strat] if "adx_threshold" in sub.columns and sub["adx_threshold"].notna().any(): for adx in sorted(sub["adx_threshold"].dropna().unique()): pivot = sub[sub["adx_threshold"] == adx].pivot( index="fast_period", columns="slow_period", values="final_capital", ) plt.figure(figsize=(10, 6)) sns.heatmap(pivot, annot=False, cmap="viridis") plt.title(f"{strat} โ€“ Final Capital (ADX={adx})") plt.tight_layout() plt.savefig( OUTPUT_DIR / f"heatmap_{strat}_adx_{int(adx)}.png" ) plt.close() else: pivot = sub.pivot( index="fast_period", columns="slow_period", values="final_capital", ) plt.figure(figsize=(10, 6)) sns.heatmap(pivot, annot=False, cmap="viridis") plt.title(f"{strat} โ€“ Final Capital") plt.tight_layout() plt.savefig(OUTPUT_DIR / f"heatmap_{strat}.png") plt.close() if __name__ == "__main__": run()