This repository was archived by the owner on Mar 3, 2025. It is now read-only.
forked from HSLdevcom/OpenTripPlanner
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathTimetableSnapshot.java
501 lines (448 loc) · 20.6 KB
/
TimetableSnapshot.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
package org.opentripplanner.model;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.SetMultimap;
import java.time.LocalDate;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater;
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.framework.Result;
import org.opentripplanner.transit.model.network.TripPattern;
import org.opentripplanner.transit.model.site.StopLocation;
import org.opentripplanner.transit.model.timetable.TripIdAndServiceDate;
import org.opentripplanner.transit.model.timetable.TripTimes;
import org.opentripplanner.updater.spi.UpdateError;
import org.opentripplanner.updater.spi.UpdateSuccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A TimetableSnapshot holds a set of realtime-updated Timetables frozen at a moment in time. It
* can return a Timetable for any TripPattern in the public transit network considering all
* accumulated realtime updates, falling back on the scheduled Timetable if no updates have been
* applied for a given TripPattern.
* <p>
* This is a central part of managing concurrency when many routing searches may be happening, but
* realtime updates are also streaming in which change the vehicle arrival and departure times.
* Any given request will only see one unchanging TimetableSnapshot over the course of its search.
* <p>
* An instance of TimetableSnapshot first serves as a buffer to accumulate a batch of incoming
* updates on top of any already known updates to the base schedules. From time to time such a batch
* of updates is committed (like a database transaction). At this point the TimetableSnapshot is
* treated as immutable and becomes available for use by new incoming routing requests.
* <p>
* All updates to a snapshot must be completed before it is handed off to any searches. A single
* snapshot should be used for an entire search, and should remain unchanged for that duration to
* provide a consistent view not only of trips that have been boarded, but of relative arrival and
* departure times of other trips that have not necessarily been boarded.
* <p>
* A TimetableSnapshot instance may only be modified by a single thread. This makes it easier to
* reason about how the snapshot is built up and used. Write operations are applied one by one, in
* order, with no concurrent access. Read operations are then allowed concurrently by many threads
* after writing is forbidden.
* <p>
* The fact that TripPattern instances carry a reference only to their scheduled Timetable and not
* to their realtime timetable is largely due to historical path-dependence in OTP development.
* Streaming realtime support was added around 2013 as a sort of sandbox feature that was switched
* off by default. Looking up realtime timetables during routing was a fringe feature that needed
* to impose near-zero cost and avoid introducing complexity into the primary codebase. Now over
* ten years later, the principles of how this system operates are rather stable, but the
* implementation would benefit from some deduplication and cleanup. Once that is complete, looking
* up timetables on this class could conceivably be replaced with snapshotting entire views of the
* transit network. It would also be possible to make the realtime version of Timetables or
* TripTimes the primary view, and include references back to their scheduled versions.
* <p>
* Implementation note: when a snapshot is committed, the mutable state of this class is stored
* in final fields and completely initialized in the constructor. This provides an additional
* guarantee of safe-publication without synchronization.
* (see <a href="https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.5">final Field Semantics</a>)
*/
public class TimetableSnapshot {
/** Logger for this class; used for debug messages while reverting trips to scheduled patterns. */
private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshot.class);
/**
* During the construction phase of the TimetableSnapshot, before it is considered immutable and
* used in routing, this Set holds all timetables that have been modified and are waiting to be
* indexed. The field itself is final and never null: the set is emptied by every commit, and a
* committed (read-only) snapshot is created with its own set, which stays empty because all
* modifying methods reject read-only snapshots.
*/
private final Set<Timetable> dirtyTimetables = new HashSet<>();
/**
* For each TripPattern (sequence of stops on a particular Route) for which we have received a
* realtime update, an ordered set of timetables on different days. The key TripPatterns may
* include ones from the scheduled GTFS, as well as ones added by realtime messages and
* tracked by the TripPatternCache. <p>
* Note that the keys do not include all scheduled TripPatterns, only those for which we have at
* least one update.<p>
* The members of the SortedSet (the Timetable for a particular day) are treated as copy-on-write
* when we're updating them. If an update will modify the timetable for a particular day, that
* timetable is replicated before any modifications are applied to avoid affecting any previous
* TimetableSnapshots still in circulation which reference that same Timetable instance. <p>
* Alternative implementations: A. This could be an array indexed using the integer pattern
* indexes. B. It could be made into a flat hashtable with compound keys (TripPattern, LocalDate).
* The compound key approach better reflects the fact that there should be only one Timetable per
* TripPattern and date.
*/
private final Map<TripPattern, SortedSet<Timetable>> timetables;
/**
* For cases where the trip pattern (sequence of stops visited) has been changed by a realtime
* update, a Map associating the updated trip pattern with a compound key of the feed-scoped
* trip ID and the service date.
* TODO RT_AB: clarify if this is an index or the original source of truth.
*/
private final Map<TripIdAndServiceDate, TripPattern> realtimeAddedTripPattern;
/**
* This is an index of TripPatterns, not the primary collection. It tracks which TripPatterns
* that were updated or newly created by realtime messages contain which stops. This allows them
* to be readily found and included in API responses containing stop times at a specific stop.
* This is a SetMultimap, so that each pattern is only retained once per stop even if it's added
* more than once.
* TODO RT_AB: More general handling of all realtime indexes outside primary data structures.
*/
private final SetMultimap<StopLocation, TripPattern> patternsForStop;
/**
* Boolean value indicating that timetable snapshot is read only if true. The field is final, so
* a snapshot that was created read-only can never become writable again.
*/
private final boolean readOnly;
/**
* Boolean value indicating that this timetable snapshot contains changes compared to the state of
* the last commit if true. It is reset to false by every commit.
*/
private boolean dirty = false;
/**
 * Creates an empty, writable snapshot that can be used as a buffer for incoming realtime
 * updates. All backing collections start out empty and mutable.
 */
