Skip to content

Commit de56d00

Browse files
authored
Merge pull request #5974 from entur/publish_timetable_snapshot_in_background
Publish timetable snapshot in background
2 parents 09ced42 + cd8e29e commit de56d00

File tree

11 files changed

+286
-250
lines changed

11 files changed

+286
-250
lines changed

src/ext/java/org/opentripplanner/ext/siri/SiriTimetableSnapshotSource.java

+36-27
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider {
5757
*/
5858
private final SiriTripPatternCache tripPatternCache;
5959

60-
private final TransitEditorService transitService;
60+
/**
61+
* Long-lived transit editor service that has access to the timetable snapshot buffer.
62+
* This differs from the usual use case where the transit service refers to the latest published
63+
* timetable snapshot.
64+
*/
65+
private final TransitEditorService transitEditorService;
6166

6267
private final TimetableSnapshotManager snapshotManager;
6368

@@ -71,9 +76,10 @@ public SiriTimetableSnapshotSource(
7176
parameters,
7277
() -> LocalDate.now(transitModel.getTimeZone())
7378
);
74-
this.transitService = new DefaultTransitService(transitModel);
79+
this.transitEditorService =
80+
new DefaultTransitService(transitModel, getTimetableSnapshotBuffer());
7581
this.tripPatternCache =
76-
new SiriTripPatternCache(tripPatternIdGenerator, transitService::getPatternForTrip);
82+
new SiriTripPatternCache(tripPatternIdGenerator, transitEditorService::getPatternForTrip);
7783

7884
transitModel.initTimetableSnapshotProvider(this);
7985
}
@@ -102,26 +108,22 @@ public UpdateResult applyEstimatedTimetable(
102108

103109
List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();
104110

105-
snapshotManager.withLock(() -> {
106-
if (incrementality == FULL_DATASET) {
107-
// Remove all updates from the buffer
108-
snapshotManager.clearBuffer(feedId);
109-
}
111+
if (incrementality == FULL_DATASET) {
112+
// Remove all updates from the buffer
113+
snapshotManager.clearBuffer(feedId);
114+
}
110115

111-
for (var etDelivery : updates) {
112-
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
113-
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
114-
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
115-
for (EstimatedVehicleJourney journey : journeys) {
116-
results.add(apply(journey, transitService, fuzzyTripMatcher, entityResolver));
117-
}
116+
for (var etDelivery : updates) {
117+
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
118+
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
119+
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
120+
for (EstimatedVehicleJourney journey : journeys) {
121+
results.add(apply(journey, transitEditorService, fuzzyTripMatcher, entityResolver));
118122
}
119123
}
124+
}
120125

121-
LOG.debug("message contains {} trip updates", updates.size());
122-
123-
snapshotManager.purgeAndCommit();
124-
});
126+
LOG.debug("message contains {} trip updates", updates.size());
125127

126128
return UpdateResult.ofResults(results);
127129
}
@@ -131,6 +133,10 @@ public TimetableSnapshot getTimetableSnapshot() {
131133
return snapshotManager.getTimetableSnapshot();
132134
}
133135

