-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexperiment.py
More file actions
187 lines (157 loc) · 6.84 KB
/
Copy pathexperiment.py
File metadata and controls
187 lines (157 loc) · 6.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Körningsmotor: kör pipelinen, logga allt, skriv en immutabel audit-mapp.
Kör: uv run python -m fibengine.experiment
"""
from __future__ import annotations
import argparse
import json
import random
from datetime import UTC, datetime
from pathlib import Path
import numpy as np
from fibengine.core.config import REPO_ROOT, Settings, load_settings
from fibengine.core.logging_conf import setup_logging
from fibengine.core.models import Swing
from fibengine.core.scoring import select_swing
from fibengine.data.loader import atr, load_candles
from fibengine.evaluation.metrics import evaluate
from fibengine.labeling.store import SwingLabel, list_labels
from fibengine.sizing.solros import build_sizing_plan, simulate_plan
from fibengine.viz.plot import plot_prediction
# Root directory for per-run audit folders; _run_dir builds the
# "experiment/<date>/<run_id>" path underneath it.
RUNS_DIR = REPO_ROOT / "experiments" / "runs"
# File to which run_experiment appends one JSON-encoded summary line per run.
LEADERBOARD = REPO_ROOT / "experiments" / "results" / "leaderboard.jsonl"
def _new_run_id() -> str:
return datetime.now(UTC).strftime("run_%Y%m%dT%H%M%SZ")
def _run_dir(run_id: str) -> Path:
    """Return the audit directory for *run_id*.

    The text after the first underscore of ``run_id`` is treated as a
    timestamp whose first eight characters are ``YYYYMMDD``; they become a
    ``YYYY-MM-DD`` folder below ``RUNS_DIR / "experiment"``, and the run's own
    folder (named after ``run_id``) sits inside that.
    """
    timestamp = run_id.split("_", 1)[1]
    year, month, day = timestamp[:4], timestamp[4:6], timestamp[6:8]
    return RUNS_DIR / "experiment" / f"{year}-{month}-{day}" / run_id
def _maybe_emit_sizing(settings: Settings, df, swing: Swing, run_dir: Path, name: str, log):
    """Layer B: write a sizing plan, but only when sizing is enabled.

    Does not affect swing selection. When ``settings.sizing.enabled`` is
    truthy, a plan is built for ``swing``, simulated against ``df`` and dumped
    as JSON to ``run_dir / "sizing_plan_<name>.json"``; otherwise nothing
    happens.
    """
    if settings.sizing.enabled:
        raw_plan = build_sizing_plan(swing, settings.sizing)
        entries = simulate_plan(df, swing, raw_plan)
        payload = json.dumps([entry.to_dict() for entry in entries], indent=2)
        target = run_dir / f"sizing_plan_{name}.json"
        target.write_text(payload)
        log.info("Sizing-plan ({}) skriven: {} entries", name, len(entries))
def _run_one(settings: Settings, label: SwingLabel, run_dir: Path, log) -> dict | None:
    """Predict a swing for one label, score it and write its artefacts.

    Candles are loaded with the label's exchange, symbol and timeframe
    substituted into ``settings.data``. A plot (``<label_id>.png``) and,
    if enabled, a sizing plan are written into ``run_dir``.

    Returns:
        ``None`` when no swing is detected; otherwise a dict with the keys
        ``"label"``, ``"metrics"`` and ``"predicted_swing"``.
    """
    overrides = {
        "exchange": label.exchange,
        "symbol": label.symbol,
        "timeframe": label.timeframe,
    }
    candles = load_candles(settings.data.model_copy(update=overrides))

    predicted = select_swing(candles, settings.pivots, settings.scoring)
    if predicted is None:
        log.warning("Ingen swing detekterad för {}", label.symbol)
        return None

    atr_values = atr(candles, settings.pivots.atr_period)
    atr_at_end = float(atr_values.iloc[predicted.end.index])
    # Fall back to the NaN-ignoring median when the ATR at the swing end is
    # not a usable (finite, positive) number.
    if not (np.isfinite(atr_at_end) and atr_at_end > 0):
        atr_at_end = float(np.nanmedian(atr_values.to_numpy()))

    scores = evaluate(candles, predicted, label, atr_at_end, settings.evaluation)

    symbol_slug = label.symbol.replace("/", "-")
    label_id = f"{label.exchange}_{symbol_slug}_{label.timeframe}"
    plot_prediction(
        candles,
        predicted,
        settings.fib.levels,
        run_dir / f"{label_id}.png",
        label=label,
        title=label_id,
    )
    _maybe_emit_sizing(settings, candles, predicted, run_dir, label_id, log)

    outside_window = scores["out_of_window"]
    if outside_window:
        log.warning(
            "{} | label utanför candle-fönstret — exkluderas ur aggregat "
            "(ladda mer historik för denna timeframe)",
            label_id,
        )
    log.info(
        "{} | status={} agreement={} fib_err={} out_of_window={}",
        label_id,
        predicted.status,
        scores["agreement"],
        scores["mean_fib_err_frac"],
        outside_window,
    )

    return {
        "label": label_id,
        "metrics": scores,
        "predicted_swing": predicted.to_dict(),
    }
def _aggregate(results: list[dict]) -> dict:
    """Summarise per-label metrics into one aggregate dict.

    Entries whose metrics carry a truthy ``"out_of_window"`` are left out of
    the means (they would be compared against the wrong bar), but their count
    is reported for transparency.

    Returns:
        ``{}`` for empty input. Otherwise a dict with ``"n"``,
        ``"excluded_out_of_window"`` and ``"no_in_window_samples"``, plus the
        four means rounded to 4 decimals when at least one in-window sample
        exists.
    """
    if not results:
        return {}

    in_window = [r["metrics"] for r in results if not r["metrics"].get("out_of_window")]
    summary = {
        "n": len(in_window),
        "excluded_out_of_window": len(results) - len(in_window),
        "no_in_window_samples": not in_window,
    }
    if in_window:
        # (output key, per-label metric key) pairs, in output order.
        mean_fields = (
            ("mean_agreement", "agreement"),
            ("mean_fib_err_frac", "mean_fib_err_frac"),
            ("mean_high_price_err_atr", "high_price_err_atr"),
            ("mean_low_price_err_atr", "low_price_err_atr"),
        )
        for out_key, metric_key in mean_fields:
            values = [sample[metric_key] for sample in in_window]
            summary[out_key] = round(float(np.mean(values)), 4)
    return summary
def run_experiment(settings: Settings | None = None) -> Path:
    """Run the pipeline once and write an audit folder for the run.

    Seeds ``random`` and NumPy from ``settings.seed``, creates the run
    directory, writes a config snapshot and a log file into it, evaluates
    every human label (or, when there are none, only predicts on the
    configured symbol), writes ``metrics.json`` and appends one row to the
    leaderboard file.

    Args:
        settings: Settings to use; loaded via ``load_settings()`` when omitted.

    Returns:
        Path of the run directory that was written.
    """
    settings = settings or load_settings()
    random.seed(settings.seed)
    np.random.seed(settings.seed)

    run_id = _new_run_id()
    config_hash = settings.config_hash()
    run_dir = _run_dir(run_id)
    # NOTE(review): run ids have one-second resolution and exist_ok=True, so
    # two runs started within the same second would share this folder.
    run_dir.mkdir(parents=True, exist_ok=True)
    log = setup_logging(run_id, config_hash, log_file=run_dir / "run.log")

    # Immutable config snapshot for the audit.
    (run_dir / "config.json").write_text(json.dumps(settings.model_dump(), indent=2))
    log.info("Startar experiment {} (cfg {})", run_id, config_hash)

    # Agreement is measured only against human ground truth; machine labels
    # are excluded (candidates, not judges) so the engine is never scored
    # against its own output.
    labels = list_labels(source="human")
    n_machine = len(list_labels(source="machine"))
    if n_machine:
        log.info("Hoppar över {} maskin-labels i agreement (ej ground truth)", n_machine)

    results: list[dict] = []
    if not labels:
        log.warning("Inga labels i data/labels/ — kör enbart prediktion på konfig-symbolen.")
        df = load_candles(settings.data)
        swing = select_swing(df, settings.pivots, settings.scoring)
        if swing is not None:
            plot_prediction(
                df,
                swing,
                settings.fib.levels,
                run_dir / "prediction.png",
                title=f"{settings.data.symbol} {settings.data.timeframe}",
            )
            _maybe_emit_sizing(settings, df, swing, run_dir, "demo", log)
            log.info("Predikterad leg: {}", swing.to_dict())
    else:
        for label in labels:
            r = _run_one(settings, label, run_dir, log)
            if r is not None:
                results.append(r)

    aggregate = _aggregate(results)
    (run_dir / "metrics.json").write_text(
        json.dumps({"aggregate": aggregate, "results": results}, indent=2)
    )

    leaderboard_row = {
        "run_id": run_id,
        "config_hash": config_hash,
        "timestamp": datetime.now(UTC).isoformat(),
        "weights": settings.scoring.weights,
        **aggregate,
    }
    # Opening in append mode does not create missing parent directories, so
    # make sure the results folder exists; otherwise a fresh checkout would
    # fail here after all the work above has already been done.
    LEADERBOARD.parent.mkdir(parents=True, exist_ok=True)
    with LEADERBOARD.open("a") as f:
        f.write(json.dumps(leaderboard_row) + "\n")

    log.info("Klart. Audit-mapp: {} | aggregate: {}", run_dir, aggregate)
    return run_dir
def _parse_args() -> argparse.Namespace:
    """Parse the command-line options of the experiment runner.

    Returns:
        Namespace with a single ``config`` attribute: the settings file path
        given via ``--config``, or ``""`` when the option is absent.
    """
    parser = argparse.ArgumentParser(description="Run fibengine experiment pipeline.")
    config_option = {
        "type": str,
        "default": "",
        "help": "Optional settings file path (default: config/settings.yaml).",
    }
    parser.add_argument("--config", **config_option)
    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: an empty --config value means "use the default
    # settings file", which load_settings is asked for by passing None.
    args = _parse_args()
    config_path = args.config if args.config else None
    run_experiment(load_settings(config_path))