# mc_control.py
import numpy as np
import config

class MonteCarloControl:
    # First-visit MC control for Q(s, a)
    # epsilon is the probability of taking the greedy action and increases over training

    def __init__(
        self,
        env,
        gamma=None,
        epsilon=None,
        seed=None,
        epsilon_start: float = 0.2,
        epsilon_end: float = 0.99,
        epsilon_warmup_ratio: float = 0.8,
        final_greedy_ratio: float = 0.1,
    ):
        # Store environment and basic dimensions
        self.env = env
        self.n_states = int(env.n_states)
        self.n_actions = int(env.n_actions)
        # Discount factor
        self.gamma = float(config.GAMMA if gamma is None else gamma)
        # If epsilon is passed explicitly, treat it as epsilon_end
        if epsilon is not None:
            epsilon_end = float(epsilon)
        # Epsilon schedule parameters
        self.epsilon_start = float(epsilon_start)
        self.epsilon_end = float(epsilon_end)
        # Increase epsilon over warmup, then hold; the final stage uses epsilon = 1.0
        self.epsilon_warmup_ratio = float(epsilon_warmup_ratio)
        self.final_greedy_ratio = float(final_greedy_ratio)
        # Current epsilon value used by epsilon_greedy
        self.epsilon = float(self.epsilon_start)
        # RNG for tie-breaking and exploration
        self.rng = np.random.default_rng(seed)
        # Q-table and first-visit returns accumulators
        self.Q = np.zeros((self.n_states, self.n_actions), dtype=float)
        self.returns_sum = np.zeros((self.n_states, self.n_actions), dtype=float)
        self.returns_cnt = np.zeros((self.n_states, self.n_actions), dtype=int)

    def _default_num_episodes(self) -> int:
        # Choose default number of episodes based on grid size
        if int(self.env.grid_size) == 4:
            return int(config.NUM_EPISODES_4)
        return int(config.NUM_EPISODES_10)

    def _default_max_steps(self) -> int:
        # Choose default max steps per episode based on grid size
        if int(self.env.grid_size) == 4:
            return int(config.MAX_STEPS_4)
        return int(config.MAX_STEPS_10)

    def _update_epsilon(self, ep_idx: int, num_episodes: int) -> None:
        # Piecewise schedule: warmup increase -> hold -> final pure greedy
        # epsilon here is the probability of selecting a greedy action
        if num_episodes <= 1:
            self.epsilon = 1.0
            return
        final_start = int((1.0 - self.final_greedy_ratio) * num_episodes)
        if ep_idx >= final_start:
            self.epsilon = 1.0
            return
        warmup_episodes = max(1, int(self.epsilon_warmup_ratio * final_start))
        if ep_idx >= warmup_episodes:
            self.epsilon = float(self.epsilon_end)
            return
        frac = ep_idx / float(warmup_episodes)
        self.epsilon = float(self.epsilon_start + frac * (self.epsilon_end - self.epsilon_start))
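
    # Worked example of the schedule (illustrative): with num_episodes = 1000 and the
    # defaults above (epsilon_start=0.2, epsilon_end=0.99, epsilon_warmup_ratio=0.8,
    # final_greedy_ratio=0.1), final_start = 900 and warmup_episodes = 720, so:
    #   episodes   0-719: epsilon rises linearly from 0.2 toward 0.99
    #   episodes 720-899: epsilon held at 0.99
    #   episodes 900-999: epsilon = 1.0 (pure greedy)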

    def greedy_action(self, state: int) -> int:
        # Random tie-breaking among max-Q actions
        q = self.Q[state]
        m = np.max(q)
        candidates = np.flatnonzero(q == m)
        return int(self.rng.choice(candidates))

    def epsilon_greedy(self, state: int) -> int:
        # epsilon = greedy probability
        if self.rng.random() < self.epsilon:
            return self.greedy_action(state)
        return int(self.rng.integers(0, self.n_actions))

    def optimal_action(self, state: int) -> int:
        # Action used during testing
        return self.greedy_action(state)

    def generate_episode(self, max_steps: int):
        # Generate one episode using the current behavior policy
        episode = []
        s = int(self.env.reset())
        for _ in range(int(max_steps)):
            a = int(self.epsilon_greedy(s))
            res = self.env.step(a)
            r = float(res.reward)
            episode.append((s, a, r))
            s = int(res.next_state)
            if res.done:
                break
        return episode
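
    # Each element of an episode is a (state, action, reward) tuple, where reward is
    # the immediate reward for taking `action` in `state`. When the episode ends at a
    # terminal state, episode[-1][2] is the terminal reward, which train() compares
    # against config.REWARD_GOAL to count successes.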

    def train(self, num_episodes=None, max_steps=None, window=None):
        # Train MC control using first-visit returns
        num_episodes = int(self._default_num_episodes() if num_episodes is None else num_episodes)
        max_steps = int(self._default_max_steps() if max_steps is None else max_steps)
        window = int(config.ACCURACY_WINDOW if window is None else window)
        # Logs for plotting
        steps = []
        accuracy = []
        avg_rewards = []
        # Sliding-window success counting (goal reached)
        goal_count = 0
        total_reward = 0.0
        for ep in range(1, num_episodes + 1):
            # Update epsilon according to the schedule
            self._update_epsilon(ep_idx=ep - 1, num_episodes=num_episodes)
            # Rollout one episode
            episode = self.generate_episode(max_steps)
            steps.append(len(episode))
            # Episodic return and running mean
            ep_return = float(sum(r for (_, _, r) in episode))
            total_reward += ep_return
            avg_rewards.append(total_reward / ep)
            # Count success if the episode terminates at the goal
            if len(episode) > 0 and episode[-1][2] == float(config.REWARD_GOAL):
                goal_count += 1
            # First-visit MC update computed backward: record the first occurrence of
            # each (s, a) pair so only the return from the first visit is used
            first_visit = {}
            for t, (s, a, _) in enumerate(episode):
                if (s, a) not in first_visit:
                    first_visit[(s, a)] = t
            G = 0.0
            for t in range(len(episode) - 1, -1, -1):
                s, a, r = episode[t]
                G = r + self.gamma * G
                if first_visit[(s, a)] != t:
                    continue
                self.returns_sum[s, a] += G
                self.returns_cnt[s, a] += 1
                self.Q[s, a] = self.returns_sum[s, a] / self.returns_cnt[s, a]
            # Record success rate over a fixed window
            if ep % window == 0:
                accuracy.append(goal_count / window)
                goal_count = 0
        return {"Q": self.Q, "steps": steps, "accuracy": accuracy, "avg_return": avg_rewards}

    def test(self, num_test=None, max_steps=None):
        # Evaluate learned Q-table without exploration noise
        num_test = int(config.NUM_TEST if num_test is None else num_test)
        max_steps = int(self._default_max_steps() if max_steps is None else max_steps)
        test_steps = []
        test_success = []
        test_returns = []
        for _ in range(num_test):
            # Fixed-start evaluation episode
            s = int(self.env.reset())
            ep_return = 0.0
            done_flag = False
            for t in range(int(max_steps)):
                # Select action from the learned greedy policy
                a = int(self.optimal_action(s))
                res = self.env.step(a)
                s = int(res.next_state)
                ep_return += float(res.reward)
                if res.done:
                    # Record terminal episode length and success
                    test_steps.append(t + 1)
                    test_success.append(1 if float(res.reward) == float(config.REWARD_GOAL) else 0)
                    test_returns.append(ep_return)
                    done_flag = True
                    break
            # If no terminal state was reached, mark as failure at max_steps
            if not done_flag:
                test_steps.append(int(max_steps))
                test_success.append(0)
                test_returns.append(ep_return)
        return {
            "success_rate": float(np.mean(test_success)),
            "avg_steps": float(np.mean(test_steps)),
            "std_steps": float(np.std(test_steps)),
            "var_steps": float(np.var(test_steps)),
            "avg_return": float(np.mean(test_returns)),
            "var_return": float(np.var(test_returns)),
            "test_steps": test_steps,
            "test_success": test_success,
            "test_returns": test_returns,
        }
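

# Minimal usage sketch (illustrative; assumes a GridWorld-style environment class,
# here a hypothetical name, exposing n_states, n_actions, grid_size, reset(), and
# step(a) returning an object with next_state / reward / done fields, plus the
# config module imported above):
#
#     env = GridWorld(grid_size=4)            # hypothetical environment class
#     agent = MonteCarloControl(env, seed=0)
#     logs = agent.train()                    # dict with "Q", "steps", "accuracy", "avg_return"
#     stats = agent.test()                    # dict with success_rate, avg_steps, ...
#     print(stats["success_rate"], stats["avg_steps"])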