public TimetableSnapshot() {
  this(
    new HashMap<>(),
    new HashMap<>(),
    HashMultimap.create(),
    false
  );
}
/**
 * Internal constructor that assigns every constructor-initialized final field. The given
 * collections are stored as-is (no copies are made here), so the caller decides whether they
 * are mutable (buffer) or immutable (committed snapshot).
 *
 * @param timetables               realtime timetables per trip pattern
 * @param realtimeAddedTripPattern patterns assigned to trips by realtime updates
 * @param patternsForStop          index from stop to realtime patterns
 * @param readOnly                 true if the new snapshot must reject modifications
 */
private TimetableSnapshot(
  Map<TripPattern, SortedSet<Timetable>> timetables,
  Map<TripIdAndServiceDate, TripPattern> realtimeAddedTripPattern,
  SetMultimap<StopLocation, TripPattern> patternsForStop,
  boolean readOnly
) {
  this.readOnly = readOnly;
  this.patternsForStop = patternsForStop;
  this.realtimeAddedTripPattern = realtimeAddedTripPattern;
  this.timetables = timetables;
}
/**
 * Looks up the timetable to use for a pattern on a given service date.
 *
 * @param pattern     the trip pattern to find a timetable for
 * @param serviceDate the service date of interest; when null the scheduled timetable is returned
 * @return the first realtime timetable of the pattern that is valid for the service date, or
 * the pattern's scheduled timetable if this snapshot holds no such timetable
 */
public Timetable resolve(TripPattern pattern, LocalDate serviceDate) {
  SortedSet<Timetable> updatedTimetables = timetables.get(pattern);
  // Without realtime timetables or a service date there is nothing to search.
  if (updatedTimetables == null || serviceDate == null) {
    return pattern.getScheduledTimetable();
  }
  for (Timetable candidate : updatedTimetables) {
    if (candidate == null) {
      continue;
    }
    if (candidate.isValidFor(serviceDate)) {
      return candidate;
    }
  }
  // No realtime timetable matches the date: fall back to the schedule.
  return pattern.getScheduledTimetable();
}
/**
 * Finds the trip pattern that a realtime update has assigned to a trip on a service date, i.e.
 * a pattern whose stop pattern differs from the scheduled one.
 *
 * @param tripId      feed-scoped id of the trip
 * @param serviceDate service date the trip runs on
 * @return trip pattern created by the updater; null if trip is on the original trip pattern
 */
