1+ #####################################################################
2+ # Course: CS 499/ Introduction to Intelligent Decision Making
3+ # Description: Skeleton code for Q-learning
4+ #####################################################################
5+ import numpy as np
6+ import sys
7+ import timeit
8+ import random
9+
class Taxi_Grid():
    """Stochastic taxi grid world with a tabular Q-learning agent.

    States are integer ids laid out row by row on a
    ``grid_width`` x ``grid_height`` grid (id ``row * grid_width + col``).
    Actions are referred to either by name (``getSucc``) or by their index
    into ``self.actions`` (every other method).

    Attributes set up by ``__init__`` / ``populateParam``:
        states: list of all state ids.
        s0: start state id.
        goal_id: terminal state id.
        R: per-state reward array (-1 by default, -10 on traffic states,
            100 on the goal).
        P: ``P[s][a]`` is the successor list ``[(state, prob), ...]`` for
            action index ``a`` in state ``s``, or ``None`` if the action is
            not applicable there.
        Q: ``Q[s][a]`` action-value table, initialised to zero.
    """

    def __init__(self):
        self.grid_width = 4
        self.grid_height = 4
        self.actions = ['left', 'up', 'right', 'down']
        self.traffic_state_id = [2, 5, 6, 9]
        self.gamma = 0.99
        self.policy = {}
        self.states = []
        self.populateParam()

    # Reward and transition function
    def populateParam(self):
        """Build the state list, reward vector, transition table and Q-table."""
        n_states = self.grid_width * self.grid_height
        # Rebuilt from scratch (not appended to) so a repeated call cannot
        # duplicate states.
        self.states = list(range(n_states))
        self.s0 = 0
        self.goal_id = 11

        # Float fill value so the array dtype is float: an integer array
        # would silently truncate any non-integral reward assigned later.
        self.R = np.full(n_states, -1.0)
        self.R[self.goal_id] = 100.0
        for s in self.traffic_state_id:
            self.R[s] = -10.0

        self.Q = [[0.0] * len(self.actions) for _ in self.states]
        self.P = [
            [self.getSucc(s, action) for action in self.actions]
            for s in self.states
        ]

    def getSucc(self, s, action):
        """Return the successors of taking ``action`` (a name) in state ``s``.

        The intended move succeeds with probability 0.8; otherwise the taxi
        slides to a neighbouring cell with probability 0.2.

        Returns:
            A list of ``(successor_state, probability)`` tuples, or ``None``
            when the move would leave the grid or ``action`` is unknown.
        """
        succ_prob = 0.8
        fail_prob = 0.2
        width = self.grid_width
        col = s % width
        last_col = width - 1
        has_row_above = s >= width
        has_row_below = s + width < len(self.states)

        if action == "left":
            if col == 0:
                return None
            # Slides up when the action fails; slides down from the first row.
            slide = s - width if has_row_above else s + width
            return [(s - 1, succ_prob), (slide, fail_prob)]

        if action == "up":
            if not has_row_above:
                return None
            # Slides right when the action fails; left from the last column.
            slide = s + 1 if col < last_col else s - 1
            return [(s - width, succ_prob), (slide, fail_prob)]

        if action == "right":
            if col >= last_col:
                return None
            succ = [(s + 1, succ_prob)]
            # Slides down when the action fails; up from the last row.
            if has_row_below:
                succ.append((s + width, fail_prob))
            elif has_row_above:
                succ.append((s - width, fail_prob))
            return succ

        if action == "down":
            if not has_row_below:
                return None
            succ = [(s + width, succ_prob)]
            # Slides left when the action fails; right from the first column.
            if col > 0:
                succ.append((s - 1, fail_prob))
            elif col < last_col:
                succ.append((s + 1, fail_prob))
            return succ

        return None

    # This function simulates the environment by selecting a successor state,
    # given a state and an action. Note that in RL, the agent is unaware of
    # the exact transitions. The agent learns by interacting with the
    # environment; this function simulates that interaction.
    def generateRandomSuccessor(self, state, action):
        """Sample a successor of ``state`` under action index ``action``.

        The goal state is absorbing and is returned unchanged.

        Raises:
            ValueError: if the action is not applicable in ``state``.
        """
        # Drawn before the goal check so the random stream is consumed
        # exactly once per call, whatever the state.
        random_value = random.uniform(0, 1)
        if state == self.goal_id:
            return state

        successors = self.P[state][action]
        if not successors:
            raise ValueError(
                f"action {action!r} is not applicable in state {state!r}"
            )

        prob_sum = 0
        for succ_state, prob in successors:
            prob_sum += prob
            if prob_sum >= random_value:
                return self.states.index(succ_state)
        # Only reachable if rounding leaves the cumulative sum below the
        # draw; the last successor owns that sliver of probability mass.
        return self.states.index(successors[-1][0])

    # This function finds the best action for a state based on current Q values.
    def getBestAction(self, state):
        """Return the applicable action index with the highest Q-value.

        Ties go to the lowest index. Returns -1 if no action is applicable.
        """
        best_Q = float('-inf')
        best_action = -1
        for a in range(len(self.actions)):
            if self.P[state][a] is not None and self.Q[state][a] > best_Q:
                best_Q = self.Q[state][a]
                best_action = a
        return best_action

    # This function samples an action from the epsilon-greedy policy induced
    # by the current Q values and epsilon.
    def get_epsilon_greedy_action(self, state, epsilon):
        """Sample an action index epsilon-greedily among applicable actions.

        The greedy action gets probability ``1 - epsilon + epsilon / k`` and
        every other applicable action ``epsilon / k``, where ``k`` is the
        number of applicable actions.

        Raises:
            ValueError: if no action is applicable in ``state``.
        """
        applicable_actions = [
            a for a in range(len(self.actions))
            if self.P[state][a] is not None
        ]
        if not applicable_actions:
            raise ValueError(f"no applicable action in state {state!r}")

        best_action = self.getBestAction(state)
        explore_prob = epsilon / len(applicable_actions)
        random_value = random.uniform(0, 1)

        # Accumulate in plain Python floats and always fall back to the last
        # action with positive probability: a cumulative sum that rounds to
        # just under 1.0 must never leave the method without a choice.
        cumulative = 0.0
        chosen = applicable_actions[-1]
        for a in applicable_actions:
            weight = explore_prob
            if a == best_action:
                weight += 1 - epsilon
            if weight <= 0:
                continue  # zero-probability actions are never selected
            cumulative += weight
            chosen = a
            if cumulative >= random_value:
                break
        return chosen

    def Q_learning(self, alpha=None, epsilon=None, num_episodes=None):
        """Run tabular Q-learning from ``s0`` until the goal, episode by episode.

        Args:
            alpha: learning rate. Defaults to the module-level ``ALPHA``.
            epsilon: exploration rate. Defaults to the module-level ``EPSILON``.
            num_episodes: number of episodes. Defaults to the module-level
                ``N_EPISODES``.

        Returns:
            ``(ret_rewards, times)``: per-episode accumulated reward and
            per-episode wall-clock duration in seconds.

        Raises:
            ValueError: if a hyperparameter is neither passed nor set globally.
        """
        if alpha is None:
            alpha = ALPHA
        if epsilon is None:
            epsilon = EPSILON
        if num_episodes is None:
            num_episodes = N_EPISODES
        if alpha is None or epsilon is None or num_episodes is None:
            raise ValueError(
                "alpha, epsilon and num_episodes must be passed as arguments "
                "or set through the module-level ALPHA, EPSILON and N_EPISODES"
            )

        times = []
        ret_rewards = []
        for _ in range(num_episodes):
            start = timeit.default_timer()
            state = self.s0
            accumulated_reward = 0
            while state != self.goal_id:
                action = self.get_epsilon_greedy_action(state, epsilon)
                successor_state = self.generateRandomSuccessor(state, action)
                # Reward is state-based: it is the reward of the state being
                # left, not of the state being entered.
                reward = self.R[state]
                accumulated_reward += reward

                best_action = self.getBestAction(successor_state)
                td_target = reward + self.gamma * self.Q[successor_state][best_action]
                self.Q[state][action] += alpha * (td_target - self.Q[state][action])
                state = successor_state

            # Recorded after the loop so that every episode contributes
            # exactly one entry to both lists.
            times.append(timeit.default_timer() - start)
            # NOTE(review): the goal reward only enters the reported return.
            # No update is made from the goal state, so Q[goal] stays 0 and
            # the +100 never propagates into the Q-table - confirm intended.
            accumulated_reward += self.R[state]
            ret_rewards.append(accumulated_reward)

        mean_time = np.mean(np.array(times)) if times else float("nan")
        print(f"average time per episode (s) = {mean_time: .4f} ")
        return ret_rewards, times
