1+ import ctypes
2+ import os
3+ import sys
4+ import random
5+ import pygame
6+ import subprocess
7+ import platform
8+ from sim .simobjects import *
9+
10+ def main (control , sim_length , frequency , dmin , dmax , render ):
11+ global screen , clock , reverse , lanes , intersection , cars , intersection , total_wait , count , trial_reward , \
12+ LANE_WIDTH , SPEED , SCREEN_SIZE
13+
14+ def approx (p1 , p2 ):
15+ if abs (p1 [0 ] - p2 [0 ]) < SPEED // 2 :
16+ if abs (p1 [1 ] - p2 [1 ]) < SPEED // 2 :
17+ return True
18+ return False
19+
20+ def rotate (direction , turn ):
21+ return {
22+ 'right' : {
23+ 'up' : 'right' ,
24+ 'right' : 'down' ,
25+ 'down' : 'left' ,
26+ 'left' : 'up'
27+ },
28+ 'left' : {
29+ 'up' : 'left' ,
30+ 'left' : 'down' ,
31+ 'down' : 'right' ,
32+ 'right' : 'up'
33+ }
34+ } [turn ] [direction ]
35+
36+
37+ def get_turn (lane , turn ):
38+ return {
39+ 0 : {
40+ 'right' : 0 ,
41+ 'left' : 2
42+ },
43+ 2 : {
44+ 'right' : 1 ,
45+ 'left' : 0
46+ },
47+ 4 : {
48+ 'right' : 3 ,
49+ 'left' : 1
50+ },
51+ 6 : {
52+ 'right' : 2 ,
53+ 'left' : 3
54+ }
55+ } [lane ] [turn ]
56+
57+ def get_reward ():
58+ result = 0
59+ for l in lanes :
60+ for c in l .cars :
61+ if c .speed == 0 :
62+ result += 1
63+ return (1 / (result + 1e-4 ))
64+
65+ i = 6
66+ l = lanes [i ]
67+ car = Car (l .start , l .direction , SPEED , random .choice (['straight' ,'right' ,'left' ]), screen )
68+ car .start = i
69+ cars .add (car )
70+ #total_wait = 0
71+
72+
73+ for count in range (sim_length ):
74+ for event in pygame .event .get ():
75+ if event .type == pygame .QUIT :
76+ pygame .quit ()
77+ quit ()
78+
79+ elif event .type == pygame .KEYDOWN :
80+ if event .key == pygame .K_q :
81+ pygame .quit ()
82+ quit ()
83+ elif event .key == 27 :
84+ pygame .quit ()
85+ quit ()
86+
87+ if count % int (200 / SPEED ) == 0 :
88+ i = random .choice (range (0 , 7 , 2 ))
89+ l = lanes [i ]
90+ g = random .choice (['straight' , 'right' ])
91+ c = Car (l .start , l .direction , SPEED , g , screen )
92+ c .start = i
93+ cars .add (c )
94+
95+ for c in cars .sprites ():
96+
97+ c .speed = SPEED
98+
99+ if c .rect .colliderect (middle .rect ):
100+ if not middle .rect .contains (c .rect ):
101+ if middle .flow != c .orientation and c .rect .colliderect (lanes [c .start ]):
102+ c .speed = 0
103+ if reverse (c .orientation ) in [d .orientation for d in middle .incars ] and c .rect .colliderect (lanes [c .start ]):
104+ c .speed = 0
105+ else :
106+ if c .goal != 'straight' :
107+ t = middle .turns [get_turn (c .start , c .goal )]
108+ if approx (c .rect .center , t ) and not c .turned :
109+ c .direction = rotate (c .direction , c .goal )
110+ c .turned = True
111+
112+ if c .speed == 0 :
113+ total_wait += 1
114+
115+ control (frequency , dmin , dmax , total_wait )
116+ trial_reward += get_reward ()
117+
118+ intersection .update (cars .sprites ())
119+ cars .update (intersection .sprites ())
120+
121+ screen .fill (YELLOW )
122+
123+ intersection .draw (screen )
124+ cars .draw (screen )
125+
126+ if render : pygame .display .update ()
127+
128+ clock .tick (180 )
129+
130+ count += 1
131+
132+ ''' Get information on the screen the program is running on '''
133+ def get_screen_metrics ():
134+ if platform .system () == 'Windows' :
135+ user32 = ctypes .windll .user32
136+ SCREEN_SIZE = user32 .GetSystemMetrics (0 ), user32 .GetSystemMetrics (1 )
137+ COMBINED_SCREEN_SIZE = user32 .GetSystemMetrics (78 ), user32 .GetSystemMetrics (79 )
138+ SECOND_SCREEN_SIZE = (COMBINED_SCREEN_SIZE [0 ] - SCREEN_SIZE [0 ], COMBINED_SCREEN_SIZE [1 ])
139+ DISPLAY_MODE = "single" if COMBINED_SCREEN_SIZE == SCREEN_SIZE else "dual"
140+ RATIO = 1.0
141+ if DISPLAY_MODE == "dual" :
142+ DISPLAY_WIDTH , DISPLAY_HEIGHT = tuple ([int (i // RATIO ) for i in list (SECOND_SCREEN_SIZE )])
143+ x , y = ((COMBINED_SCREEN_SIZE [0 ] + SCREEN_SIZE [0 ] - DISPLAY_WIDTH ) // 2 , (COMBINED_SCREEN_SIZE [1 ] - DISPLAY_HEIGHT ) // 2 )
144+ else :
145+ DISPLAY_WIDTH , DISPLAY_HEIGHT = tuple ([int (i // RATIO ) for i in list (SCREEN_SIZE )])
146+ x , y = (SCREEN_SIZE [0 ] - DISPLAY_WIDTH ) // 2 , (SCREEN_SIZE [1 ] - DISPLAY_HEIGHT ) // 2
147+ return x , y , DISPLAY_WIDTH , DISPLAY_HEIGHT
148+ elif platform .system () == 'Linux' :
149+ output = subprocess .Popen ('xrandr | grep "\*" | cut -d" " -f4' ,shell = True , stdout = subprocess .PIPE ).communicate ()[0 ]
150+ resolution = [int (i ) for i in output .split ()[0 ].split (b'x' )]
151+ return resolution [0 ] // 2 , 0 , resolution [0 ], resolution [1 ]
152+ return 683 , 0 , 1366 , 768
153+
154+ if __name__ == '__main__' :
155+
156+ def actuated (frequency , dmin , dmax , * args ):
157+ ''' Semi-actuated traffic control '''
158+ global duration
159+ def is_empty (lanes ):
160+ for l in lanes :
161+ for c in l .cars :
162+ position = [type (p ) for p in c .position ]
163+ if Lane in position and Middle in position :
164+ return False
165+ return True
166+ hlanes = [lanes [2 ], lanes [6 ]]
167+ vlanes = [lanes [0 ], lanes [4 ]]
168+ if count % frequency == 0 :
169+ switch = is_empty (hlanes ) if middle .flow == 'horizontal' else is_empty (vlanes )
170+ if (switch and duration > dmin and duration < dmax ) or (duration > dmax ):
171+ middle .flow = reverse (middle .flow )
172+ duration = 0
173+ else :
174+ duration += 1
175+
176+ reverse = lambda flow : {'horizontal' : 'vertical' }.get (flow , 'horizontal' )
177+
178+ x , y , DISPLAY_WIDTH , DISPLAY_HEIGHT = get_screen_metrics ()
179+ os .environ ['SDL_VIDEO_WINDOW_POS' ] = "%d,%d" % (x ,y )
180+
181+ BLACK = (0 , 0 , 0 )
182+ WHITE = (255 , 255 , 255 )
183+ RED = (255 , 0 , 0 )
184+ GREEN = (0 , 255 , 0 )
185+ BLUE = (0 , 0 , 255 )
186+ YELLOW = (255 , 255 , 0 )
187+
188+ LANE_WIDTH = int (DISPLAY_WIDTH * 0.1 )
189+ VERT_LANE_LENGTH = DISPLAY_HEIGHT // 2 - LANE_WIDTH
190+ HORZ_LANE_LENGTH = (DISPLAY_WIDTH // 2 - LANE_WIDTH )
191+
192+ CENTER = (DISPLAY_WIDTH // 2 , DISPLAY_HEIGHT // 2 )
193+
194+ SPEED = 16
195+ TRIALS = 5
196+ SIM_LENGTH = 500
197+
198+ ACTIONS = ['horizontal' , 'vertical' ]
199+ MAX_SIZE = 15
200+
201+ RENDER = True
202+
203+ frequencies = range (30 , 100 , 10 )
204+ dmins = range (10 , 100 , 10 )
205+ dmaxs = range (100 , 250 , 10 )
206+ log_file = 'logs\\ actuated.txt'
207+
208+ pygame .init ()
209+ screen = pygame .display .set_mode ((DISPLAY_WIDTH , DISPLAY_HEIGHT ))
210+ pygame .display .set_caption ('Simulation' )
211+
212+
213+
214+ # Iterate through each combination of the chosen frequencies, dmin's, and dmax's
215+ for frequency in frequencies :
216+ for dmin in dmins :
217+ for dmax in dmaxs :
218+ rewards = []
219+ waits = []
220+
221+ count = 0
222+ duration = 0
223+
224+ for i in range (TRIALS ):
225+
226+ trial_reward = 0
227+ total_wait = 0
228+
229+ clock = pygame .time .Clock ()
230+ intersection = pygame .sprite .Group ()
231+ cars = pygame .sprite .Group ()
232+ middle = Middle (LANE_WIDTH * 2 , LANE_WIDTH * 2 , CENTER , screen )
233+ intersection .add (middle )
234+ lanes = []
235+ lanes .append (Lane (LANE_WIDTH , VERT_LANE_LENGTH , 'down' , ((DISPLAY_WIDTH - LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT - VERT_LANE_LENGTH ) // 2 - LANE_WIDTH ), screen ))
236+ lanes .append (Lane (LANE_WIDTH , VERT_LANE_LENGTH , 'up' , ((DISPLAY_WIDTH + LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT - VERT_LANE_LENGTH ) // 2 - LANE_WIDTH ), screen ))
237+ lanes .append (Lane (LANE_WIDTH , HORZ_LANE_LENGTH , 'left' , ((DISPLAY_WIDTH + HORZ_LANE_LENGTH ) // 2 + LANE_WIDTH , (DISPLAY_HEIGHT - LANE_WIDTH ) // 2 ), screen ))
238+ lanes .append (Lane (LANE_WIDTH , HORZ_LANE_LENGTH , 'right' , ((DISPLAY_WIDTH + HORZ_LANE_LENGTH ) // 2 + LANE_WIDTH , (DISPLAY_HEIGHT + LANE_WIDTH ) // 2 ), screen ))
239+ lanes .append (Lane (LANE_WIDTH , VERT_LANE_LENGTH , 'up' , ((DISPLAY_WIDTH + LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT + VERT_LANE_LENGTH ) // 2 + LANE_WIDTH ), screen ))
240+ lanes .append (Lane (LANE_WIDTH , VERT_LANE_LENGTH , 'down' , ((DISPLAY_WIDTH - LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT + VERT_LANE_LENGTH ) // 2 + LANE_WIDTH ), screen ))
241+ lanes .append (Lane (LANE_WIDTH , HORZ_LANE_LENGTH , 'right' , ((DISPLAY_WIDTH // 2 - LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT + LANE_WIDTH ) // 2 ), screen ))
242+ lanes .append (Lane (LANE_WIDTH , HORZ_LANE_LENGTH , 'left' , ((DISPLAY_WIDTH // 2 - LANE_WIDTH ) // 2 , (DISPLAY_HEIGHT - LANE_WIDTH ) // 2 ), screen ))
243+ intersection .add (* lanes )
244+
245+ main (actuated , SIM_LENGTH , frequency , dmin , dmax , RENDER )
246+ rewards .append (trial_reward )
247+ waits .append (total_wait )
248+
249+ with open (log_file , 'a' ) as log :
250+ log .write (f'Frequency { frequency } ; DMin: { dmin } ; DMax: { dmax } \n ' )
251+ log .write (str (min (rewards )) + '\n ' )
252+ log .write (str (sum (rewards )/ TRIALS ) + '\n ' )
253+ log .write (str (max (rewards )) + '\n ' )
254+ log .write (str (sum (waits )/ TRIALS ) + '\n ' )
255+
256+
257+
258+ pygame .quit ()
259+ quit ()
0 commit comments