Skip to content

Commit 9d235a4

Browse files
committed
fix off by one and add unit tests
Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
1 parent d0eb29f commit 9d235a4

7 files changed

Lines changed: 1870 additions & 1 deletion

File tree

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/BlockHeaderSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public synchronized List<BlockHeader> next() {
7676
}
7777

7878
long start = currentBlock.getAndAdd(batchSize);
79-
final int actualLength = (int) Math.min(batchSize, pivotBlockNumber - start);
79+
final int actualLength = (int) Math.min(batchSize, pivotBlockNumber - start + 1);
8080

8181
LOG.trace(
8282
"BlockHeaderSource reading batch: {} blocks from block number {}", actualLength, start);

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTaskTest.java

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,16 @@
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
1818
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
1921

22+
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
2023
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2124
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
25+
import org.hyperledger.besu.ethereum.eth.manager.EthPeerImmutableAttributes;
26+
import org.hyperledger.besu.ethereum.eth.manager.PeerReputation;
2227
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
28+
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
2329
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
2430
import org.hyperledger.besu.plugin.services.MetricsSystem;
2531

@@ -74,6 +80,129 @@ public void shouldFailAfterMaxRetriesExecutions() throws InterruptedException {
7480
failBecauseExceptionWasNotThrown(MaxRetriesReachedException.class);
7581
}
7682