@Nullable
public TripPattern getRealtimeAddedTripPattern(FeedScopedId tripId, LocalDate serviceDate) {
  return realtimeAddedTripPattern.get(new TripIdAndServiceDate(tripId, serviceDate));
}
/**
 * Tells whether at least one trip is currently associated with a realtime-added trip pattern.
 *
 * @return true if any trip patterns were added, false otherwise
 */
public boolean hasRealtimeAddedTripPatterns() {
  final boolean noneAdded = realtimeAddedTripPattern.isEmpty();
  return !noneAdded;
}
/**
 * Update the TripTimes of one Trip in a Timetable of a TripPattern. If the Trip of the TripTimes
 * does not exist yet in the Timetable, add it. This method will make a protective copy
 * of the Timetable if such a copy has not already been made while building up this snapshot,
 * handling both cases where patterns were pre-existing in static data or created by realtime data.
 * <p>
 * All arguments are validated before the snapshot is touched, so a rejected call leaves the
 * snapshot unchanged.
 *
 * @param pattern          trip pattern whose timetable is updated; must not be null
 * @param updatedTripTimes new trip times for the trip; must not be null
 * @param serviceDate      service day for which this update is valid; must not be null
 * @return a success result without warnings; this method has no failure result
 * @throws NullPointerException            if any argument is null
 * @throws ConcurrentModificationException if this snapshot is read-only
 */
public Result<UpdateSuccess, UpdateError> update(
  TripPattern pattern,
  TripTimes updatedTripTimes,
  LocalDate serviceDate
) {
  // Preconditions
  Objects.requireNonNull(pattern);
  Objects.requireNonNull(serviceDate);
  if (readOnly) {
    throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
  }
  // Fail fast: resolve the trip id before copyTimetable() modifies this snapshot, so that a
  // missing TripTimes cannot leave a copied-but-unchanged timetable and a dirty flag behind.
  Objects.requireNonNull(updatedTripTimes, "updatedTripTimes");
  final FeedScopedId tripId = updatedTripTimes.getTrip().getId();

  // We need to perform the copy of Timetable here rather than in Timetable.update()
  // to avoid repeatedly copying in case several updates are applied to the same timetable.
  Timetable tt = copyTimetable(pattern, serviceDate, resolve(pattern, serviceDate));

  // Assume all trips in a pattern are from the same feed, which should be the case.
  int tripIndex = tt.getTripIndex(tripId);
  if (tripIndex == -1) {
    // Trip not found, add it
    tt.addTripTimes(updatedTripTimes);
  } else {
    // Set updated trip times of trip
    tt.setTripTimes(tripIndex, updatedTripTimes);
  }

  if (pattern.isCreatedByRealtimeUpdater()) {
    // Remember this pattern for the added trip id and service date
    realtimeAddedTripPattern.put(new TripIdAndServiceDate(tripId, serviceDate), pattern);
  }

  // To make these trip patterns visible for departureRow searches.
  addPatternToIndex(pattern);

  // The time tables are finished during the commit
  return Result.success(UpdateSuccess.noWarnings());
}
/**
 * Commits this buffer without notifying a TransitLayerUpdater and without forcing a commit.
 * <p>
 * This produces a small delay of typically around 50ms, which is almost entirely due to the
 * indexing step. Cloning the map is much faster (2ms). It is perhaps better to index timetables
 * as they are changed to avoid experiencing all this lag at once, but we want to avoid
 * re-indexing when receiving multiple updates for the same timetable in rapid succession. This
 * compromise is expressed by the maxSnapshotFrequency property of StoptimeUpdater. The indexing
 * could be made much more efficient as well.
 *
 * @return an immutable copy of this TimetableSnapshot with all updates applied, or null if
 * this snapshot has no changes since the last commit
 */