190+
191+
# taxi = Taxi_Grid()
# Module-level hyperparameters read by Taxi_Grid.Q_learning(). They must be
# assigned before Q_learning() is called; the experiment script below does so.
ALPHA = None
EPSILON = None
N_EPISODES = None
import matplotlib.pyplot as plt
from collections import defaultdict


def _run_trials(n_trials):
    """Train ``n_trials`` fresh agents with the current global hyperparameters.

    Returns ``(reward_trials, time_trials)``, two arrays of shape
    ``(n_trials, N_EPISODES)`` holding the per-episode accumulated reward and
    the per-episode duration in seconds of every trial.
    """
    reward_trials = np.zeros((n_trials, N_EPISODES))
    time_trials = np.zeros((n_trials, N_EPISODES))
    for trial in range(n_trials):
        taxi = Taxi_Grid()
        # One list entry per episode, i.e. N_EPISODES values each.
        rewards, times = taxi.Q_learning()
        reward_trials[trial] = rewards
        time_trials[trial] = times
    return reward_trials, time_trials


def _plot_mean_with_std(trials, ylabel, title):
    """Plot the per-episode mean over trials with a +/- one std-dev band."""
    episodes = np.arange(trials.shape[1])
    mean_values = trials.mean(axis=0)
    std_values = trials.std(axis=0)

    plt.figure(figsize=(14, 9))
    label = f"ε={EPSILON} , α={ALPHA} "
    plt.plot(episodes, mean_values, label=label)
    plt.fill_between(episodes, mean_values - std_values, mean_values + std_values, alpha=0.2)

    plt.xlabel("Episode")
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend(loc='lower right', fontsize='small')
    plt.grid(True)
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    """
    The result of this block led me to believe that alpha=0.5, epsilon=0.1 leads to the best results when looking at
    the avg rewards for 200 episodes over 5 trials. (Eyeballed the graphs)
    """
    # part a
    # N_EPISODES = 200
    # N_TRIALS = 5
    # epsilon_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    # alpha_list = [0.1, 0.3, 0.5, 0.7, 0.9]

    # episodes = np.arange(N_EPISODES)

    # # Store all trials: (eps, alpha) → array of shape [N_TRIALS, N_EPISODES]
    # reward_trials = defaultdict(lambda: np.zeros((N_TRIALS, N_EPISODES)))

    # for eps in epsilon_list:
    #     for alpha in alpha_list:
    #         EPSILON = eps
    #         ALPHA = alpha
    #         for trial in range(N_TRIALS):
    #             taxi = Taxi_Grid()
    #             rewards, _ = taxi.Q_learning()
    #             reward_trials[(eps, alpha)][trial] = rewards

    # # Now plot results
    # # these figures are for parameter tuning
    # plt.figure(figsize=(14, 9))

    # for (eps, alpha), rewards in reward_trials.items():
    #     mean_rewards = rewards.mean(axis=0)
    #     std_rewards = rewards.std(axis=0)
    #     label = f"ε={eps}, α={alpha}"
    #     plt.plot(episodes, mean_rewards, label=label)
    #     plt.fill_between(episodes, mean_rewards - std_rewards, mean_rewards + std_rewards, alpha=0.2)

    # plt.xlabel("Episode")
    # plt.ylabel("Accumulated Reward")
    # plt.title("Q-Learning Learning Curves with ε, α Parameter Sweep")
    # plt.legend(loc='lower right', fontsize='small', ncol=2)
    # plt.grid(True)
    # plt.tight_layout()
    # plt.show(block=False)
    # reward_trials.clear()

    # input("Press enter to exit...")

    # part b
    # EPSILON = 0.1
    # ALPHA = 0.5

    # N_EPISODES = 100
    # N_TRIALS = 50
    # episodes = np.arange(N_EPISODES)

    # # Array to store rewards for all trials
    # reward_trials = np.zeros((N_TRIALS, N_EPISODES))

    # for trial in range(N_TRIALS):
    #     taxi = Taxi_Grid()
    #     rewards, _ = taxi.Q_learning()  # should return list of 100 episode rewards
    #     reward_trials[trial] = rewards

    # # Compute mean and std dev over trials, per episode
    # mean_rewards = reward_trials.mean(axis=0)
    # std_rewards = reward_trials.std(axis=0)

    # # Plot the learning curve with shaded std deviation
    # plt.figure(figsize=(14, 9))
    # label = f"ε={EPSILON}, α={ALPHA}"
    # plt.plot(episodes, mean_rewards, label=label)
    # plt.fill_between(episodes, mean_rewards - std_rewards, mean_rewards + std_rewards, alpha=0.2)

    # plt.xlabel("Episode")
    # plt.ylabel("Accumulated Reward")
    # plt.title("Q-Learning: Mean Reward over 50 Trials")
    # plt.legend(loc='lower right', fontsize='small')
    # plt.grid(True)
    # plt.tight_layout()
    # plt.show()

    # part c: learning curve (accumulated reward per episode)
    EPSILON = 0.95
    ALPHA = 0.1

    N_EPISODES = 200
    N_TRIALS = 50

    reward_trials, _ = _run_trials(N_TRIALS)
    _plot_mean_with_std(
        reward_trials,
        "Accumulated Reward",
        f"Q-Learning: Mean Reward over {N_TRIALS} Trials",
    )

    # part d: wall-clock time per episode, measured on its own set of trials
    ALPHA = 0.1
    EPSILON = 0.95

    N_EPISODES = 200
    N_TRIALS = 50

    _, time_trials = _run_trials(N_TRIALS)
    _plot_mean_with_std(
        time_trials,
        "Time (s)",
        f"Q-Learning: Mean time(s) for {N_EPISODES} Episodes over {N_TRIALS} Trials",
    )
0 commit comments