- Implemented portfolio engine with risk-based allocation (50/50) - Added equity-based metrics for system-level evaluation - Validated portfolio against standalone strategies - Reduced max drawdown and volatility at system level - Quantitative decision closed before paper trading phase
257 lines
7.0 KiB
Python
257 lines
7.0 KiB
Python
# scripts/research/wf_optimize_strategies.py
|
||
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from dotenv import load_dotenv
|
||
from datetime import timedelta
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
import matplotlib.pyplot as plt
|
||
import seaborn as sns
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||
|
||
from src.core.engine import Engine
|
||
from src.data.storage import StorageManager
|
||
from src.risk.sizing.percent_risk import PercentRiskSizer
|
||
from src.risk.stops.trailing_stop import TrailingStop
|
||
|
||
from src.strategies.optimization.opt_moving_average import MACrossoverOptimization
|
||
from src.strategies.optimization.opt_trend_filtered import TrendFilteredMAOptimization
|
||
|
||
from src.utils.logger import log
|
||
|
||
|
||
# --------------------------------------------------
|
||
# CONFIG
|
||
# --------------------------------------------------
|
||
SYMBOL = "BTC/USDT"
|
||
TIMEFRAME = "1h"
|
||
|
||
TRAIN_DAYS = 120
|
||
TEST_DAYS = 30
|
||
STEP_DAYS = 30
|
||
|
||
INITIAL_CAPITAL = 10_000
|
||
|
||
RISK = PercentRiskSizer(0.01)
|
||
STOP = TrailingStop(0.02)
|
||
|
||
COMMISSION = 0.001
|
||
SLIPPAGE = 0.0005
|
||
|
||
OUTPUT_DIR = Path(__file__).parent / "output/wf_optimize"
|
||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
TOP_N_PRINT = 15
|
||
|
||
OPTIMIZERS = [
|
||
MACrossoverOptimization,
|
||
TrendFilteredMAOptimization,
|
||
]
|
||
|
||
# --------------------------------------------------
|
||
def setup_env():
|
||
env_path = Path(__file__).parent.parent.parent / "config" / "secrets.env"
|
||
load_dotenv(env_path)
|
||
|
||
def load_data():
|
||
setup_env()
|
||
|
||
storage = StorageManager(
|
||
db_host=os.getenv("DB_HOST"),
|
||
db_port=int(os.getenv("DB_PORT", 5432)),
|
||
db_name=os.getenv("DB_NAME"),
|
||
db_user=os.getenv("DB_USER"),
|
||
db_password=os.getenv("DB_PASSWORD"),
|
||
)
|
||
|
||
data = storage.load_ohlcv(
|
||
symbol=SYMBOL,
|
||
timeframe=TIMEFRAME,
|
||
use_cache=True,
|
||
)
|
||
storage.close()
|
||
|
||
if data.empty:
|
||
raise RuntimeError("No data loaded")
|
||
|
||
return data
|
||
|
||
def run_wf(data: pd.DataFrame, strategy_factory) -> dict:
|
||
capital = INITIAL_CAPITAL
|
||
window_returns = []
|
||
worst_dd = 0.0
|
||
windows = 0
|
||
|
||
start = data.index[0]
|
||
|
||
while True:
|
||
train_end = start + timedelta(days=TRAIN_DAYS)
|
||
test_end = train_end + timedelta(days=TEST_DAYS)
|
||
|
||
if test_end > data.index[-1]:
|
||
break
|
||
|
||
test_data = data.loc[train_end:test_end]
|
||
|
||
engine = Engine(
|
||
strategy=strategy_factory(),
|
||
initial_capital=capital,
|
||
position_sizer=RISK,
|
||
stop_loss=STOP,
|
||
commission=COMMISSION,
|
||
slippage=SLIPPAGE,
|
||
)
|
||
|
||
res = engine.run(test_data)
|
||
|
||
windows += 1
|
||
window_returns.append(res["total_return_pct"])
|
||
worst_dd = min(worst_dd, res["max_drawdown_pct"])
|
||
capital *= (1 + res["total_return_pct"] / 100)
|
||
|
||
start += timedelta(days=STEP_DAYS)
|
||
|
||
wr = np.array(window_returns, dtype=float)
|
||
|
||
return {
|
||
"windows": windows,
|
||
"final_capital": round(capital, 2),
|
||
"return_mean": round(wr.mean(), 4),
|
||
"return_median": round(np.median(wr), 4),
|
||
"max_dd_worst": round(worst_dd, 4),
|
||
"win_rate": round((wr > 0).mean(), 4),
|
||
}
|
||
|
||
def score_row(row):
|
||
dd_penalty = abs(row["max_dd_worst"]) / 100.0
|
||
return (
|
||
(row["final_capital"] / INITIAL_CAPITAL)
|
||
+ row["win_rate"] * 0.5
|
||
- dd_penalty * 1.5
|
||
)
|
||
|
||
# --------------------------------------------------
|
||
def run():
|
||
data = load_data()
|
||
rows = []
|
||
|
||
for optimizer in OPTIMIZERS:
|
||
log.info(f"🧪 Optimizing {optimizer.name}")
|
||
|
||
for params in optimizer.parameter_grid():
|
||
|
||
def factory(p=params):
|
||
return optimizer.build_strategy(p)
|
||
|
||
metrics = run_wf(data, factory)
|
||
row = {
|
||
"strategy": optimizer.name,
|
||
**params,
|
||
**metrics,
|
||
}
|
||
row["score"] = score_row(row)
|
||
rows.append(row)
|
||
|
||
df = pd.DataFrame(rows)
|
||
df = df.sort_values("score", ascending=False)
|
||
|
||
# --------------------------------------------------
|
||
# SAVE CSV
|
||
# --------------------------------------------------
|
||
csv_path = OUTPUT_DIR / "wf_optimization_results.csv"
|
||
df.to_csv(csv_path, index=False)
|
||
log.info(f"✅ Saved CSV: {csv_path}")
|
||
|
||
# --------------------------------------------------
|
||
# PRINT TOP CONFIGS
|
||
# --------------------------------------------------
|
||
print("\n" + "=" * 100)
|
||
print("🏆 TOP CONFIGS (by score)")
|
||
print("=" * 100)
|
||
|
||
for strat in df["strategy"].unique():
|
||
top = df[df["strategy"] == strat].head(TOP_N_PRINT)
|
||
print(f"\n--- {strat} ---")
|
||
print(
|
||
top[
|
||
[
|
||
"fast_period",
|
||
"slow_period",
|
||
"adx_threshold",
|
||
"final_capital",
|
||
"return_mean",
|
||
"win_rate",
|
||
"max_dd_worst",
|
||
"score",
|
||
]
|
||
].to_string(index=False)
|
||
)
|
||
|
||
# --------------------------------------------------
|
||
# 📊 VISUALIZATIONS
|
||
# --------------------------------------------------
|
||
sns.set(style="whitegrid")
|
||
|
||
# A) Scatter: Score vs DD
|
||
plt.figure(figsize=(10, 6))
|
||
sns.scatterplot(
|
||
data=df,
|
||
x="max_dd_worst",
|
||
y="score",
|
||
hue="strategy",
|
||
size="final_capital",
|
||
sizes=(40, 200),
|
||
alpha=0.7,
|
||
)
|
||
plt.title("Score vs Max Drawdown")
|
||
plt.tight_layout()
|
||
plt.savefig(OUTPUT_DIR / "scatter_score_vs_dd.png")
|
||
plt.close()
|
||
|
||
# B) Boxplot: Score by Strategy
|
||
plt.figure(figsize=(8, 5))
|
||
sns.boxplot(data=df, x="strategy", y="score")
|
||
plt.title("Score Distribution by Strategy")
|
||
plt.tight_layout()
|
||
plt.savefig(OUTPUT_DIR / "boxplot_score_by_strategy.png")
|
||
plt.close()
|
||
|
||
# C) Heatmaps
|
||
for strat in df["strategy"].unique():
|
||
sub = df[df["strategy"] == strat]
|
||
|
||
if "adx_threshold" in sub.columns and sub["adx_threshold"].notna().any():
|
||
for adx in sorted(sub["adx_threshold"].dropna().unique()):
|
||
pivot = sub[sub["adx_threshold"] == adx].pivot(
|
||
index="fast_period",
|
||
columns="slow_period",
|
||
values="final_capital",
|
||
)
|
||
plt.figure(figsize=(10, 6))
|
||
sns.heatmap(pivot, annot=False, cmap="viridis")
|
||
plt.title(f"{strat} – Final Capital (ADX={adx})")
|
||
plt.tight_layout()
|
||
plt.savefig(
|
||
OUTPUT_DIR / f"heatmap_{strat}_adx_{int(adx)}.png"
|
||
)
|
||
plt.close()
|
||
else:
|
||
pivot = sub.pivot(
|
||
index="fast_period",
|
||
columns="slow_period",
|
||
values="final_capital",
|
||
)
|
||
plt.figure(figsize=(10, 6))
|
||
sns.heatmap(pivot, annot=False, cmap="viridis")
|
||
plt.title(f"{strat} – Final Capital")
|
||
plt.tight_layout()
|
||
plt.savefig(OUTPUT_DIR / f"heatmap_{strat}.png")
|
||
plt.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run() |