Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ private void updateSiri() {
// All that said, out of all the update types, Alerts (and SIRI SX) are probably the ones
// that would be most tolerant of non-versioned application-wide storage since they don't
// participate in routing and are tacked on to already-completed routing responses.
saveResultOnGraph.execute(context -> {

updateGraph(context -> {
updateHandler.update(serviceDelivery, context);
if (markPrimed) {
primed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.opentripplanner.framework.application.OTPFeature;
import org.opentripplanner.updater.GraphWriterRunnable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -40,7 +42,16 @@ public abstract class PollingGraphUpdater implements GraphUpdater {
/**
* Parent update manager. Is used to execute graph writer runnables.
*/
protected WriteToGraphCallback saveResultOnGraph;
private WriteToGraphCallback saveResultOnGraph;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a new change, but it still seems a bit odd to me that the interface and field have different names here, with the field name being a verb. There's also a comment below referring to this as the "GraphWriter" and I'm not sure anything still has that name. Probably all a result of a series of renamings as the callback interface was extracted from the update manager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the same field name "saveResultOnGraph" is used across all GraphUpdater implementations. I would keep this name for consistency for now. This can be refactored in a follow-up PR.


/**
* A Future representing pending completion of most recently submitted task.
* If the updater posts several tasks during one polling cycle, the handle will point to the
* latest posted task.
* Initially null when the polling updater starts.
*/
@Nullable
private volatile Future<?> previousTask;

/** Shared configuration code for all polling graph updaters. */
protected PollingGraphUpdater(PollingGraphUpdaterParameters config) {
Expand All @@ -55,6 +66,10 @@ public Duration pollingPeriod() {
@Override
public final void run() {
try {
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
waitForPreviousTask();
}

// Run concrete polling graph updater's implementation method.
runPolling();
if (runOnlyOnce()) {
Expand Down Expand Up @@ -113,11 +128,33 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) {
*/
protected abstract void runPolling() throws Exception;

protected final void updateGraph(GraphWriterRunnable task)
throws ExecutionException, InterruptedException {
var result = saveResultOnGraph.execute(task);
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
result.get();
/**
* Post an update task to the GraphWriter queue.
* This is non-blocking.
* This can be called several times during one polling cycle.
* This is the sole way for polling updater implementations to submit real-time update tasks,
* while technical details about the execution of these tasks
* (frequency, concurrency, waiting, ...) are encapsulated in this parent class.
*/
protected final void updateGraph(GraphWriterRunnable task) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The system in this PR where this method is called from runPolling() in all subclasses, with this method being the sole way to submit tasks to a private WriteToGraphCallback seems like an even better approach than what I was speculating about in #6262 (comment).
I'd recommend we briefly explain that usage pattern in the Javadoc here though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

previousTask = saveResultOnGraph.execute(task);
Copy link
Member

@t2gran t2gran Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is non-blocking, does that mean it can be called from multiple threads or from one thread only? This should be documented in the JavaDoc.

What prevent this from waiting on it self?

I was currious what would happen if you try waiting on your self? The Future#get does not say, but this small program hangs - never returns:

  public static class A {
    Future previous = null;

    public static void main(String[] args) throws Exception {
      var pool = Executors.newFixedThreadPool(5);
      var a = new A();
      var f = pool.submit(a::run);
      a.previous = f;
    }

    void run() {
      try {
        Thread.sleep(500);
        Thread.yield();
        if (previous != null) {
          System.err.println("Reference to it self obtained!");
          previous.get();
          System.err.println("Do not get here!");
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

Output:

Reference to it self obtained!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@t2gran my understanding is that this method is intended to only be called in the runPolling() implementations of PollingGraphUpdater subclasses. Those are in turn only expected to be called by PollingGraphUpdater.run() which is in turn only run by GraphUpdaterManager.pollingUpdaterPool.scheduleWithFixedDelay() which, if I'm not mistaken, means it would only ever be called on one thread at a time.

So I think the self-waiting situation you describe should only arise if someone made a PollingGraphUpdater subclass, on which the runPolling method was implemented, written to contain something like updateGraph(self) instead of updateGraph(closure). Such intentional indirect recursion would imply the author was confused about how these updaters are meant to work. Would Javadoc clearly stating how this method is used in PollingGraphUpdaters be sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prevent this from waiting on it self?

There are 2 thread pools involved, one for the polling thread, one for the graph writer.
From the polling thread we wait for a task running in the graph writer thread, so I do not see a situation where "you wait on yourself"? Unless there is an implementation bug in the updater as @abyrd mentioned.

If this is non-blocking, does that mean it can be called from multiple threads or from one thread only? This should be documented in the JavaDoc.

Do you refer to an hypothetical PollingUpdater implementation that would itself be multi-threaded and that would call updateGraph() multiple times from several threads from its own thread pool?
No such updater exists today, but that would break the logic implemented in this PR, since this PR assumes that if a PollingUpdater calls updateGraph() multiple times, it does so sequentially.

}

/**
* If the previous task takes longer than the polling interval,
* we delay the next polling cycle until the task is complete.
* This prevents tasks from piling up.
* If the updater sends several tasks during a polling cycle, we wait on the latest posted task.
* */
private void waitForPreviousTask() throws InterruptedException, ExecutionException {
if (previousTask != null && !previousTask.isDone()) {
LOG.info("Delaying polling until the previous task is complete");
long startBlockingWait = System.currentTimeMillis();
previousTask.get();
LOG.info(
"Resuming polling after waiting an additional {}s",
(System.currentTimeMillis() - startBlockingWait) / 1000
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void runPolling() {
List<EstimatedTimetableDeliveryStructure> etds =
serviceDelivery.getEstimatedTimetableDeliveries();
if (etds != null) {
saveResultOnGraph.execute(context -> {
updateGraph(context -> {
var result = estimatedTimetableHandler.applyUpdate(etds, incrementality, context);
ResultLogger.logUpdateResult(feedId, "siri-et", result);
metricsConsumer.accept(result);
Expand Down

This file was deleted.

Loading