@@ -41,8 +41,8 @@ public class TimeBasedEth1HeadTracker implements Eth1HeadTracker, RunLoopLogic {
41
41
42
42
private final Spec spec ;
43
43
private final TimeProvider timeProvider ;
44
- private final AsyncRunner asyncRunner ;
45
44
private final Eth1Provider eth1Provider ;
45
+ private final AsyncRunLoop asyncRunLoop ;
46
46
47
47
private final ObservableValue <UInt64 > headSubscription = new ObservableValue <>(true );
48
48
@@ -58,13 +58,29 @@ public TimeBasedEth1HeadTracker(
58
58
final Eth1Provider eth1Provider ) {
59
59
this .spec = spec ;
60
60
this .timeProvider = timeProvider ;
61
- this .asyncRunner = asyncRunner ;
62
61
this .eth1Provider = eth1Provider ;
62
+ this .asyncRunLoop =
63
+ new AsyncRunLoop (this , asyncRunner , Constants .ETH1_DEPOSIT_REQUEST_RETRY_TIMEOUT );
63
64
}
64
65
65
66
@ Override
66
67
public void start () {
67
- new AsyncRunLoop (this , asyncRunner , Constants .ETH1_DEPOSIT_REQUEST_RETRY_TIMEOUT ).start ();
68
+ asyncRunLoop .start ();
69
+ }
70
+
71
+ @ Override
72
+ public void stop () {
73
+ asyncRunLoop .stop ();
74
+ }
75
+
76
+ @ Override
77
+ public long subscribe (final ValueObserver <UInt64 > subscriber ) {
78
+ return headSubscription .subscribe (subscriber );
79
+ }
80
+
81
+ @ Override
82
+ public void unsubscribe (final long subscriberId ) {
83
+ headSubscription .unsubscribe (subscriberId );
68
84
}
69
85
70
86
@ Override
@@ -108,6 +124,28 @@ public SafeFuture<Void> advance() {
108
124
}
109
125
}
110
126
127
+ @ Override
128
+ public Duration getDelayUntilNextAdvance () {
129
+ return Duration .ofSeconds (
130
+ nextAdvanceTimeInSeconds .minusMinZero (timeProvider .getTimeInSeconds ()).longValue ());
131
+ }
132
+
133
+ @ Override
134
+ public void onError (final Throwable t ) {
135
+ final Throwable rootCause = Throwables .getRootCause (t );
136
+ if (rootCause instanceof BlockUnavailableException ) {
137
+ LOG .error (
138
+ "Block number {} not yet available. Retrying after delay." ,
139
+ ((BlockUnavailableException ) rootCause ).getBlockNumber ());
140
+ } else if (rootCause instanceof Eth1RequestException && rootCause .getSuppressed ().length == 0 ) {
141
+ LOG .debug ("Failed to update eth1 chain head - no endpoints available" );
142
+ } else if (ExceptionUtil .hasCause (t , ConnectException .class )) {
143
+ LOG .error ("Failed to update eth1 chain head - {}" , t .getMessage ());
144
+ } else {
145
+ LOG .error ("Failed to update eth1 chain head" , t );
146
+ }
147
+ }
148
+
111
149
private SafeFuture <Void > stepForward () {
112
150
LOG .trace ("Searching forwards from block {}" , nextCandidateHead .getNumber ());
113
151
if (isOldEnough (nextCandidateHead )) {
@@ -162,28 +200,6 @@ private SafeFuture<Block> getBlock(final UInt64 blockNumber) {
162
200
maybeBlock -> maybeBlock .orElseThrow (() -> new BlockUnavailableException (blockNumber )));
163
201
}
164
202
165
- @ Override
166
- public Duration getDelayUntilNextAdvance () {
167
- return Duration .ofSeconds (
168
- nextAdvanceTimeInSeconds .minusMinZero (timeProvider .getTimeInSeconds ()).longValue ());
169
- }
170
-
171
- @ Override
172
- public void onError (final Throwable t ) {
173
- final Throwable rootCause = Throwables .getRootCause (t );
174
- if (rootCause instanceof BlockUnavailableException ) {
175
- LOG .error (
176
- "Block number {} not yet available. Retrying after delay." ,
177
- ((BlockUnavailableException ) rootCause ).getBlockNumber ());
178
- } else if (rootCause instanceof Eth1RequestException && rootCause .getSuppressed ().length == 0 ) {
179
- LOG .debug ("Failed to update eth1 chain head - no endpoints available" );
180
- } else if (ExceptionUtil .hasCause (t , ConnectException .class )) {
181
- LOG .error ("Failed to update eth1 chain head - {}" , t .getMessage ());
182
- } else {
183
- LOG .error ("Failed to update eth1 chain head" , t );
184
- }
185
- }
186
-
187
203
private boolean isOldEnough (final Block headBlock ) {
188
204
final UInt64 cutOffTime = getCutOffTime ();
189
205
final long blockTime = headBlock .getTimestamp ().longValueExact ();
@@ -206,9 +222,6 @@ private UInt64 getEth1FollowDistance() {
206
222
return spec .getGenesisSpecConfig ().getEth1FollowDistance ();
207
223
}
208
224
209
- @ Override
210
- public void stop () {}
211
-
212
225
private void notifyNewHead (final Block headBlock ) {
213
226
searchForwards = true ;
214
227
if (!headBlock .equals (lastNotifiedChainHead )) {
@@ -221,16 +234,6 @@ private void notifyNewHead(final Block headBlock) {
221
234
}
222
235
}
223
236
224
- @ Override
225
- public long subscribe (final ValueObserver <UInt64 > subscriber ) {
226
- return headSubscription .subscribe (subscriber );
227
- }
228
-
229
- @ Override
230
- public void unsubscribe (final long subscriberId ) {
231
- headSubscription .unsubscribe (subscriberId );
232
- }
233
-
234
237
private static class BlockUnavailableException extends RuntimeException {
235
238
private final UInt64 blockNumber ;
236
239
0 commit comments