public TimetableSnapshot commit() {
  final boolean force = false;
  return commit(null, force);
}
/**
 * Publishes the current state of this buffer as a read-only snapshot. The three backing
 * collections are copied into immutable collections for the returned snapshot; afterwards the
 * set of dirty timetables is emptied and the dirty flag is reset on this buffer.
 *
 * @param transitLayerUpdater if not null, it is handed the dirty timetables and the timetable
 *                            map of this buffer before the dirty set is cleared
 * @param force               if true a snapshot is created even when nothing has changed
 * @return the new read-only snapshot, or null if {@code force} is false and this buffer is not
 * dirty
 * @throws ConcurrentModificationException if this snapshot is itself read-only
 */
@SuppressWarnings("unchecked")
public TimetableSnapshot commit(TransitLayerUpdater transitLayerUpdater, boolean force) {
  if (readOnly) {
    throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
  }

  final boolean publish = force || this.isDirty();
  if (!publish) {
    return null;
  }

  // Freeze the current state first, in the same order as the constructor parameters.
  Map<TripPattern, SortedSet<Timetable>> frozenTimetables = Map.copyOf(timetables);
  Map<TripIdAndServiceDate, TripPattern> frozenAddedPatterns = Map.copyOf(
    realtimeAddedTripPattern
  );
  SetMultimap<StopLocation, TripPattern> frozenStopIndex = ImmutableSetMultimap.copyOf(
    patternsForStop
  );
  TimetableSnapshot committed = new TimetableSnapshot(
    frozenTimetables,
    frozenAddedPatterns,
    frozenStopIndex,
    true
  );

  if (transitLayerUpdater != null) {
    transitLayerUpdater.update(dirtyTimetables, timetables);
  }

  // The buffer is now in sync with what was just published.
  this.dirtyTimetables.clear();
  this.dirty = false;

  return committed;
}
/**
 * Removes the realtime timetables and the realtime-added trip pattern associations that belong
 * to the given feed. The snapshot is marked dirty if anything was removed.
 * NOTE(review): the stop-to-pattern index and the set of dirty timetables are not touched here.
 *
 * @param feedId feed id to clear the snapshot for
 * @throws ConcurrentModificationException if this snapshot is read-only
 */
public void clear(String feedId) {
  if (readOnly) {
    throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
  }
  // Both clean-up steps must always run, so evaluate them before combining the results.
  final boolean timetablesRemoved = clearTimetable(feedId);
  final boolean addedPatternsRemoved = clearRealtimeAddedTripPattern(feedId);
  // Only ever raise the flag here; an already dirty snapshot stays dirty.
  dirty = dirty || timetablesRemoved || addedPatternsRemoved;
}
/**
 * If a previous realtime update has changed which trip pattern is associated with the given trip
 * on the given service date, this method will dissociate the trip from that pattern and remove
 * the trip's times from the timetables of that pattern.
 * <p>
 * For this service date, the trip will revert to its original trip pattern from the scheduled
 * data, remaining on that pattern unless it's changed again by a future realtime update.
 * <p>
 * NOTE(review): removing the association does not by itself mark the snapshot dirty; the flag
 * is only raised when a timetable is copied. Confirm that callers follow up with an update.
 *
 * @param tripId      feed-scoped id of the trip to revert
 * @param serviceDate service date on which the trip should be reverted
 * @return true if the trip was found to be shifted to a different trip pattern by a realtime
 * message and an attempt was made to re-associate it with its originally scheduled trip pattern.
 * @throws ConcurrentModificationException if this snapshot is read-only
 */
