-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrain_ln.py
More file actions
171 lines (145 loc) · 7.01 KB
/
train_ln.py
File metadata and controls
171 lines (145 loc) · 7.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# rlte/train_ln.py
# Trainingsschleife (Algorithmus 1) – vektorisierte Datensammlung über 128 parallele Envs,
# H=400 Iterationen, τ=1,280 Trajektorien à N=10, Adam(η=5e-4), σ-Schedule nach (11).
# Keine Clipping-Heuristik. :contentReference[oaicite:7]{index=7}
from __future__ import annotations
from typing import List, Dict, Tuple
import time
import numpy as np
import torch
import torch.optim as optim
from . import config as C
from .utils import seed_everything, linear_sigma_schedule, bijection_test, SmoothedValue
from .agents.ln_actor_critic import LogisticNormalActorCritic
from .env_lob import LOBExecutionEnv
def _stack_state_dicts(dicts: List[Dict[str, np.ndarray]]) -> np.ndarray:
"""
Flache Feature-Vektoren aus State-Dicts bauen (Reihenfolge fix).
"""
keys = ["pa","pb","vb","va","delta_m","delta_l","dp","t","M","m","levels","queues","gamma"]
vecs = []
for d in dicts:
vec = np.concatenate([d[k].ravel() for k in keys], axis=0).astype(np.float32)
vecs.append(vec)
return np.stack(vecs, axis=0)
def feature_dim(K: int, M0: int) -> int:
# pa(1)+pb(1)+vb(K-1)+va(K-1)+delta_m(1)+delta_l(1)+dp(1)+t(1)+M(1)+m(1)+levels(M0)+queues(M0)+gamma(K)
return 1+1+(K-1)+(K-1)+1+1+1+1+1+1+M0+M0+K
def train_ln(market: str = "noise", M0: int = 20, device: str = "cpu") -> Dict[str, float]:
seed_everything(C.SEED)
# Unittest: Bijektion h/h^{-1}
assert bijection_test(K=C.K_SIMPLEX, trials=50), "Bijektionstest h/h^{-1} fehlgeschlagen!"
# Parallele Envs
envs = [LOBExecutionEnv(market=market, M0=M0, seed=C.SEED + i) for i in range(C.PAR_ENVS)]
s0_list = [env.reset() for env in envs]
obs = _stack_state_dicts(s0_list) # [P, obs_dim]
obs_dim = feature_dim(C.K_SIMPLEX, M0)
# Actor-Critic
ac = LogisticNormalActorCritic(input_dim=obs_dim, hidden=C.HIDDEN, K=C.K_SIMPLEX, device=device)
optimizer = optim.Adam(ac.parameters(), lr=C.ADAM_LR)
# Speicher für Trajektorien (τ=1280: 128 Envs × 10 Schritte/Traj × 100 Steps/Iter)
# Wir sammeln pro Iteration 100 Schritte je Env; die Trajektorienlänge im Paper ist N=10,
# in der Praxis wird der MC-Return über N=10 gerollt (hier: wir berechnen G_n über 10 Schritte).
steps_per_iter = C.STEPS_PER_ENV
returns_smooth = SmoothedValue()
for it in range(1, C.H_ITERS + 1):
sigma_i = linear_sigma_schedule(it, C.H_ITERS, C.SIGMA_INIT, C.SIGMA_FINAL)
ac.set_sigma(sigma_i)
batch_obs = []
batch_logphi = []
batch_rewards = []
batch_values = []
batch_actions_x = [] # x ~ N(mu, σ^2 I), wird für Logdichte genutzt
batch_dones = []
batch_infos = []
for step in range(steps_per_iter):
ob_t = torch.from_numpy(obs).to(device=device, dtype=torch.float32)
with torch.no_grad():
pol_out = ac.forward_policy(ob_t)
v_pred = ac.forward_value(ob_t).squeeze(-1)
# Simplex-Aktionen: pol_out.a ∈ [P, K+1] – numpy für Env
a_np = pol_out.a.cpu().numpy()
# Schritt in allen Envs
next_states = []
rewards = []
dones = []
infos = []
x_np = pol_out.x.detach().cpu().numpy()
for i, env in enumerate(envs):
s_next, r, done, info = env.step(a_np[i])
next_states.append(s_next)
rewards.append(r)
dones.append(done)
infos.append(info)
# speichere Batch
batch_obs.append(obs.copy())
batch_logphi.append(pol_out.log_phi.detach().cpu().numpy())
batch_rewards.append(np.array(rewards, dtype=np.float32))
batch_values.append(v_pred.detach().cpu().numpy())
batch_actions_x.append(x_np)
batch_dones.append(np.array(dones, dtype=np.bool_))
batch_infos.append(infos)
# obs updaten
obs = _stack_state_dicts(next_states)
# Monte-Carlo-Returns über N=10 (Paper Eq. (12) summiert bis Ende – hier episodisch je Env)
# Wir zerlegen 100 Schritte je Env in 10er-Blöcke (Trajektorien à N=10)
P = C.PAR_ENVS
S = steps_per_iter
T_block = C.N_STEPS # 10
# reshape arrays: [S, P] -> [S, P]
rewards = np.stack(batch_rewards, axis=0) # [S, P]
values = np.stack(batch_values, axis=0) # [S, P]
logphi = np.stack(batch_logphi, axis=0) # [S, P]
xs = np.stack(batch_actions_x, axis=0) # [S, P, K]
# Returns in 10er-Fenstern
returns_list = []
adv_list = []
logphi_list = []
obs_list = []
for start in range(0, S, T_block):
end = min(S, start + T_block)
G = rewards[start:end] # [T_block, P]
# MC-Return pro Schritt n: sum_{l=n}^{end-1} r_l (wie Eq. (12))
# Implementiere rückwärts kumulative Summe
ret = np.zeros_like(G)
running = np.zeros((P,), dtype=np.float32)
for t in reversed(range(G.shape[0])):
running += G[t]
ret[t] = running
# Values und Advantages
V = values[start:end]
A = ret - V
# Speichern
returns_list.append(ret) # [T_block, P]
adv_list.append(A)
logphi_list.append(logphi[start:end])
obs_list.append(np.stack(batch_obs[start:end], axis=0)) # [T_block, P, obs_dim]
returns_arr = np.concatenate(returns_list, axis=0) # [S, P]
adv_arr = np.concatenate(adv_list, axis=0)
logphi_arr = np.concatenate(logphi_list, axis=0)
obs_arr = np.concatenate(obs_list, axis=0) # [S, P, obs_dim]
# Tensoren bauen
obs_tensor = torch.from_numpy(obs_arr.reshape(-1, obs_dim)).to(device=device, dtype=torch.float32)
returns_tensor = torch.from_numpy(returns_arr.reshape(-1)).to(device=device, dtype=torch.float32)
adv_tensor = torch.from_numpy(adv_arr.reshape(-1)).to(device=device, dtype=torch.float32)
logphi_tensor = torch.from_numpy(logphi_arr.reshape(-1)).to(device=device, dtype=torch.float32)
# Value neu vorwärts (für Loss) – (14)
v_pred = ac.forward_value(obs_tensor).squeeze(-1)
loss_v = ac.value_loss(v_pred.unsqueeze(-1), returns_tensor)
# Policy-Loss – (13)
loss_pi = ac.policy_loss(logphi_tensor, adv_tensor)
loss = loss_pi + loss_v
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Logging
mean_ret = returns_arr[0].mean() # grober Indikator
sm = returns_smooth.update(float(mean_ret))
if it % 10 == 0 or it == 1:
print(f"[{it:03d}/{C.H_ITERS}] sigma={sigma_i:.3f} loss={loss.item():.4f} "
f"ret(mean-first-step)={mean_ret:.3f} smooth={sm:.3f}")
# Ende
return {"smooth_return": float(returns_smooth.value)}
if __name__ == "__main__":
# Beispiel: Training im 'tactical'-Markt mit M0=60
train_ln(market="tactical", M0=60)