Skip to content

Commit 1bf859b

Browse files
Remove Peer Task System feature toggle from DownloadBodiesStep (besu-eth#9952)
* Remove Peer Task System feature toggle from DownloadBodiesStep Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * Fix FullSyncChainDownloaderForkTest Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * Remove invalid test Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * spotless Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * Fix FullSyncChainDownloaderTotalTerminalDifficultyTest Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * Remove unneeded old code Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> * Fix infinite loop in CompleteBlocksWithPeerTask and add unit test Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net> --------- Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
1 parent 67304da commit 1bf859b

File tree

9 files changed

+71
-485
lines changed

9 files changed

+71
-485
lines changed

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
import org.hyperledger.besu.ethereum.core.Block;
1818
import org.hyperledger.besu.ethereum.core.BlockHeader;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
20-
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
2120
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
2221
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
23-
import org.hyperledger.besu.plugin.services.MetricsSystem;
2422

2523
import java.util.List;
2624
import java.util.concurrent.CompletableFuture;
@@ -31,39 +29,23 @@ public class DownloadBodiesStep
3129

3230
private final ProtocolSchedule protocolSchedule;
3331
private final EthContext ethContext;
34-
private final MetricsSystem metricsSystem;
35-
private final SynchronizerConfiguration synchronizerConfiguration;
3632

37-
public DownloadBodiesStep(
38-
final ProtocolSchedule protocolSchedule,
39-
final EthContext ethContext,
40-
final SynchronizerConfiguration synchronizerConfiguration,
41-
final MetricsSystem metricsSystem) {
33+
public DownloadBodiesStep(final ProtocolSchedule protocolSchedule, final EthContext ethContext) {
4234
this.protocolSchedule = protocolSchedule;
4335
this.ethContext = ethContext;
44-
this.synchronizerConfiguration = synchronizerConfiguration;
45-
this.metricsSystem = metricsSystem;
4636
}
4737

4838
@Override
4939
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
50-
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
51-
return ethContext
52-
.getScheduler()
53-
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
54-
} else {
55-
return CompleteBlocksTask.forHeaders(
56-
protocolSchedule, ethContext, blockHeaders, metricsSystem)
57-
.run();
58-
}
59-
}
60-
61-
private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
62-
final List<BlockHeader> headers) {
63-
64-
final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
65-
new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor());
66-
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
67-
return CompletableFuture.completedFuture(blocks);
40+
return ethContext
41+
.getScheduler()
42+
.scheduleServiceTask(
43+
() -> {
44+
final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
45+
new CompleteBlocksWithPeerTask(
46+
protocolSchedule, blockHeaders, ethContext.getPeerTaskExecutor());
47+
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
48+
return CompletableFuture.completedFuture(blocks);
49+
});
6850
}
6951
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(
106106
metricsSystem);
107107
final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep();
108108
final DownloadBodiesStep downloadBodiesStep =
109-
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
109+
new DownloadBodiesStep(protocolSchedule, ethContext);
110110
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
111111
final FullImportBlockStep importBlockStep =
112112
new FullImportBlockStep(

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ public List<Block> retrieveBlocksFromPeers() {
119119
headersToGet.removeFirst();
120120
nextIndex = findNextIndex(nextIndex + 1);
121121
});
122+
} else {
123+
throw new RuntimeException(
124+
"Unable to retrieve blocks for block numbers: "
125+
+ headersToGet.getFirst().getNumber()
126+
+ " to "
127+
+ headersToGet.getLast().getNumber());
122128
}
123129
}
124130
return List.of(result);

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
18-
import static org.mockito.Mockito.mock;
1918

2019
import org.hyperledger.besu.ethereum.ProtocolContext;
2120
import org.hyperledger.besu.ethereum.chain.Blockchain;
@@ -29,6 +28,8 @@
2928
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
3029
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
3130
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
31+
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
32+
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTaskExecutorAnswer;
3233
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
3334
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
3435
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@@ -43,6 +44,7 @@
4344
import org.junit.jupiter.api.AfterEach;
4445
import org.junit.jupiter.api.BeforeEach;
4546
import org.junit.jupiter.api.Test;
47+
import org.mockito.Mockito;
4648

4749
public class FullSyncChainDownloaderForkTest {
4850

@@ -51,6 +53,7 @@ public class FullSyncChainDownloaderForkTest {
5153
protected EthContext ethContext;
5254
protected ProtocolContext protocolContext;
5355
private SyncState syncState;
56+
private PeerTaskExecutor peerTaskExecutor;
5457

5558
private BlockchainSetupUtil localBlockchainSetup;
5659
protected MutableBlockchain localBlockchain;
@@ -67,6 +70,7 @@ public void setupTest() throws IOException {
6770

6871
protocolSchedule = localBlockchainSetup.getProtocolSchedule();
6972
protocolContext = localBlockchainSetup.getProtocolContext();
73+
peerTaskExecutor = Mockito.mock(PeerTaskExecutor.class);
7074
ethProtocolManager =
7175
EthProtocolManagerTestBuilder.builder()
7276
.setProtocolSchedule(protocolSchedule)
@@ -75,9 +79,14 @@ public void setupTest() throws IOException {
7579
.setWorldStateArchive(localBlockchainSetup.getWorldArchive())
7680
.setTransactionPool(localBlockchainSetup.getTransactionPool())
7781
.setEthereumWireProtocolConfiguration(EthProtocolConfiguration.DEFAULT)
82+
.setPeerTaskExecutor(peerTaskExecutor)
7883
.build();
7984
ethContext = ethProtocolManager.ethContext();
8085
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
86+
87+
Mockito.when(peerTaskExecutor.execute(Mockito.any(GetBodiesFromPeerTask.class)))
88+
.thenAnswer(
89+
new GetBodiesFromPeerTaskExecutorAnswer(otherBlockchain, ethContext.getEthPeers()));
8190
}
8291

8392
@AfterEach
@@ -95,7 +104,7 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) {
95104
metricsSystem,
96105
SyncTerminationCondition.never(),
97106
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS,
98-
mock(PeerTaskExecutor.class));
107+
peerTaskExecutor);
99108
}
100109