public boolean revertTripToScheduledTripPattern(FeedScopedId tripId, LocalDate serviceDate) {
  if (readOnly) {
    throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
  }

  final TripPattern realtimePattern = getRealtimeAddedTripPattern(tripId, serviceDate);
  if (realtimePattern == null) {
    // The trip is still on its scheduled pattern, nothing to revert.
    return false;
  }

  // Dissociate the given trip from any realtime-added pattern.
  // The trip will then fall back to its original scheduled pattern.
  realtimeAddedTripPattern.remove(new TripIdAndServiceDate(tripId, serviceDate));

  // Remove times for the trip from any timetables
  // under that now-obsolete realtime-added pattern.
  SortedSet<Timetable> patternTimetables = this.timetables.get(realtimePattern);
  if (patternTimetables == null) {
    return true;
  }

  // Step 1: find the trip times of the trip in the timetable(s) valid for the service date.
  TripTimes obsoleteTripTimes = null;
  for (Timetable timetable : patternTimetables) {
    if (!timetable.isValidFor(serviceDate)) {
      continue;
    }
    final TripTimes found = timetable.getTripTimes(tripId);
    if (found == null) {
      LOG.debug("No triptimes to remove for trip {}", tripId);
    } else if (obsoleteTripTimes != null) {
      LOG.debug("Found two triptimes to remove for trip {}", tripId);
    } else {
      obsoleteTripTimes = found;
    }
  }
  if (obsoleteTripTimes == null) {
    return true;
  }

  // Step 2: drop those trip times from a protective copy of every timetable containing them.
  // The loop runs over the previously fetched set, which copyTimetable() does not change.
  for (Timetable originalTimetable : patternTimetables) {
    if (originalTimetable.getTripTimes().contains(obsoleteTripTimes)) {
      Timetable writableCopy = copyTimetable(realtimePattern, serviceDate, originalTimetable);
      writableCopy.getTripTimes().remove(obsoleteTripTimes);
    }
  }
  return true;
}
/**
 * Removes all Timetables which are valid for a ServiceDate on-or-before the one supplied, and
 * all realtime-added trip pattern associations for such service dates. Patterns that are left
 * without any timetable are removed from the timetable map entirely.
 *
 * @param serviceDate the latest service date to purge (inclusive)
 * @return true if any data has been modified and false if no purging has happened.
 * @throws ConcurrentModificationException if this snapshot is read-only
 */
public boolean purgeExpiredData(LocalDate serviceDate) {
  if (readOnly) {
    throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
  }

  boolean modified = false;

  Iterator<Entry<TripPattern, SortedSet<Timetable>>> patternIterator = timetables
    .entrySet()
    .iterator();
  while (patternIterator.hasNext()) {
    Entry<TripPattern, SortedSet<Timetable>> patternEntry = patternIterator.next();
    SortedSet<Timetable> survivors = new TreeSet<>(new SortedTimetableComparator());
    for (Timetable candidate : patternEntry.getValue()) {
      final boolean expired = serviceDate.compareTo(candidate.getServiceDate()) >= 0;
      if (expired) {
        modified = true;
      } else {
        survivors.add(candidate);
      }
    }

    if (survivors.isEmpty()) {
      patternIterator.remove();
    } else {
      // Replace the value of the existing key with an immutable set of the kept timetables.
      patternEntry.setValue(ImmutableSortedSet.copyOfSorted(survivors));
    }
  }

  // Also remove last added trip pattern for days that are purged
  Iterator<TripIdAndServiceDate> keyIterator = realtimeAddedTripPattern.keySet().iterator();
  while (keyIterator.hasNext()) {
    TripIdAndServiceDate tripIdAndServiceDate = keyIterator.next();
    if (serviceDate.compareTo(tripIdAndServiceDate.serviceDate()) >= 0) {
      keyIterator.remove();
      modified = true;
    }
  }

  return modified;
}
/**
 * Tells whether this snapshot has changes that were not committed yet.
 *
 * @return the dirty flag for a writable snapshot; always false for a read-only snapshot
 */
public boolean isDirty() {
  return !readOnly && dirty;
}
/**
 * Describes this snapshot for logging and debugging.
 *
 * @return a text containing the number of entries in the timetable map (one per trip pattern)
 * and either "committed" for a read-only snapshot or the number of dirty timetables
 */
@Override
public String toString() {
  String d = readOnly ? "committed" : String.format("%d dirty", dirtyTimetables.size());
  return String.format("Timetable snapshot: %d timetables (%s)", timetables.size(), d);
}
/**
 * Looks up the trip patterns that the realtime stop index of this snapshot holds for a stop.
 * Only patterns created by the realtime updater are added to that index.
 *
 * @param stop the stop to look up
 * @return the collection of patterns indexed for the stop, as returned by the underlying multimap
 */
public Collection<TripPattern> getPatternsForStop(StopLocation stop) {
  final Collection<TripPattern> indexedPatterns = patternsForStop.get(stop);
  return indexedPatterns;
}
/**
 * Does this snapshot contain any realtime data or is it completely empty?
 *
 * @return true only if there are no dirty timetables, no realtime timetables and no
 * realtime-added trip pattern associations
 */
