# cross_entropy.py
import gym
import numpy as np


def check_policy_init(initial_policy):
    assert type(initial_policy) in (np.ndarray, np.matrix)
    assert np.allclose(initial_policy, 1. / n_actions)
    assert np.allclose(np.sum(initial_policy, axis=1), 1)
    print('Policy initialization: Ok!')


def check_generate_session_func(generation_func):
    s, a, r = generation_func(policy)
    assert type(s) == type(a) == list
    assert len(s) == len(a)
    assert isinstance(r, (float, np.floating))
    print('Session generation function: Ok!')


def check_update_policy_func(update_func):
    elite_states, elite_actions = ([1, 2, 3, 4, 2, 0, 2, 3, 1], [0, 2, 4, 3, 2, 0, 1, 3, 3])
    new_policy = update_func(elite_states, elite_actions, 5, 6)
    assert np.isfinite(new_policy).all(), \
        'Your new policy contains NaNs or +-inf. Make sure you do not divide by zero.'
    assert np.all(new_policy >= 0), \
        'Your new policy should not have negative action probabilities'
    assert np.allclose(new_policy.sum(axis=-1), 1), \
        'Your new policy should be a valid probability distribution over actions'
    reference_answer = np.array([
        [1., 0., 0., 0., 0.],
        [0.5, 0., 0., 0.5, 0.],
        [0., 0.33333333, 0.66666667, 0., 0.],
        [0., 0., 0., 0.5, 0.5]])
    assert np.allclose(new_policy[:4, :5], reference_answer)
    print('Update policy function: Ok!')


def check_select_elites_func(select_elite_func):
    states_batch = [[1, 2, 3], [4, 2, 0, 2], [3, 1]]
    actions_batch = [[0, 2, 4], [3, 2, 0, 1], [3, 3]]
    rewards_batch = [3, 4, 5]
    test_result_0 = select_elite_func(states_batch, actions_batch, rewards_batch, percentile=0)
    test_result_30 = select_elite_func(states_batch, actions_batch, rewards_batch, percentile=30)
    test_result_90 = select_elite_func(states_batch, actions_batch, rewards_batch, percentile=90)
    test_result_100 = select_elite_func(states_batch, actions_batch, rewards_batch, percentile=100)
    assert np.all(test_result_0[0] == [1, 2, 3, 4, 2, 0, 2, 3, 1]) and \
        np.all(test_result_0[1] == [0, 2, 4, 3, 2, 0, 1, 3, 3]), \
        'For percentile 0 you should return all states and actions in chronological order'
    assert np.all(test_result_30[0] == [4, 2, 0, 2, 3, 1]) and \
        np.all(test_result_30[1] == [3, 2, 0, 1, 3, 3]), \
        'For percentile 30 you should only select states/actions from the two best sessions'
    assert np.all(test_result_90[0] == [3, 1]) and \
        np.all(test_result_90[1] == [3, 3]), \
        'For percentile 90 you should only select states/actions from the single best session'
    assert np.all(test_result_100[0] == [3, 1]) and \
        np.all(test_result_100[1] == [3, 3]), \
        'Please make sure you use >=, not >. Also double-check how you compute the percentile.'
    print('Select elites function: Ok!')


def generate_session(policy, t_max=10**5):
    """
    Play the game until it ends or for t_max ticks.
    :param policy: an array of shape [n_states, n_actions] with action probabilities
    :returns: list of states, list of actions and the sum of rewards
    """
    states, actions = [], []
    total_reward = 0.
    s = env.reset()
    for t in range(t_max):
        # Sample an action from the policy's distribution for the current state
        a = np.random.choice(policy.shape[1], p=policy[s])
        # Take action `a` to obtain new_state, reward, is_done
        new_s, r, is_done, _ = env.step(a)
        # Record the state and action, and accumulate the reward
        states.append(s)
        actions.append(a)
        total_reward += r
        # Move on to the new state for the next iteration
        s = new_s
        if is_done:
            break
    return states, actions, total_reward
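

# Illustrative usage sketch (kept as a comment so nothing runs at import time; it assumes
# `env` and a [n_states, n_actions] `policy` have already been created as in __main__):
#     states, actions, total_reward = generate_session(policy, t_max=1000)
#     print(len(states), total_reward)  # number of steps taken and the episodic return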


def select_elites(states_batch, actions_batch, rewards_batch, percentile=80):
    """
    Select states and actions from the games whose rewards are >= the given percentile.
    :param states_batch: list of lists of states, states_batch[session_i][t]
    :param actions_batch: list of lists of actions, actions_batch[session_i][t]
    :param rewards_batch: list of rewards, rewards_batch[session_i]
    :returns: elite_states and elite_actions, both 1D lists of states and the corresponding
        actions from elite sessions
    Please return elite states and actions in their original order,
    i.e. sorted by session number and timestep within a session.
    If you're confused, see the test cases in check_select_elites_func.
    Please don't assume that states are integers (they may be something else later).
    """
    states_batch, actions_batch, rewards_batch = map(np.array, [states_batch, actions_batch, rewards_batch])
    # Compute the reward threshold for this percentile
    reward_threshold = np.percentile(rewards_batch, percentile)
    # Keep states and actions only from sessions whose reward reaches the threshold
    elite_mask = rewards_batch >= reward_threshold
    elite_states = states_batch[elite_mask]
    elite_actions = actions_batch[elite_mask]
    # Flatten the per-session lists into single 1D sequences
    elite_states, elite_actions = map(np.concatenate, [elite_states, elite_actions])
    return elite_states, elite_actions
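

# Worked example (the same data as check_select_elites_func): with rewards_batch = [3, 4, 5]
# and percentile=30, np.percentile([3, 4, 5], 30) == 3.6, so only the two sessions with
# rewards 4 and 5 pass the threshold and their trajectories are concatenated in order:
#     elite_states  -> [4, 2, 0, 2, 3, 1]
#     elite_actions -> [3, 2, 0, 1, 3, 3]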


def update_policy(elite_states, elite_actions, n_states, n_actions):
    """
    Given a list of elite states/actions from select_elites,
    return a new policy where each action probability is proportional to
    policy[s_i, a_i] ~ #[occurrences of s_i and a_i in elite states/actions]
    Don't forget to normalize the policy to get valid probabilities and handle the 0/0 case:
    if a state was never visited, set the probabilities of all its actions to 1. / n_actions.
    :param elite_states: 1D list of states from elite sessions
    :param elite_actions: 1D list of actions from elite sessions
    """
    new_policy = np.zeros([n_states, n_actions])
    # Count how many times each (state, action) pair occurs in the elite sessions
    for s, a in zip(elite_states, elite_actions):
        new_policy[s, a] += 1
    # Normalize each row; unvisited states get a uniform distribution
    sums = np.sum(new_policy, axis=1)
    for s in range(n_states):
        if sums[s] != 0:
            new_policy[s] = new_policy[s] / sums[s]
        else:
            new_policy[s] = np.ones(n_actions) / n_actions
    return new_policy
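

# Worked example (the same data as check_update_policy_func): state 2 occurs in the elite
# lists with actions 2, 2 and 1, so its counts [0, 1, 2, 0, 0, 0] normalize to
# [0, 1/3, 2/3, 0, 0, 0], which is row 2 of reference_answer. A state that never occurs
# would instead get the uniform row of 1/6 (with n_actions = 6).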


def rl_cross_entropy():
    # Useful constants, all applied below
    n_sessions = 200  # generate n_sessions per iteration for analysis
    percentile = 50  # take this percentage of 'elite' states/actions
    alpha = 0.3  # alpha-blending for policy updates
    total_iterations = 100
    visualize = True
    log = []
    # Create a random uniform policy
    policy = np.ones((n_states, n_actions)) / n_actions
    check_policy_init(policy)
    if visualize:
        import matplotlib.pyplot as plt
        plt.figure(figsize=[10, 4])
    for i in range(total_iterations):
        # Generate n_sessions for further analysis.
        sessions = [generate_session(policy) for _ in range(n_sessions)]
        states_batch, actions_batch, rewards_batch = zip(*sessions)
        # Select elite states & actions.
        elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile=percentile)
        # Update the policy using elite_states, elite_actions.
        new_policy = update_policy(elite_states, elite_actions, n_states, n_actions)
        # Alpha-blend the old & new policies for stability.
        policy = alpha * new_policy + (1 - alpha) * policy
        # Info for debugging
        mean_reward = np.mean(rewards_batch)
        threshold = np.percentile(rewards_batch, percentile)
        log.append([mean_reward, threshold])
        print('Iteration = %i, Mean Reward = %.3f, Threshold = %.3f' % (i, mean_reward, threshold))
        # Visualize training
        if visualize:
            plt.subplot(1, 2, 1)
            plt.plot(list(zip(*log))[0], label='Mean rewards', color='red')
            plt.plot(list(zip(*log))[1], label='Reward thresholds', color='green')
            if i == 0:
                plt.legend()
                plt.grid()
            plt.subplot(1, 2, 2)
            plt.hist(rewards_batch, range=[-990, +10], color='blue', label='Rewards distribution')
            plt.vlines([np.percentile(rewards_batch, percentile)], [0], [100], label='Percentile', color='red')
            plt.legend()
            plt.grid()
            plt.pause(0.1)
            plt.cla()
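

# Note on the alpha-blending above: the update is a convex combination,
#     policy <- alpha * new_policy + (1 - alpha) * policy,
# so every row stays a valid probability distribution. For example, with alpha = 0.3,
# blending a uniform row [1/6]*6 with a new row [0, 1/3, 2/3, 0, 0, 0] gives roughly
# [0.12, 0.22, 0.32, 0.12, 0.12, 0.12], which still sums to 1.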


if __name__ == '__main__':
    # Create the 'Taxi-v2' environment
    env = gym.make('Taxi-v2')
    env.reset()
    env.render()
    # Number of states for this environment
    n_states = env.env.nS
    # Number of actions for this environment
    n_actions = env.env.nA
    print('States number = %i, Actions number = %i' % (n_states, n_actions))
    # Initialize the policy with a random uniform distribution
    policy = np.ones((n_states, n_actions)) / n_actions
    check_policy_init(policy)
    # Check the completed generate_session function
    check_generate_session_func(generate_session)
    # Check the completed select_elites function
    check_select_elites_func(select_elites)
    # Check the completed update_policy function
    check_update_policy_func(update_policy)
    # Run the cross-entropy method training loop
    rl_cross_entropy()
    # Close the environment when everything is done
    env.close()