83+
@Test
84+
public void shouldAssignSuitablePeer() {
85+
final int maxRetries = 2;
86+
TaskThatAcceptsAllPeers task = new TaskThatAcceptsAllPeers(0, maxRetries);
87+
EthPeer mockPeer = createMockPeerWithHeight(100L);
88+
89+
boolean assigned = task.assignPeer(mockPeer);
90+
91+
assertThat(assigned).isTrue();
92+
assertThat(task.getAssignedPeer()).isPresent();
93+
assertThat(task.getAssignedPeer().get()).isEqualTo(mockPeer);
94+
}
95+
96+
@Test
97+
public void shouldRejectUnsuitablePeer() {
98+
final int maxRetries = 2;
99+
TaskWithPeerFilter task = new TaskWithPeerFilter(0, maxRetries, 100);
100+
101+
EthPeer unsuitablePeer = createMockPeerWithHeight(50L);
102+
103+
boolean assigned = task.assignPeer(unsuitablePeer);
104+
105+
assertThat(assigned).isFalse();
106+
assertThat(task.getAssignedPeer()).isEmpty();
107+
}
108+
109+
@Test
110+
public void shouldAcceptSuitablePeerBasedOnCustomFilter() {
111+
final int maxRetries = 2;
112+
TaskWithPeerFilter task = new TaskWithPeerFilter(0, maxRetries, 100);
113+
114+
EthPeer suitablePeer = createMockPeerWithHeight(150L);
115+
116+
boolean assigned = task.assignPeer(suitablePeer);
117+
118+
assertThat(assigned).isTrue();
119+
assertThat(task.getAssignedPeer()).isPresent();
120+
assertThat(task.getAssignedPeer().get()).isEqualTo(suitablePeer);
121+
}
122+
123+
private EthPeer createMockPeerWithHeight(final long estimatedHeight) {
124+
EthPeer peer = mock(EthPeer.class);
125+
ChainState chainState = mock(ChainState.class);
126+
PeerReputation reputation = mock(PeerReputation.class);
127+
PeerConnection connection = mock(PeerConnection.class);
128+
129+
when(peer.chainState()).thenReturn(chainState);
130+
when(chainState.hasEstimatedHeight()).thenReturn(true);
131+
when(chainState.getEstimatedHeight()).thenReturn(estimatedHeight);
132+
when(chainState.getEstimatedTotalDifficulty())
133+
.thenReturn(org.hyperledger.besu.ethereum.core.Difficulty.ZERO);
134+
when(peer.getReputation()).thenReturn(reputation);
135+
when(reputation.getScore()).thenReturn(0);
136+
when(peer.outstandingRequests()).thenReturn(0);
137+
when(peer.getLastRequestTimestamp()).thenReturn(0L);
138+
when(peer.isDisconnected()).thenReturn(false);
139+
when(peer.isFullyValidated()).thenReturn(true);
140+
when(peer.isServingSnap()).thenReturn(false);
141+
when(peer.hasAvailableRequestCapacity()).thenReturn(true);
142+
when(peer.getConnection()).thenReturn(connection);
143+
when(connection.inboundInitiated()).thenReturn(false);
144+
return peer;
145+
}
146+
147+
@Test
148+
public void shouldFailExactlyAtMaxRetries() throws InterruptedException {
149+
final int maxRetries = 3;
150+
// Task that fails exactly maxRetries times (boundary condition)
151+
TaskThatFailsSometimes task = new TaskThatFailsSometimes(maxRetries, maxRetries);
152+
CompletableFuture<Boolean> result = task.run();
153+
154+
assertThat(result.isCompletedExceptionally()).isTrue();
155+
// Should execute exactly maxRetries times (not maxRetries + 1)
156+
assertThat(task.executions).isEqualTo(maxRetries);
157+
try {
158+
result.get();
159+
} catch (ExecutionException ee) {
160+
assertThat(ee).hasCauseExactlyInstanceOf(MaxRetriesReachedException.class);
161+
return;
162+
}
163+
failBecauseExceptionWasNotThrown(MaxRetriesReachedException.class);
164+
}
165+
166+
@Test
167+
public void shouldSucceedOnLastRetry() throws InterruptedException, ExecutionException {
168+
final int maxRetries = 3;
169+
// Task that fails maxRetries - 1 times, succeeds on last attempt
170+
TaskThatFailsSometimes task = new TaskThatFailsSometimes(maxRetries - 1, maxRetries);
171+
CompletableFuture<Boolean> result = task.run();
172+
173+
assertThat(result.get()).isTrue();
174+
assertThat(task.executions).isEqualTo(maxRetries);
175+
}
176+
177+
@Test
178+
public void shouldResetRetryCounterOnPartialSuccess()
179+
throws InterruptedException, ExecutionException {
180+
final int maxRetries = 2;
181+
TaskWithPartialSuccess task = new TaskWithPartialSuccess(maxRetries);
182+
CompletableFuture<String> result = task.run();
183+
184+
assertThat(result.get()).isEqualTo("success");
185+
// Should execute multiple times: fail, partial, fail, partial, success
186+
assertThat(task.executions).isGreaterThan(2);
187+
}
188+
189+
@Test
190+
public void shouldGetAssignedPeerWhenNotAssigned() {
191+
final int maxRetries = 2;
192+
TaskThatFailsSometimes task = new TaskThatFailsSometimes(0, maxRetries);
193+
194+
assertThat(task.getAssignedPeer()).isEmpty();
195+
}
196+
197+
@Test
198+
public void shouldProvideAccessToProtectedGetters() {
199+
final int maxRetries = 2;
200+
TaskWithProtectedGetters task = new TaskWithProtectedGetters(0, maxRetries);
201+
202+
assertThat(task.accessEthContext()).isEqualTo(ethContext);
203+
assertThat(task.accessMetricsSystem()).isEqualTo(metricsSystem);
204+
}
205+
77206
private class TaskThatFailsSometimes extends AbstractRetryingPeerTask<Boolean> {
78207
final int initialFailures;
79208
int executions = 0;
@@ -96,4 +225,112 @@ protected CompletableFuture<Boolean> executePeerTask(final Optional<EthPeer> ass
96225
}
97226
}
98227
}
228+
229+
private class TaskThatAcceptsAllPeers extends AbstractRetryingPeerTask<Boolean> {
230+
final int initialFailures;
231+
int failures = 0;
232+
233+
protected TaskThatAcceptsAllPeers(final int initialFailures, final int maxRetries) {
234+
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
235+
this.initialFailures = initialFailures;
236+
}
237+
238+
@Override
239+
protected CompletableFuture<Boolean> executePeerTask(final Optional<EthPeer> assignedPeer) {
240+
if (failures < initialFailures) {
241+
failures++;
242+
return CompletableFuture.completedFuture(null);
243+
} else {
244+
result.complete(Boolean.TRUE);
245+
return CompletableFuture.completedFuture(Boolean.TRUE);
246+
}
247+
}
248+
}
249+
250+
private class TaskWithPeerFilter extends AbstractRetryingPeerTask<Boolean> {
251+
final int initialFailures;
252+
final long minChainHeight;
253+
int failures = 0;
254+
255+
protected TaskWithPeerFilter(
256+
final int initialFailures, final int maxRetries, final long minChainHeight) {
257+
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
258+
this.initialFailures = initialFailures;
259+
this.minChainHeight = minChainHeight;
260+
}
261+
262+
@Override
263+
protected boolean isSuitablePeer(final EthPeerImmutableAttributes peer) {
264+
return peer.hasEstimatedChainHeight() && peer.estimatedChainHeight() >= minChainHeight;
265+
}
266+
267+
@Override
268+
protected CompletableFuture<Boolean> executePeerTask(final Optional<EthPeer> assignedPeer) {
269+
if (failures < initialFailures) {
270+
failures++;
271+
return CompletableFuture.completedFuture(null);
272+
} else {
273+
result.complete(Boolean.TRUE);
274+
return CompletableFuture.completedFuture(Boolean.TRUE);
275+
}
276+
}
277+
}
278+
279+
private class TaskWithPartialSuccess extends AbstractRetryingPeerTask<String> {
280+
@SuppressWarnings("UnusedVariable")
281+
int executions = 0;
282+
283+
int cycle = 0;
284+
285+
protected TaskWithPartialSuccess(final int maxRetries) {
286+
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
287+
}
288+
289+
@Override
290+
protected CompletableFuture<String> executePeerTask(final Optional<EthPeer> assignedPeer) {
291+
executions++;
292+
cycle++;
293+
294+
// Pattern: fail, partial success, fail, partial success, full success
295+
if (cycle % 5 == 0) {
296+
result.complete("success");
297+
return CompletableFuture.completedFuture("success");
298+
} else if (cycle % 2 == 0) {
299+
// Partial success (non-null but not complete)
300+
return CompletableFuture.completedFuture("partial");
301+
} else {
302+
// Null response (triggers retry)
303+
return CompletableFuture.completedFuture(null);
304+
}
305+
}
306+
}
307+
308+
private class TaskWithProtectedGetters extends AbstractRetryingPeerTask<Boolean> {
309+
final int initialFailures;
310+
int failures = 0;
311+
312+
protected TaskWithProtectedGetters(final int initialFailures, final int maxRetries) {
313+
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
314+
this.initialFailures = initialFailures;
315+
}
316+
317+
public EthContext accessEthContext() {
318+
return getEthContext();
319+
}
320+
321+
public MetricsSystem accessMetricsSystem() {
322+
return getMetricsSystem();
323+
}
324+
325+
@Override
326+
protected CompletableFuture<Boolean> executePeerTask(final Optional<EthPeer> assignedPeer) {
327+
if (failures < initialFailures) {
328+
failures++;
329+
return CompletableFuture.completedFuture(null);
330+
} else {
331+
result.complete(Boolean.TRUE);
332+
return CompletableFuture.completedFuture(Boolean.TRUE);
333+
}
334+
}
335+
}
99336
}

0 commit comments

Comments
 (0)