136+
private TimetableSnapshot getTimetableSnapshotBuffer() {
137+
return snapshotManager.getTimetableSnapshotBuffer();
138+
}
139+
134140
private Result<UpdateSuccess, UpdateError> apply(
135141
EstimatedVehicleJourney journey,
136142
TransitEditorService transitService,
@@ -195,11 +201,7 @@ private boolean shouldAddNewTrip(
195201
* Snapshot timetable is used as source if initialised, trip patterns scheduled timetable if not.
196202
*/
197203
private Timetable getCurrentTimetable(TripPattern tripPattern, LocalDate serviceDate) {
198-
TimetableSnapshot timetableSnapshot = getTimetableSnapshot();
199-
if (timetableSnapshot != null) {
200-
return timetableSnapshot.resolve(tripPattern, serviceDate);
201-
}
202-
return tripPattern.getScheduledTimetable();
204+
return getTimetableSnapshotBuffer().resolve(tripPattern, serviceDate);
203205
}
204206

205207
private Result<TripUpdate, UpdateError> handleModifiedTrip(
@@ -228,7 +230,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(
228230

229231
if (trip != null) {
230232
// Found exact match
231-
pattern = transitService.getPatternForTrip(trip);
233+
pattern = transitEditorService.getPatternForTrip(trip);
232234
} else if (fuzzyTripMatcher != null) {
233235
// No exact match found - search for trips based on arrival-times/stop-patterns
234236
TripAndPattern tripAndPattern = fuzzyTripMatcher.match(
@@ -263,7 +265,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(
263265
pattern,
264266
estimatedVehicleJourney,
265267
serviceDate,
266-
transitService.getTimeZone(),
268+
transitEditorService.getTimeZone(),
267269
entityResolver
268270
)
269271
.build();
@@ -310,7 +312,7 @@ private Result<UpdateSuccess, UpdateError> addTripToGraphAndBuffer(TripUpdate tr
310312
private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDate) {
311313
boolean success = false;
312314

313-
final TripPattern pattern = transitService.getPatternForTrip(trip);
315+
final TripPattern pattern = transitEditorService.getPatternForTrip(trip);
314316

315317
if (pattern != null) {
316318
// Mark scheduled trip times for this trip in this pattern as deleted
@@ -329,4 +331,11 @@ private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDat
329331

330332
return success;
331333
}
334+
335+
/**
336+
* Flush pending changes in the timetable snapshot buffer and publish a new snapshot.
337+
*/
338+
public void flushBuffer() {
339+
snapshotManager.purgeAndCommit();
340+
}
332341
}

src/main/java/org/opentripplanner/model/TimetableSnapshot.java

+5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
* up timetables on this class could conceivably be replaced with snapshotting entire views of the
6666
* transit network. It would also be possible to make the realtime version of Timetables or
6767
* TripTimes the primary view, and include references back to their scheduled versions.
68+
* <p>
69+
* Implementation note: when a snapshot is committed, the mutable state of this class is stored
70+
* in final fields and completely initialized in the constructor. This provides an additional
71+
* guarantee of safe-publication without synchronization.
72+
* (see <a href="https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.5">final Field Semantics</a>)
6873
*/
6974
public class TimetableSnapshot {
7075

src/main/java/org/opentripplanner/transit/service/DefaultTransitService.java

+8
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ public DefaultTransitService(TransitModel transitModel) {
7979
this.transitModelIndex = transitModel.getTransitModelIndex();
8080
}
8181

82+
public DefaultTransitService(
83+
TransitModel transitModel,
84+
TimetableSnapshot timetableSnapshotBuffer
85+
) {
86+
this(transitModel);
87+
this.timetableSnapshot = timetableSnapshotBuffer;
88+
}
89+
8290
@Override
8391
public Collection<String> getFeedIds() {
8492
return this.transitModel.getFeedIds();

src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java

+22
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
56
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
67
import org.opentripplanner.ext.siri.updater.SiriETUpdater;
78
import org.opentripplanner.ext.siri.updater.SiriSXUpdater;
@@ -20,6 +21,7 @@
2021
import org.opentripplanner.updater.UpdatersParameters;
2122
import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater;
2223
import org.opentripplanner.updater.spi.GraphUpdater;
24+
import org.opentripplanner.updater.spi.TimetableSnapshotFlush;
2325
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater;
2426
import org.opentripplanner.updater.trip.PollingTripUpdater;
2527
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
@@ -94,6 +96,9 @@ private void configure() {
9496
);
9597

9698
GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph, transitModel, updaters);
99+
100+
configureTimetableSnapshotFlush(updaterManager);
101+
97102
updaterManager.startUpdaters();
98103

99104
// Stop the updater manager if it contains nothing
@@ -223,4 +228,21 @@ private TimetableSnapshotSource provideGtfsTimetableSnapshot() {
223228
}
224229
return gtfsTimetableSnapshotSource;
225230
}
231+
232+
/**
233+
* If SIRI or GTFS real-time updaters are in use, configure a periodic flush of the timetable
234+
* snapshot.
235+
*/
236+
private void configureTimetableSnapshotFlush(GraphUpdaterManager updaterManager) {
237+
if (siriTimetableSnapshotSource != null || gtfsTimetableSnapshotSource != null) {
238+
updaterManager
239+
.getScheduler()
240+
.scheduleWithFixedDelay(
241+
new TimetableSnapshotFlush(siriTimetableSnapshotSource, gtfsTimetableSnapshotSource),
242+
0,
243+
updatersParameters.timetableSnapshotParameters().maxSnapshotFrequency().toSeconds(),
244+
TimeUnit.SECONDS
245+
);
246+
}
247+
}
226248
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.opentripplanner.updater.spi;
2+
3+
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
4+
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
/**
9+
* Flush the timetable snapshot buffer by committing pending changes.
10+
* Exceptions occurring during the flush are caught and ignored: the scheduler can then retry
11+
* the task later.
12+
*/
13+
public class TimetableSnapshotFlush implements Runnable {
14+
15+
private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotFlush.class);
16+
17+
private final SiriTimetableSnapshotSource siriTimetableSnapshotSource;
18+
private final TimetableSnapshotSource gtfsTimetableSnapshotSource;
19+
20+
public TimetableSnapshotFlush(
21+
SiriTimetableSnapshotSource siriTimetableSnapshotSource,
22+
TimetableSnapshotSource gtfsTimetableSnapshotSource
23+
) {
24+
this.siriTimetableSnapshotSource = siriTimetableSnapshotSource;
25+
this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource;
26+
}
27+
28+
@Override
29+
public void run() {
30+
try {
31+
LOG.debug("Flushing timetable snapshot buffer");
32+
if (siriTimetableSnapshotSource != null) {
33+
siriTimetableSnapshotSource.flushBuffer();
34+
}
35+
if (gtfsTimetableSnapshotSource != null) {
36+
gtfsTimetableSnapshotSource.flushBuffer();
37+
}
38+
LOG.debug("Flushed timetable snapshot buffer");
39+
} catch (Throwable t) {
40+
LOG.error("Error flushing timetable snapshot buffer", t);
41+
}
42+
}
43+
}

src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotManager.java

+21-68
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22

33
import java.time.LocalDate;
44
import java.util.Objects;
5-
import java.util.concurrent.locks.ReentrantLock;
65
import java.util.function.Supplier;
76
import javax.annotation.Nullable;
8-
import org.opentripplanner.framework.time.CountdownTimer;
97
import org.opentripplanner.model.Timetable;
108
import org.opentripplanner.model.TimetableSnapshot;
119
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater;
10+
import org.opentripplanner.routing.util.ConcurrentPublished;
1211
import org.opentripplanner.transit.model.framework.FeedScopedId;
1312
import org.opentripplanner.transit.model.framework.Result;
1413
import org.opentripplanner.transit.model.network.TripPattern;
@@ -30,36 +29,18 @@ public final class TimetableSnapshotManager {
3029

3130
private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotManager.class);
3231
private final TransitLayerUpdater transitLayerUpdater;
33-
/**
34-
* Lock to indicate that buffer is in use
35-
*/
36-
private final ReentrantLock bufferLock = new ReentrantLock(true);
3732

3833
/**
39-
* The working copy of the timetable snapshot. Should not be visible to routing threads. Should
40-
* only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that
41-
* might modify this buffer will correctly acquire the lock. By design, only one thread should
42-
* ever be writing to this buffer.
43-
* TODO RT_AB: research and document why this lock is needed since only one thread should ever be
44-
* writing to this buffer. One possible reason may be a need to suspend writes while indexing
45-
* and swapping out the buffer. But the original idea was to make a new copy of the buffer
46-
* before re-indexing it. While refactoring or rewriting parts of this system, we could throw
47-
* an exception if a writing section is entered by more than one thread.
34+
* The working copy of the timetable snapshot. Should not be visible to routing threads.
35+
* By design, only one thread should ever be writing to this buffer.
4836
*/
4937
private final TimetableSnapshot buffer = new TimetableSnapshot();
5038

5139
/**
5240
* The last committed snapshot that was handed off to a routing thread. This snapshot may be given
53-
* to more than one routing thread if the maximum snapshot frequency is exceeded.
54-
*/
55-
private volatile TimetableSnapshot snapshot = null;
56-
57-
/**
58-
* If a timetable snapshot is requested less than this number of milliseconds after the previous
59-
* snapshot, just return the same one. Throttles the potentially resource-consuming task of
60-
* duplicating a TripPattern -> Timetable map and indexing the new Timetables.
41+
* to more than one routing thread.
6142
*/
62-
private final CountdownTimer snapshotFrequencyThrottle;
43+
private final ConcurrentPublished<TimetableSnapshot> snapshot = new ConcurrentPublished<>();
6344

6445
/**
6546
* Should expired real-time data be purged from the graph.
@@ -85,7 +66,6 @@ public TimetableSnapshotManager(
8566
Supplier<LocalDate> localDateNow
8667
) {
8768
this.transitLayerUpdater = transitLayerUpdater;
88-
this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency());
8969
this.purgeExpiredData = parameters.purgeExpiredData();
9070
this.localDateNow = Objects.requireNonNull(localDateNow);
9171
// Force commit so that snapshot initializes
@@ -99,19 +79,17 @@ public TimetableSnapshotManager(
9979
* to the snapshot to release resources.
10080
*/
10181
public TimetableSnapshot getTimetableSnapshot() {
102-
// Try to get a lock on the buffer
103-
if (bufferLock.tryLock()) {
104-
// Make a new snapshot if necessary
105-
try {
106-
commitTimetableSnapshot(false);
107-
return snapshot;
108-
} finally {
109-
bufferLock.unlock();
110-
}
111-
}
112-
// No lock could be obtained because there is either a snapshot commit busy or updates
113-
// are applied at this moment, just return the current snapshot
114-
return snapshot;
82+
return snapshot.get();
83+
}
84+
85+
/**
86+
* @return the current timetable snapshot buffer that contains pending changes (not yet published
87+
* in a snapshot).
88+
* This should be used in the context of an updater to build a TransitEditorService that sees all
89+
* the changes applied so far by real-time updates.
90+
*/
91+
public TimetableSnapshot getTimetableSnapshotBuffer() {
92+
return buffer;
11593
}
11694

11795
/**
@@ -122,21 +100,12 @@ public TimetableSnapshot getTimetableSnapshot() {
122100
*
123101
* @param force Force the committing of a new snapshot even if the above conditions are not met.
124102
*/
125-
public void commitTimetableSnapshot(final boolean force) {
126-
if (force || snapshotFrequencyThrottle.timeIsUp()) {
127-
if (force || buffer.isDirty()) {
128-
LOG.debug("Committing {}", buffer);
129-
snapshot = buffer.commit(transitLayerUpdater, force);
130-
131-
// We only reset the timer when the snapshot is updated. This will cause the first
132-
// update to be committed after a silent period. This should not have any effect in
133-
// a busy updater. It is however useful when manually testing the updater.
134-
snapshotFrequencyThrottle.restart();
135-
} else {
136-
LOG.debug("Buffer was unchanged, keeping old snapshot.");
137-
}
103+
void commitTimetableSnapshot(final boolean force) {
104+
if (force || buffer.isDirty()) {
105+
LOG.debug("Committing {}", buffer);
106+
snapshot.publish(buffer.commit(transitLayerUpdater, force));
138107
} else {
139-
LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot);
108+
LOG.debug("Buffer was unchanged, keeping old snapshot.");
140109
}
141110
}
142111

@@ -205,22 +174,6 @@ private boolean purgeExpiredData() {
205174
return buffer.purgeExpiredData(previously);
206175
}
207176

208-
/**
209-
* Execute a {@code Runnable} with a locked snapshot buffer and release the lock afterwards. While
210-
* the action of locking and unlocking is not complicated to do for calling code, this method
211-
* exists so that the lock instance is a private field.
212-
*/
213-
public void withLock(Runnable action) {
214-
bufferLock.lock();
215-
216-
try {
217-
action.run();
218-
} finally {
219-
// Always release lock
220-
bufferLock.unlock();
221-
}
222-
}
223-
224177
/**
225178
* Clear all data of snapshot for the provided feed id
226179
*/

0 commit comments

Comments
 (0)