101110
private ChainDownloader downloader() {

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
4848
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
4949
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
50-
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
5150
import org.hyperledger.besu.metrics.SyncDurationMetrics;
5251
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
5352
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -363,73 +362,6 @@ public void choosesBestPeerAsSyncTarget_byTdAndHeight(final DataStorageFormat st
363362
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peerB.getEthPeer());
364363
}
365364

366-
@ParameterizedTest
367-
@ArgumentsSource(FullSyncChainDownloaderTestArguments.class)
368-
public void recoversFromSyncTargetDisconnect(final DataStorageFormat storageFormat) {
369-
setupTest(storageFormat);
370-
localBlockchainSetup.importFirstBlocks(2);
371-
final long localChainHeadAtStart = localBlockchain.getChainHeadBlockNumber();
372-
otherBlockchainSetup.importAllBlocks();
373-
final long targetBlock = otherBlockchain.getChainHeadBlockNumber();
374-
// Sanity check
375-
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
376-
377-
final SynchronizerConfiguration syncConfig =
378-
syncConfigBuilder().downloaderChainSegmentSize(5).downloaderHeadersRequestSize(3).build();
379-
final ChainDownloader downloader = downloader(syncConfig);
380-
381-
final long bestPeerChainHead = otherBlockchain.getChainHeadBlockNumber();
382-
final RespondingEthPeer bestPeer =
383-
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
384-
final long secondBestPeerChainHead = bestPeerChainHead - 3;
385-
final Blockchain shorterChain = createShortChain(otherBlockchain, secondBestPeerChainHead);
386-
final RespondingEthPeer secondBestPeer =
387-
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, shorterChain);
388-
final RespondingEthPeer.Responder bestResponder =
389-
RespondingEthPeer.blockchainResponder(otherBlockchain);
390-
final RespondingEthPeer.Responder secondBestResponder =
391-
RespondingEthPeer.blockchainResponder(shorterChain);
392-
downloader.start();
393-
394-
// Process through sync target selection
395-
bestPeer.respondWhileOtherThreadsWork(bestResponder, () -> !syncState.syncTarget().isPresent());
396-
397-
assertThat(syncState.syncTarget()).isPresent();
398-
assertThat(syncState.syncTarget().get().peer()).isEqualTo(bestPeer.getEthPeer());
399-
400-
// The next message should be for checkpoint headers from the sync target
401-
Awaitility.waitAtMost(10, TimeUnit.SECONDS)
402-
.until(() -> bestPeer.peekNextOutgoingRequest().isPresent());
403-
404-
// Process through the first import
405-
await()
406-
.atMost(10, TimeUnit.SECONDS)
407-
.pollInterval(100, TimeUnit.MILLISECONDS)
408-
.untilAsserted(
409-
() -> {
410-
if (!bestPeer.respond(bestResponder)) {
411-
secondBestPeer.respond(secondBestResponder);
412-
}
413-
assertThat(localBlockchain.getChainHeadBlockNumber())
414-
.isNotEqualTo(localChainHeadAtStart);
415-
});
416-
417-
// Sanity check that we haven't already passed the second best peer
418-
assertThat(localBlockchain.getChainHeadBlockNumber()).isLessThan(secondBestPeerChainHead);
419-
420-
// Disconnect peer
421-
ethProtocolManager.handleDisconnect(
422-
bestPeer.getPeerConnection(), DisconnectReason.TOO_MANY_PEERS, true);
423-
424-
// Downloader should recover and sync to next best peer, but it may stall
425-
// for 10 seconds first (by design).
426-
secondBestPeer.respondWhileOtherThreadsWork(
427-
secondBestResponder,
428-
() -> localBlockchain.getChainHeadBlockNumber() != secondBestPeerChainHead);
429-
430-
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(secondBestPeerChainHead);
431-
}
432-
433365
@ParameterizedTest
434366
@ArgumentsSource(FullSyncChainDownloaderTestArguments.class)
435367
public void requestsCheckpointsFromSyncTarget(final DataStorageFormat storageFormat) {

ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
3131
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
3232
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
33+
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
34+
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTaskExecutorAnswer;
3335
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask;
3436
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTaskExecutorAnswer;
3537
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
@@ -109,6 +111,10 @@ public void setupTest(final DataStorageFormat storageFormat) {
109111
peerTaskExecutor.executeAgainstPeer(
110112
Mockito.any(GetHeadersFromPeerTask.class), Mockito.any(EthPeer.class)))
111113
.thenAnswer(headersAnswer);
114+
115+
Mockito.when(peerTaskExecutor.execute(Mockito.any(GetBodiesFromPeerTask.class)))
116+
.thenAnswer(
117+
new GetBodiesFromPeerTaskExecutorAnswer(otherBlockchain, ethContext.getEthPeers()));
112118
}
113119

114120
@AfterEach

0 commit comments

Comments
 (0)