public boolean isEmpty() {
  if (!dirtyTimetables.isEmpty()) {
    return false;
  }
  if (!timetables.isEmpty()) {
    return false;
  }
  return realtimeAddedTripPattern.isEmpty();
}
/**
 * Drops the realtime timetables of every trip pattern that belongs to the provided feed.
 *
 * @param feedId feed id to clear out
 * @return true if the timetable changed as a result of the call
 */
private boolean clearTimetable(String feedId) {
  // Removing from the key set view removes the corresponding map entries.
  Set<TripPattern> updatedPatterns = timetables.keySet();
  return updatedPatterns.removeIf(candidate -> feedId.equals(candidate.getFeedId()));
}
/**
 * Drops every realtime-added trip pattern association whose trip id belongs to the provided
 * feed.
 *
 * @param feedId feed id to clear out
 * @return true if the realtimeAddedTripPattern changed as a result of the call
 */
private boolean clearRealtimeAddedTripPattern(String feedId) {
  // Removing from the key set view removes the corresponding map entries.
  Set<TripIdAndServiceDate> keys = realtimeAddedTripPattern.keySet();
  return keys.removeIf(key -> feedId.equals(key.tripId().getFeedId()));
}
/**
 * Registers a pattern under each of its stops in the stop index, but only if the pattern was
 * created by the realtime updater. Other patterns are ignored.
 */
private void addPatternToIndex(TripPattern tripPattern) {
  if (!tripPattern.isCreatedByRealtimeUpdater()) {
    return;
  }
  //TODO - SIRI: Add pattern to index?
  for (StopLocation stop : tripPattern.getStops()) {
    patternsForStop.put(stop, tripPattern);
  }
}
/**
 * Make a copy of the given timetable for a given pattern and service date, unless that has
 * already happened while building up this snapshot: a timetable that is already registered as
 * dirty is returned unchanged.
 * <p>
 * The SortedSet that holds the collection of Timetables for that pattern (sorted by service
 * date) is shared between multiple snapshots and must be copied as well.<br/>
 * Note on performance: if multiple Timetables are modified in a SortedSet, the SortedSet will be
 * copied multiple times. The impact on memory/garbage collection is assumed to be minimal
 * since the collection is small.
 * The SortedSet is made immutable to prevent change after snapshot publication.
 *
 * @param pattern     the pattern the timetable belongs to
 * @param serviceDate the service date passed to the copy
 * @param tt          the timetable to copy
 * @return the writable timetable registered in this snapshot: either {@code tt} itself or its
 * new copy
 */
private Timetable copyTimetable(TripPattern pattern, LocalDate serviceDate, Timetable tt) {
  if (dirtyTimetables.contains(tt)) {
    // Already copied in this snapshot, it is safe to modify it directly.
    return tt;
  }

  Timetable copy = new Timetable(tt, serviceDate);

  // Build a private, mutable working set from whatever is currently stored for the pattern.
  SortedSet<Timetable> workingSet = new TreeSet<>(new SortedTimetableComparator());
  SortedSet<Timetable> storedSet = timetables.get(pattern);
  if (storedSet != null) {
    workingSet.addAll(storedSet);
  }

  // A timetable without a service date is never removed from the set; besides, the
  // comparator dereferences the service date of its arguments.
  if (tt.getServiceDate() != null) {
    workingSet.remove(tt);
  }
  workingSet.add(copy);

  timetables.put(pattern, ImmutableSortedSet.copyOfSorted(workingSet));
  dirtyTimetables.add(copy);
  dirty = true;

  return copy;
}
/**
 * Orders timetables by ascending service date. Both timetables must have a service date, a
 * missing one on the first argument results in a NullPointerException.
 */
protected static class SortedTimetableComparator implements Comparator<Timetable> {

  @Override
  public int compare(Timetable t1, Timetable t2) {
    var firstDate = t1.getServiceDate();
    var secondDate = t2.getServiceDate();
    return firstDate.compareTo(secondDate);
  }
}
}