|
26 | 26 | import java.util.ArrayList; |
27 | 27 | import java.util.List; |
28 | 28 | import java.util.Objects; |
| 29 | +import java.util.Optional; |
29 | 30 | import java.util.concurrent.CountDownLatch; |
30 | 31 | import java.util.concurrent.TimeUnit; |
31 | 32 | import okhttp3.HttpUrl; |
@@ -54,7 +55,7 @@ public class EventSourceBeaconChainEventAdapter |
54 | 55 | private final CountDownLatch runningLatch = new CountDownLatch(1); |
55 | 56 |
|
56 | 57 | private volatile BackgroundEventSource eventSource; |
57 | | - private volatile RemoteValidatorApiChannel currentBeaconNodeUsedForEventStreaming; |
| 58 | + @VisibleForTesting volatile RemoteValidatorApiChannel currentBeaconNodeUsedForEventStreaming; |
58 | 59 |
|
59 | 60 | private final BeaconNodeReadinessManager beaconNodeReadinessManager; |
60 | 61 | private final RemoteValidatorApiChannel primaryBeaconNodeApi; |
@@ -120,9 +121,14 @@ public void onPrimaryNodeNotReady() { |
120 | 121 | } |
121 | 122 |
|
122 | 123 | @Override |
| 124 | + @SuppressWarnings("FutureReturnValueIgnored") |
123 | 125 | public void onFailoverNodeNotReady(final RemoteValidatorApiChannel failoverNotInSync) { |
124 | 126 | if (currentEventStreamHasSameEndpoint(failoverNotInSync)) { |
125 | | - switchToFailoverEventStreamIfAvailable(); |
| 127 | + if (failoverBeaconNodeApis.size() == 1 || !switchToFailoverEventStreamIfAvailable()) { |
| 128 | + // No failover switching is available, and we are currently connected to a failover node |
| 129 | + // with issues, so trigger the readiness check against the primary BN immediately |
| 130 | + beaconNodeReadinessManager.performPrimaryReadinessCheck(); |
| 131 | + } |
126 | 132 | } |
127 | 133 | } |
128 | 134 |
|
@@ -170,26 +176,28 @@ private HttpUrl createEventStreamSourceUrl( |
170 | 176 | } |
171 | 177 |
|
172 | 178 | // synchronized because of the ConnectionErrorHandler and the BeaconNodeReadinessChannel callbacks |
173 | | - private synchronized void switchToFailoverEventStreamIfAvailable() { |
| 179 | + private synchronized boolean switchToFailoverEventStreamIfAvailable() { |
174 | 180 | if (failoverBeaconNodeApis.isEmpty()) { |
175 | | - return; |
| 181 | + return false; |
176 | 182 | } |
177 | | - findReadyFailoverAndSwitch(); |
| 183 | + return findReadyFailoverAndSwitch(); |
178 | 184 | } |
179 | 185 |
|
180 | | - private void findReadyFailoverAndSwitch() { |
181 | | - failoverBeaconNodeApis.stream() |
182 | | - .filter(beaconNodeReadinessManager::isReady) |
183 | | - .findFirst() |
184 | | - .ifPresentOrElse( |
185 | | - this::switchToFailoverEventStream, |
186 | | - validatorLogger::noFailoverBeaconNodesAvailableForEventStreaming); |
| 186 | + private boolean findReadyFailoverAndSwitch() { |
| 187 | + final Optional<? extends RemoteValidatorApiChannel> readyFailover = |
| 188 | + failoverBeaconNodeApis.stream() |
| 189 | + .filter(beaconNodeReadinessManager::isReady) |
| 190 | + .filter(failover -> !currentEventStreamHasSameEndpoint(failover)) |
| 191 | + .findFirst(); |
| 192 | + if (readyFailover.isPresent()) { |
| 193 | + switchToFailoverEventStream(readyFailover.get()); |
| 194 | + return true; |
| 195 | + } |
| 196 | + validatorLogger.noFailoverBeaconNodesAvailableForEventStreaming(); |
| 197 | + return false; |
187 | 198 | } |
188 | 199 |
|
189 | 200 | private void switchToFailoverEventStream(final RemoteValidatorApiChannel beaconNodeApi) { |
190 | | - if (currentEventStreamHasSameEndpoint(beaconNodeApi)) { |
191 | | - return; |
192 | | - } |
193 | 201 | eventSource.close(); |
194 | 202 | eventSource = createEventSource(beaconNodeApi); |
195 | 203 | currentBeaconNodeUsedForEventStreaming = beaconNodeApi; |
|
0 commit comments