diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 80be79f806c..f1fbd29fe8e 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -25,6 +25,7 @@ import tech.pegasys.teku.beacon.sync.forward.ForwardSync; import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService; import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService; +import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager; import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory; import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher; import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService; @@ -70,6 +71,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory { private final PendingPool pendingBlocks; private final PendingPool pendingAttestations; private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; + private final SyncReorgManager syncReorgManager; private final int getStartupTargetPeerCount; private final AsyncBLSSignatureVerifier signatureVerifier; private final Duration startupTimeout; @@ -91,6 +93,7 @@ public DefaultSyncServiceFactory( final PendingPool pendingBlocks, final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final SyncReorgManager syncReorgManager, final int getStartupTargetPeerCount, final SignatureVerificationService signatureVerifier, final Duration startupTimeout, @@ -110,6 +113,7 @@ public DefaultSyncServiceFactory( this.pendingBlocks = pendingBlocks; this.pendingAttestations = pendingAttestations; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; + this.syncReorgManager = syncReorgManager; this.getStartupTargetPeerCount = getStartupTargetPeerCount; this.signatureVerifier = signatureVerifier; this.startupTimeout = startupTimeout; @@ -193,6 +197,7 @@ protected ForwardSyncService createForwardSyncService() { blockImporter, blobSidecarManager, blockBlobSidecarsTrackersPool, + syncReorgManager, syncConfig.getForwardSyncBatchSize(), syncConfig.getForwardSyncMaxPendingBatches(), syncConfig.getForwardSyncMaxBlocksPerMinute(), diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java index 9736fbc5432..a05849558a0 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java @@ -33,6 +33,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; +import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; @@ -44,6 +45,8 @@ public class BatchSync implements Sync { private static final Logger LOG = LogManager.getLogger(); private static final Duration PAUSE_ON_SERVICE_OFFLINE = Duration.ofSeconds(5); + private final Subscribers subscribers = Subscribers.create(true); + private final EventThread eventThread; private final AsyncRunner asyncRunner; private final RecentChainData recentChainData; @@ -116,6 +119,11 @@ public static BatchSync create( timeProvider); } + @Override + public long subscribeToBlocksImportedEvent(final BlocksImportedSubscriber subscriber) { + return subscribers.subscribe(subscriber); + } + /** * Begin a sync to the specified target chain. If a sync was previously in progress to a different * chain, the sync will switch to this new chain. @@ -404,6 +412,10 @@ private void onImportComplete( isCurrentlyImportingBatch(importedBatch), "Received import complete for batch that shouldn't have been importing"); importingBatch = Optional.empty(); + importedBatch + .getLastBlock() + .ifPresent( + lastBlock -> subscribers.deliver(subscriber -> subscriber.onBlocksImported(lastBlock))); if (switchingBranches) { // We switched to a different chain while this was importing. Can't infer anything about other // batches from this result but should still penalise the peer that sent it to us. diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index 9d85fa03bed..da2780d1b2d 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -76,6 +76,7 @@ public static MultipeerSyncService create( final BlockImporter blockImporter, final BlobSidecarManager blobSidecarManager, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final SyncReorgManager syncReorgManager, final int batchSize, final int maxPendingBatches, final int maxBlocksPerMinute, @@ -117,6 +118,7 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()), finalizedTargetChains, nonfinalizedTargetChains, spec.getSlotsPerEpoch(recentChainData.getCurrentSlot().orElse(UInt64.ZERO))), + syncReorgManager, batchSync); final PeerChainTracker peerChainTracker = new PeerChainTracker( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java index a465bb58f3c..5d97a3809fc 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java @@ -17,6 +17,7 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; public interface Sync { @@ -32,6 +33,12 @@ public interface Sync { SafeFuture> getSyncProgress(); + long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber); + + interface BlocksImportedSubscriber { + void onBlocksImported(SignedBeaconBlock lastImportedBlock); + } + record SyncProgress( UInt64 fromSlot, UInt64 toSlot, diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java index 8abd3fa1625..9ef1db8e4d6 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import tech.pegasys.teku.beacon.sync.events.SyncingStatus; import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -28,9 +29,10 @@ import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.storage.client.RecentChainData; -public class SyncController { +public class SyncController implements BlocksImportedSubscriber { private static final Logger LOG = LogManager.getLogger(); private final Subscribers subscribers = Subscribers.create(true); @@ -40,6 +42,7 @@ public class SyncController { private final RecentChainData recentChainData; private final SyncTargetSelector syncTargetSelector; private final Sync sync; + private final SyncReorgManager syncReorgManager; /** * The current sync. When empty, no sync has started, otherwise contains the details of the last @@ -55,12 +58,26 @@ public SyncController( final Executor subscriberExecutor, final RecentChainData recentChainData, final SyncTargetSelector syncTargetSelector, + final SyncReorgManager syncReorgManager, final Sync sync) { this.eventThread = eventThread; this.subscriberExecutor = subscriberExecutor; this.recentChainData = recentChainData; this.syncTargetSelector = syncTargetSelector; + this.syncReorgManager = syncReorgManager; this.sync = sync; + sync.subscribeToBlocksImportedEvent(this); + } + + @Override + public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { + eventThread.execute( + () -> { + if (isSyncSpeculative()) { + return; + } + syncReorgManager.onBlocksImported(lastImportedBlock); + }); } /** diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java new file mode 100644 index 00000000000..f0f4f1659b9 --- /dev/null +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java @@ -0,0 +1,58 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.forward.multipeer; + +import java.util.Optional; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; +import tech.pegasys.teku.storage.client.ChainHead; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class SyncReorgManager implements BlocksImportedSubscriber { + static final int REORG_SLOT_THRESHOLD = 10; + + private final RecentChainData recentChainData; + private final ForkChoiceTrigger forkChoiceTrigger; + + public SyncReorgManager( + final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) { + this.recentChainData = recentChainData; + this.forkChoiceTrigger = forkChoiceTrigger; + } + + @Override + public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { + + final Optional currentHead = recentChainData.getChainHead(); + + if (currentHead.isEmpty()) { + return; + } + + if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) { + return; + } + + if (currentHead + .get() + .getSlot() + .plus(REORG_SLOT_THRESHOLD) + .isGreaterThan(lastImportedBlock.getSlot())) { + return; + } + + forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot()); + } +} diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java index cb5e32fb386..350aac52d21 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java @@ -34,6 +34,7 @@ import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.StubBatchFactory; @@ -658,6 +659,27 @@ void shouldRemoveBatchFromActiveSetWhenImportCompletesSuccessfully() { assertBatchNotActive(batch0); } + @Test + void shouldNotifyOnBlocksImported() { + assertThat(sync.syncToChain(targetChain)).isNotDone(); + final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class); + + sync.subscribeToBlocksImportedEvent(subscriber); + + final Batch batch0 = batches.get(0); + final Batch batch1 = batches.get(1); + batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock()); + batches.receiveBlocks( + batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock()); + + assertBatchImported(batch0); + verifyNoInteractions(subscriber); + batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS); + + verify(subscriber).onBlocksImported(batch0.getLastBlock().orElseThrow()); + verifyNoMoreInteractions(subscriber); + } + @Test void shouldSwitchChains() { // Start sync to first chain diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java index 16ceda2c2b8..3985c8c4a8f 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChainTestUtil.chainWith; @@ -37,6 +38,7 @@ import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.storage.client.RecentChainData; @@ -48,17 +50,24 @@ class SyncControllerTest { private final SyncTargetSelector syncTargetSelector = mock(SyncTargetSelector.class); private final RecentChainData recentChainData = mock(RecentChainData.class); private final Executor subscriberExecutor = mock(Executor.class); + private final SyncReorgManager syncReorgManager = mock(SyncReorgManager.class); private final TargetChain targetChain = chainWith(dataStructureUtil.randomSlotAndBlockRoot()); private final SyncController syncController = new SyncController( - eventThread, subscriberExecutor, recentChainData, syncTargetSelector, sync); + eventThread, + subscriberExecutor, + recentChainData, + syncTargetSelector, + syncReorgManager, + sync); private static final UInt64 HEAD_SLOT = UInt64.valueOf(2338); @BeforeEach void setUp() { when(recentChainData.getHeadSlot()).thenReturn(HEAD_SLOT); + verify(sync).subscribeToBlocksImportedEvent(any()); } @Test @@ -242,6 +251,36 @@ void shouldNotNotifySubscribersWhenRunningSpeculativeTarget() { verify(subscriberExecutor, never()).execute(any()); } + @Test + void shouldForwardOnBlocksImportedWhenNonSpeculativeSync() { + final SafeFuture syncResult = new SafeFuture<>(); + when(syncTargetSelector.selectSyncTarget(any())) + .thenReturn(Optional.of(SyncTarget.speculativeTarget(targetChain))); + when(sync.syncToChain(targetChain)).thenReturn(syncResult); + + onTargetChainsUpdated(); + + syncController.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock()); + + verifyNoInteractions(syncReorgManager); + } + + @Test + void shouldForwardOnBlocksImported() { + when(syncTargetSelector.selectSyncTarget(Optional.empty())) + .thenReturn(Optional.of(SyncTarget.nonfinalizedTarget(targetChain))); + + when(sync.syncToChain(targetChain)).thenReturn(new SafeFuture<>()); + + onTargetChainsUpdated(); + + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); + + syncController.onBlocksImported(block); + + verify(syncReorgManager).onBlocksImported(block); + } + private void assertSyncSubscriberNotified( final SyncSubscriber subscriber, final boolean syncing) { // Shouldn't notify on the event thread diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java new file mode 100644 index 00000000000..c4dbb7e8ba2 --- /dev/null +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java @@ -0,0 +1,81 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.forward.multipeer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Optional; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; +import tech.pegasys.teku.storage.client.ChainHead; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class SyncReorgManagerTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final RecentChainData recentChainData = mock(RecentChainData.class); + private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + + private final SyncReorgManager syncReorgManager = + new SyncReorgManager(recentChainData, forkChoiceTrigger); + + @Test + public void onBlocksImported_shouldDoNothingIfNoCurrentHead() { + when(recentChainData.getChainHead()).thenReturn(Optional.empty()); + syncReorgManager.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock()); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldDoNothingIfLastImportedBlockIsCurrentHead() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(headBlock.getBlock()); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldDoNothingIfLastImportedBlockIsWithinReorgThreshold() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + final SignedBeaconBlock lastImportedBlock = + dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(10)); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(lastImportedBlock); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldTriggerReorgWhenLastImportedBlockIsOutsideReorgThreshold() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + final SignedBeaconBlock lastImportedBlock = + dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(11)); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(lastImportedBlock); + + verify(forkChoiceTrigger).reorgWhileSyncing(headBlock.getRoot(), lastImportedBlock.getRoot()); + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java index 678d98301a4..dbe4902d27c 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java @@ -314,6 +314,22 @@ public void onTick( performanceRecord.ifPresent(TickProcessingPerformance::deferredAttestationsApplied); } + public void reorgWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + onForkChoiceThread( + () -> { + final ForkChoiceStrategy forkChoiceStrategy = getForkChoiceStrategy(); + final Optional commonAncestor = + forkChoiceStrategy.findCommonAncestor(oldHeadRoot, newHeadRoot); + if (commonAncestor.isEmpty()) { + return; + } + + forkChoiceStrategy.reorgWhileSyncing( + oldHeadRoot, newHeadRoot, commonAncestor.get().getBlockRoot()); + }) + .ifExceptionGetsHereRaiseABug(); + } + private void initializeProtoArrayForkChoice() { processHead().join(); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java index 204dfaa1e35..691da3b08b1 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.statetransition.forkchoice; +import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -47,4 +48,8 @@ public boolean isForkChoiceOverrideLateBlockEnabled() { public SafeFuture prepareForAttestationProduction(final UInt64 slot) { return forkChoiceRatchet.ensureForkChoiceCompleteForSlot(slot); } + + public void reorgWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + forkChoice.reorgWhileSyncing(oldHeadRoot, newHeadRoot); + } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index e494a2aa04b..c3f5260f9b9 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -48,6 +48,7 @@ import tech.pegasys.teku.beacon.sync.SyncService; import tech.pegasys.teku.beacon.sync.SyncServiceFactory; import tech.pegasys.teku.beacon.sync.events.CoalescingChainHeadChannel; +import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager; import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher; import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetcher; import tech.pegasys.teku.beaconrestapi.BeaconRestApi; @@ -1304,6 +1305,7 @@ protected SyncServiceFactory createSyncServiceFactory() { pendingBlocks, pendingAttestations, blockBlobSidecarsTrackersPool, + new SyncReorgManager(recentChainData, forkChoiceTrigger), beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(), signatureVerificationService, Duration.ofSeconds(beaconConfig.eth2NetworkConfig().getStartupTimeoutSeconds()), diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java index 019f774db91..37c87afb063 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java @@ -136,6 +136,16 @@ public Bytes32 applyPendingVotes( } } + public void reorgWhileSyncing( + final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { + protoArrayLock.writeLock().lock(); + try { + protoArray.reorgWhileSyncing(oldHeadRoot, newHeadRoot, commonAncestorRoot); + } finally { + protoArrayLock.writeLock().unlock(); + } + } + public void onAttestation(final VoteUpdater voteUpdater, final IndexedAttestation attestation) { votesLock.writeLock().lock(); try { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java index 0eb64fbacb7..754410841a4 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java @@ -189,6 +189,60 @@ public void setInitialCanonicalBlockRoot(final Bytes32 initialCanonicalBlockRoot applyToNodes(this::updateBestDescendantOfParent); } + /** + * This function is supposed to be called while syncing. It assumes the protoarray to be in a + * state where all nodes have been just initialized (see {@link #setInitialCanonicalBlockRoot}) + * and have a weight of 1 on the canonical chain and 0 on all other chains. + * + *

It may also do the reorg if we enter syncing mode and, while syncing, we decide to sync to a + * new chain so that the common ancestor has not received any votes yet. + * + *

The function set all weights to 0 to the reorged chain and set all weights to 1 to the new + * chain. + */ + public void reorgWhileSyncing( + final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { + final Optional oldHead = getProtoNode(oldHeadRoot); + final Optional newHead = getProtoNode(newHeadRoot); + final Optional commonAncestor = getProtoNode(commonAncestorRoot); + + if (oldHead.isEmpty() || newHead.isEmpty() || commonAncestor.isEmpty()) { + return; + } + + final ProtoNode oldHeadNode = oldHead.get(); + final ProtoNode newHeadNode = newHead.get(); + final ProtoNode commonAncestorNode = commonAncestor.get(); + + // check we are in syncing mode where all nodes have a max weight of 1 + if (commonAncestorNode.getWeight().isGreaterThan(1)) { + return; + } + + // let's set weight to 0 for all nodes from old head to common ancestor + ProtoNode node = oldHeadNode; + while (!node.getBlockRoot().equals(commonAncestorRoot)) { + node.adjustWeight(-node.getWeight().longValue()); + if (node.getParentIndex().isEmpty()) { + break; + } + node = getNodeByIndex(node.getParentIndex().get()); + } + + // let's set weight to 1 from the best descendant of the new head up to the first non-zero + // weight node + node = newHeadNode.getBestDescendantIndex().map(this::getNodeByIndex).orElse(newHeadNode); + while (node.getWeight().isZero()) { + node.adjustWeight(1); + if (node.getParentIndex().isEmpty()) { + break; + } + node = getNodeByIndex(node.getParentIndex().get()); + } + + applyToNodes(this::updateBestDescendantOfParent); + } + /** * Follows the best-descendant links to find the best-block (i.e. head-block), including any * optimistic nodes which have not yet been fully validated. diff --git a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java index 9fbd42cd06b..122f55a2206 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import java.util.Collections; import java.util.List; @@ -552,6 +553,67 @@ void setInitialCanonicalBlockRoot_shouldEnsureCanonicalHeadIsSetWhenBlockRootIsN assertHead(block2a); } + @Test + void reorgWhileSyncing_shouldReorgIfWeightsAreCompatibleWithSyncingModeWhenNotInitialized() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + protoArray.applyScoreChanges(computeDeltas(), ZERO, GENESIS_CHECKPOINT, GENESIS_CHECKPOINT); + + // due to tie breaking block4a is the head + assertHead(block4a); + + // setting chain a as the canonical chain via non-tip block + protoArray.reorgWhileSyncing(block4a, block2b, block1); + + // block2a is now the head due to weight + assertHead(block2b); + } + + @Test + void reorgWhileSyncing_shouldReorgIfWeightsAreCompatibleWithSyncingModeWhenInitialized() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + protoArray.setInitialCanonicalBlockRoot(block2b); + + assertHead(block2b); + + protoArray.reorgWhileSyncing(block2b, block4a, GENESIS_CHECKPOINT.getRoot()); + + assertHead(block4a); + } + + @Test + void reorgWhileSyncing_shouldNotReorgIfWeightsAreNotCompatibleWithSyncingMode() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + // let's add 5 votes to block4a tip, so common ancestor at block1 will accumulate 5 votes too + protoArray.applyScoreChanges( + LongArrayList.of(0, 0, 0, 0, 0, 5), ZERO, GENESIS_CHECKPOINT, GENESIS_CHECKPOINT); + + assertHead(block4a); + assertThat(protoArray.getProtoNode(block1).orElseThrow().getWeight()) + .isEqualTo(UInt64.valueOf(5)); + + protoArray.reorgWhileSyncing(block4a, block2b, block1); + + assertHead(block4a); + } + private void assertHead(final Bytes32 expectedBlockHash) { final ProtoNode node = protoArray.getProtoNode(expectedBlockHash).orElseThrow(); assertThat(