1
+ #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+ """
4
+ Created on Wed Jul 10 12:10:09 2024
5
+
6
+ @author: Sasan Amini
7
+
8
+ This file contains functions to implement bus priority at signalized intersections by modifying an existing tlLogic.
9
+ The principle is to detect a bus using a E1detector, transfer from the current phase to a target phase where bus gets green until it corsses the intersection.
10
+ There are signin and signout detectors to serve this purpose.
11
+
12
+ """
13
+ import sumolib
14
+ import xml .etree .ElementTree as ET
15
+ from sumolib .net import Condition ,TLSProgram
16
+ from sumolib .sensors .inductive_loop import InductiveLoop as E1Deetcor
17
+ #%%
18
+ def phaseTranition (init_phase ,target_phase ):
19
+
20
+ """
21
+ This function generates the transition from the current phase to an ideal phase required
22
+ by the approaching bus to cross the intersection.
23
+ There are two cases:
24
+ 1- the current phase needs to be extended and terminated after bus leaves the intersection
25
+ 2- current phase must be terminated immediately and an optimal ohase that provides green f
26
+ or the incoming bus starts and ends after the bus leaves.
27
+ Binary ariables transition determines which case must be used.
28
+ Input and output are strings e.g. GGgrrrr or GGyrrrrG
29
+ """
30
+
31
+ trasition = True
32
+
33
+
34
+ s1 = list (init_phase ) # Convert the state string to a list
35
+ s2 = list (init_phase )
36
+ s3 = list (init_phase )
37
+ sr = list (init_phase )
38
+
39
+ if init_phase == target_phase :
40
+ trasition = False
41
+
42
+ if trasition :
43
+ for i in range (len (init_phase )):
44
+ if init_phase [i ] == 'G' :
45
+ s1 [i ]= 'y'
46
+ elif init_phase [i ] == 'g' :
47
+ s1 [i ]= 'y'
48
+ elif init_phase [i ] == 'r' :
49
+ s1 [i ]= 'r'
50
+ elif init_phase [i ] == 'y' :
51
+ s1 [i ] = 'r'
52
+ else :
53
+ s1 [i ] = init_phase [i ]
54
+
55
+ for i in range (len (target_phase )):
56
+ if target_phase [i ] == 'G' :
57
+ s3 [i ]= 'y'
58
+ elif target_phase [i ] == 'g' :
59
+ s3 [i ]= 'y'
60
+ elif target_phase [i ] == 'r' :
61
+ s3 [i ]= 'r'
62
+ elif target_phase [i ] == 'y' :
63
+ s3 [i ] = 'r'
64
+ else :
65
+ s3 [i ] = target_phase [i ]
66
+ sr = '' .join (['r' ]* len (s1 ))
67
+ s2 = target_phase
68
+ return ('' .join (s1 ),sr ,'' .join (s2 ),'' .join (s3 ),sr )
69
+
70
+ else :
71
+ s1 = init_phase
72
+ for i in range (len (init_phase )):
73
+ if s1 [i ] == 'G' :
74
+ s2 [i ]= 'y'
75
+ elif s1 [i ] == 'g' :
76
+ s2 [i ]= 'y'
77
+ elif s1 [i ] == 'r' :
78
+ s2 [i ]= 'r'
79
+ elif s1 [i ] == 'y' :
80
+ s2 [i ] = 'r'
81
+ else :
82
+ s2 [i ] = s1 [i ]
83
+
84
+ sr = '' .join (['r' ]* len (s1 ))
85
+ return ('' .join (s1 ),'' .join (s2 ),sr ,None ,None )
86
+
87
+ #%%
88
+
89
+ def getSignInEdge (sumo_net , edge_id , signin_distance ):
90
+
91
+ """
92
+ This function finds the edge and the position where E1Deetcor should be placedfor bus signin.
93
+ Inputs are: an edgeID that leads to a tls, and a distance from tls stopline that a bus sigin detector should be placed upstream of the corresponding tls e.g. 120m.
94
+ Outputs: the edgeId and position on that edge to define E1Detector
95
+ *Hint: currently, from the stopline we look upstream and only consider connections with straight direction!
96
+ This can cause suboptimal detector defintions!
97
+ """
98
+
99
+ edge = sumo_net .getEdge (edge_id )
100
+ length_sum = edge .getLength ()
101
+ status = False
102
+ visited_edges = set ()
103
+
104
+ while length_sum < signin_distance and not status :
105
+ up_node = edge .getFromNode ()
106
+ if up_node .getType () == 'traffic_light' :
107
+ status = True
108
+ break
109
+
110
+ down_node = edge .getToNode ()
111
+ incoming_edges = [e for e in up_node .getIncoming () if e not in visited_edges ]
112
+
113
+ if not incoming_edges :
114
+ print (f'The given distance of { signin_distance } is too long for edge: { edge_id } ! Try a shorter distance.' )
115
+ return edge .getID (),5
116
+
117
+ found_next_edge = False
118
+ for e in incoming_edges :
119
+ if e .getFromNode () != down_node :
120
+ conns = up_node .getConnections ()
121
+ for conn in conns :
122
+ if conn .getFrom () == e and conn .getTo () == edge and conn .getDirection () == "s" :
123
+ edge = e
124
+ found_next_edge = True
125
+ break
126
+ if found_next_edge :
127
+ break
128
+
129
+ if not found_next_edge :
130
+ print (f'The given distance of { signin_distance } is too long for edge: { edge_id } ! Try a shorter distance.' )
131
+ return edge .getID (),5
132
+
133
+ visited_edges .add (edge )
134
+ length_sum += edge .getLength ()
135
+
136
+ if status :
137
+ return edge .getID (), 5
138
+ else :
139
+ return edge .getID (), round (length_sum - signin_distance , 1 )
140
+ #%%
141
+
142
+ def addSigninE1 (sumo_net ,tls_id ,signin_edge_id ,edge_id ,pos ):
143
+
144
+ """
145
+ This function generates the E1Detectors needed for signing at a user-defined distance from tls and the conditions for the tll file.
146
+ Inputs are the tls_id, the edge from which the bus enters the intersection, the actual edgeId and position on that edge where a detectors must be placed (outputs of getSignInEdge())
147
+ """
148
+
149
+ dets = []
150
+ condistions = {signin_edge_id :[]}
151
+
152
+ for lane in sumo_net .getEdge (edge_id ).getLanes ():
153
+ if 'bus' in lane .getPermissions () or 'tram' in lane .getPermissions ():
154
+ dets .append (E1Detector (id = f'signin_{ tls_id } _{ signin_edge_id } _{ edge_id } _{ lane .getIndex ()} ' , lane = lane .getID (), pos = pos , vTypes = "bus tram" , freq = 180 , file = "e1.out.xml" , friendlyPos = "True" ))
155
+ condistions [signin_edge_id ].append (Condition (id = f'signin_{ tls_id } _{ edge_id } { pos } ' , value = f'6 > z:{ dets [- 1 ].id } ' ))
156
+
157
+ # Here we can define a near stopline detector, in case of queues the bus cannot cross the intersection, can be prioritized again!
158
+ ## Todo: make sure these conditions do not conflict other conditions at the tls!
159
+
160
+ # for lane in sumo_net.getEdge(signin_edge_id).getLanes():
161
+ # if 'bus' in lane.getPermissions() or 'tram' in lane.getPermissions():
162
+ # dets.append(E1Detector(id=f'signin_{tls_id}_{signin_edge_id}_{lane.getIndex()}_near', lane=lane.getID(), pos=-10, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True"))
163
+ # condistions[signin_edge_id].append(Condition(id=f'signin_{tls_id}_{edge_id}_near', value=f'2 > z:{dets[-1].id}'))
164
+
165
+ return dets ,condistions
166
+
167
+ #%%
168
+ def addSignoutE1 (sumo_net ,tls_id ):
169
+ """
170
+ This function generates the E1Detectors needed for signout from a tls and the conditions for the tll file.
171
+ These deetectors are placed on all lanes that allow bus or tram at a fixed distance of 5m downstream of the intersection.
172
+ """
173
+ dets = []
174
+ condistions = []
175
+ seen_lane = []
176
+ for item in sumo_net .getTLS (tls_id ).getLinks ().values ():
177
+ if item [0 ][1 ].getID () not in seen_lane and 'bus' in item [0 ][1 ].getPermissions () or 'tram' in item [0 ][1 ].getPermissions ():
178
+ dets .append (E1Detector (id = f'sigout_{ tls_id } _{ item [0 ][1 ].getEdge ().getID ()} _{ item [0 ][1 ].getID ()} ' , lane = item [0 ][1 ].getID (), pos = 5 , vTypes = "bus tram" , freq = 180 , file = "e1.out.xml" , friendlyPos = "True" ))
179
+ condistions .append (Condition (id = f'signout_{ tls_id } ' , value = f'5 > z:{ dets [- 1 ].id } ' ))
180
+ seen_lane .append (item [0 ][1 ].getID ())
181
+
182
+ return dets ,condistions
183
+ #%%
184
+ def findBestState (strings , indices ):
185
+ """
186
+ This is an auxiallary function for getEdgeGState()
187
+ """
188
+ best_state = None
189
+ max_g_count = - 1
190
+
191
+ for s in strings :
192
+ if all (len (s ) > idx and s [idx ] == 'G' for idx in indices ):
193
+ g_count = s .count ('G' )
194
+ if g_count > max_g_count :
195
+ max_g_count = g_count
196
+ best_state = s
197
+ return best_state
198
+
199
+ #%%
200
+ def getEdgeGState (sumo_net ,edge_id ,tls_prog_id ,from_tls_logic :False ):
201
+
202
+ """
203
+ This function finds the phase that serves bus priority for the corresponding edge at the tls.
204
+ There are two options: 1- find the best phase from the exsiting program or 2- create a new phase that turns all lanes on that edge to green using findBestState().
205
+ option 2 is very helpful when we have turning lanes with a seperate green phase, and we dont know the coming bus is going to which direction.
206
+ """
207
+
208
+ tls_id = sumo_net .getEdge (edge_id ).getTLS ().getID ()
209
+ connections_dict = sumo_net .getTLS (tls_id ).getLinks ()
210
+ tls_g_index = {edge_id :[]}
211
+ for con in connections_dict .values ():
212
+ if con [0 ][0 ].getEdge ().getID ()== edge_id :
213
+ tls_g_index [edge_id ].append (con [0 ][- 1 ])
214
+ if not from_tls_logic :
215
+ maxTlLinkIndex = max (sumo_net .getTLS (tls_id ).getLinks ().keys ())
216
+ phase_state = list ((maxTlLinkIndex + 1 )* 'r' )
217
+ for i in tls_g_index [edge_id ]:
218
+ phase_state [i ] = 'G'
219
+ ret = '' .join (phase_state )
220
+ else :
221
+ stages = []
222
+ for phase in sumo_net .getTLS (tls_id ).getPrograms ()[tls_prog_id ].getPhases ():
223
+ stages .append (phase .state )
224
+
225
+ ret = findBestState (stages ,tls_g_index [edge_id ])
226
+
227
+ return ret
228
+ #%%
229
+ def addBusPrio (sumo_net ,tls_id ,tls_prog_id ,e1pos ,tll_file_path :str ,e1det_file_path :str ,xml_output = True ):
230
+
231
+ incomingEdges = sumo_net .getTLS (tls_id ).getEdges ()
232
+ signin_conds = {}
233
+ signin_e1s = []
234
+
235
+ signoute1s , signout_conds = addSignoutE1 (sumo_net ,tls_id )
236
+ signout_cond_val = []
237
+ for item in signout_conds :
238
+ signout_cond_id = item .id
239
+ signout_cond_val .append (item .value )
240
+ signout_cond_str = (' or ' .join (signout_cond_val ))
241
+
242
+
243
+ for edge in incomingEdges :
244
+ e1edge ,pos = getSignInEdge (sumo_net ,edge .getID (),e1pos )
245
+ signin_e1 ,signin_cond = addSigninE1 (sumo_net ,tls_id ,edge .getID (),e1edge ,pos )
246
+ signin_e1s += signin_e1
247
+ for key , value in signin_cond .items ():
248
+ signin_conds [key ] = value
249
+
250
+ if xml_output :
251
+ with open (e1det_file_path ,'w' ) as file :
252
+ file .write ('<additional> \n ' )
253
+ for item in signin_e1s :
254
+ file .write (item .toXML ())
255
+ for item in signoute1s :
256
+ file .write (item .toXML ())
257
+ file .write ('</additional>' )
258
+ else :
259
+ out_det = signin_e1s + signoute1s
260
+
261
+
262
+ for pid , tls_prog in sumo_net .getTLS (tls_id ).getPrograms ().items ():
263
+ if pid == tls_prog_id :
264
+ ptype = tls_prog .getType ()
265
+ poffset = tls_prog .getOffset ()
266
+ else :
267
+ print ('The given tls program ID is not available!' )
268
+ return None
269
+
270
+ phases = {key :val for key ,val in tls_prog .getPhasesWithIndex ().items ()}
271
+ stages = {key :val for key ,val in tls_prog .getStages ().items ()}
272
+
273
+
274
+ outf = TLSProgram (id = f'{ pid } _bus_priority' , type = ptype , offset = poffset )
275
+ condition_string = "DEFAULT"
276
+
277
+ for phid , phase in tls_prog .getPhasesWithIndex ().items ():
278
+ if phid in stages .keys ():
279
+ phases [phid + 1 ].earlyTarget = condition_string
280
+
281
+ if phid == max (phases .keys ()):
282
+ outf .addPhase (duration = phase .duration , state = phase .state , minDur = phase .minDur , maxDur = phase .maxDur , next = '0' ,name = phase .name , earlyTarget = phase .earlyTarget )
283
+ else :
284
+ outf .addPhase (duration = phase .duration , state = phase .state , minDur = phase .minDur , maxDur = phase .maxDur , next = '' ,name = phase .name , earlyTarget = phase .earlyTarget )
285
+
286
+ for stage_idx ,stage_i in stages .items ():
287
+ next = [stage_idx + 1 ]
288
+
289
+ for edge in incomingEdges :
290
+ next .append (outf .numPhases ())
291
+ n = max (next )
292
+ # stage_o = getEdgeGState(sumo_net,edge.getID(),tls_prog_id,True)
293
+ # if stage_o is None:
294
+ stage_o = getEdgeGState (sumo_net ,edge .getID (),tls_prog_id ,False )
295
+ priority_phases = phaseTranition (stage_i .state ,stage_o )
296
+ cond_val = []
297
+ for item in signin_conds [edge .getID ()]:
298
+ cond_val .append (item .value )
299
+ signin_cond_id = item .id
300
+ signin_cond_value = (' or ' .join (cond_val ))
301
+ if priority_phases [4 ] is None :
302
+ outf .addPhase (state = priority_phases [0 ], duration = 10 , minDur = 5 , maxDur = 60 ,next = [n + 1 ],earlyTarget = signin_cond_id )
303
+ outf .addPhase (state = priority_phases [1 ], duration = 3 , next = [], earlyTarget = signout_cond_id )
304
+ outf .addPhase (state = priority_phases [2 ], duration = 1 , next = [' ' .join ([str (stage_idx )]+ [str (i ) for i in list (stages .keys ()) if i != stage_idx ])])
305
+
306
+ else :
307
+ outf .addPhase (state = priority_phases [0 ], duration = 3 , next = [],earlyTarget = signin_cond_id )
308
+ outf .addPhase (state = priority_phases [1 ], duration = 2 ,next = [])
309
+ outf .addPhase (state = priority_phases [2 ], duration = 10 , minDur = 5 , maxDur = 60 ,next = [n + 3 ])
310
+ outf .addPhase (state = priority_phases [3 ], duration = 3 , next = [], earlyTarget = signout_cond_id )
311
+ outf .addPhase (state = priority_phases [4 ], duration = 2 , next = [' ' .join ([str (stage_idx )]+ [str (i ) for i in list (stages .keys ()) if i != stage_idx ])])
312
+
313
+ outf .addCondition (signin_cond_id , signin_cond_value )
314
+ outf .getPhaseByIndex (stage_idx ).addNext (next [::- 1 ])
315
+ outf .addCondition (signout_cond_id , signout_cond_str )
316
+
317
+
318
+
319
+ if xml_output :
320
+ with open (tll_file_path ,'w' ) as file :
321
+ file .write ('<additional> \n ' )
322
+ file .write (outf .toXML (tls_id ))
323
+ file .write ('</additional>' )
324
+ else :
325
+ return outf ,out_det
326
+
327
+ #%%
328
+
329
+ def read_tllogic (file = './tls.tll.xml' ):
330
+ # Load the XML file
331
+ tree = ET .parse (file )
332
+ root = tree .getroot ()
333
+ tlids = []
334
+ # Iterate over each tlLogic element
335
+ for tl_logic in root .findall ('tlLogic' ):
336
+ idx = 0
337
+ pid = tl_logic .get ('programID' )
338
+ ptype = tl_logic .get ('type' )
339
+ tlid = tl_logic .get ('id' )
340
+ tlids .append (tlid )
341
+ for phase in tl_logic .findall ('phase' ):
342
+ # Extract phase attributes
343
+ idx += 1
344
+ duration = phase .get ('duration' )
345
+ state = phase .get ('state' )
346
+ min_dur = phase .get ('minDur' )
347
+ max_dur = phase .get ('maxDur' )
348
+ next_phase = phase .get ('next' )
349
+ early_target = phase .get ('earlyTarget' )
350
+
351
+ return tlids
352
+ #%%
353
+ # How to use the function on certain tls of a given tll file
354
+ tls_ids = read_tllogic ('./tls_from_pdf_sasan.tll.xml' )
355
+ net_path = './network.net.xml'
356
+ net = sumolib .net .readNet (net_path , withConnections = True ,withInternal = True ,withLatestPrograms = True )
357
+
358
+ with open ('./bus_priority.tll.xml' ,'w' ) as file_tll :
359
+ file_tll .write ('<additional> \n ' )
360
+ with open ('./bus_priority.add.xml' ,'w' ) as file_det :
361
+ file_det .write ('<additional> \n ' )
362
+ for tls in tls_ids :
363
+ tll ,det = addBusPrio (net ,tls ,'1' ,100 ,'' ,'' ,False )
364
+ file_tll .write (tll .toXML (tls ))
365
+ for d in det :
366
+ file_det .write (d .toXML ())
367
+ file_det .write ('</additional>' )
368
+ file_tll .write ('</additional>' )
0 commit comments