Skip to content

Allow reorg during sync #9268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager;
import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory;
import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher;
import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final SyncReorgManager syncReorgManager;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -91,6 +93,7 @@ public DefaultSyncServiceFactory(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final SyncReorgManager syncReorgManager,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -110,6 +113,7 @@ public DefaultSyncServiceFactory(
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.syncReorgManager = syncReorgManager;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand Down Expand Up @@ -193,6 +197,7 @@ protected ForwardSyncService createForwardSyncService() {
blockImporter,
blobSidecarManager,
blockBlobSidecarsTrackersPool,
syncReorgManager,
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
Expand All @@ -44,6 +45,8 @@ public class BatchSync implements Sync {
private static final Logger LOG = LogManager.getLogger();
private static final Duration PAUSE_ON_SERVICE_OFFLINE = Duration.ofSeconds(5);

private final Subscribers<BlocksImportedSubscriber> subscribers = Subscribers.create(true);

private final EventThread eventThread;
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
Expand Down Expand Up @@ -116,6 +119,11 @@ public static BatchSync create(
timeProvider);
}

@Override
public long subscribeToBlocksImportedEvent(final BlocksImportedSubscriber subscriber) {
return subscribers.subscribe(subscriber);
}

/**
* Begin a sync to the specified target chain. If a sync was previously in progress to a different
* chain, the sync will switch to this new chain.
Expand Down Expand Up @@ -404,6 +412,10 @@ private void onImportComplete(
isCurrentlyImportingBatch(importedBatch),
"Received import complete for batch that shouldn't have been importing");
importingBatch = Optional.empty();
importedBatch
.getLastBlock()
.ifPresent(
lastBlock -> subscribers.deliver(subscriber -> subscriber.onBlocksImported(lastBlock)));
if (switchingBranches) {
// We switched to a different chain while this was importing. Can't infer anything about other
// batches from this result but should still penalise the peer that sent it to us.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static MultipeerSyncService create(
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final SyncReorgManager syncReorgManager,
final int batchSize,
final int maxPendingBatches,
final int maxBlocksPerMinute,
Expand Down Expand Up @@ -117,6 +118,7 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()),
finalizedTargetChains,
nonfinalizedTargetChains,
spec.getSlotsPerEpoch(recentChainData.getCurrentSlot().orElse(UInt64.ZERO))),
syncReorgManager,
batchSync);
final PeerChainTracker peerChainTracker =
new PeerChainTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;

public interface Sync {
Expand All @@ -32,6 +33,12 @@ public interface Sync {

SafeFuture<Optional<SyncProgress>> getSyncProgress();

long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber);

interface BlocksImportedSubscriber {
void onBlocksImported(SignedBeaconBlock lastImportedBlock);
}

record SyncProgress(
UInt64 fromSlot,
UInt64 toSlot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncController {
public class SyncController implements BlocksImportedSubscriber {
private static final Logger LOG = LogManager.getLogger();

private final Subscribers<SyncSubscriber> subscribers = Subscribers.create(true);
Expand All @@ -40,6 +42,7 @@ public class SyncController {
private final RecentChainData recentChainData;
private final SyncTargetSelector syncTargetSelector;
private final Sync sync;
private final SyncReorgManager syncReorgManager;

/**
* The current sync. When empty, no sync has started, otherwise contains the details of the last
Expand All @@ -55,12 +58,26 @@ public SyncController(
final Executor subscriberExecutor,
final RecentChainData recentChainData,
final SyncTargetSelector syncTargetSelector,
final SyncReorgManager syncReorgManager,
final Sync sync) {
this.eventThread = eventThread;
this.subscriberExecutor = subscriberExecutor;
this.recentChainData = recentChainData;
this.syncTargetSelector = syncTargetSelector;
this.syncReorgManager = syncReorgManager;
this.sync = sync;
sync.subscribeToBlocksImportedEvent(this);
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {
eventThread.execute(
() -> {
if (isSyncSpeculative()) {
return;
}
syncReorgManager.onBlocksImported(lastImportedBlock);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.forward.multipeer;

import java.util.Optional;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.storage.client.ChainHead;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncReorgManager implements BlocksImportedSubscriber {
static final int REORG_SLOT_THRESHOLD = 10;

private final RecentChainData recentChainData;
private final ForkChoiceTrigger forkChoiceTrigger;

public SyncReorgManager(
final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) {
this.recentChainData = recentChainData;
this.forkChoiceTrigger = forkChoiceTrigger;
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {

final Optional<ChainHead> currentHead = recentChainData.getChainHead();

if (currentHead.isEmpty()) {
return;
}

if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) {
return;
}

if (currentHead
.get()
.getSlot()
.plus(REORG_SLOT_THRESHOLD)
.isGreaterThan(lastImportedBlock.getSlot())) {
return;
}

forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.StubBatchFactory;
Expand Down Expand Up @@ -658,6 +659,27 @@ void shouldRemoveBatchFromActiveSetWhenImportCompletesSuccessfully() {
assertBatchNotActive(batch0);
}

@Test
void shouldNotifyOnBlocksImported() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);
verifyNoInteractions(subscriber);
batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS);

verify(subscriber).onBlocksImported(batch0.getLastBlock().orElseThrow());
verifyNoMoreInteractions(subscriber);
}

@Test
void shouldSwitchChains() {
// Start sync to first chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChainTestUtil.chainWith;
Expand All @@ -37,6 +38,7 @@
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand All @@ -48,17 +50,24 @@ class SyncControllerTest {
private final SyncTargetSelector syncTargetSelector = mock(SyncTargetSelector.class);
private final RecentChainData recentChainData = mock(RecentChainData.class);
private final Executor subscriberExecutor = mock(Executor.class);
private final SyncReorgManager syncReorgManager = mock(SyncReorgManager.class);

private final TargetChain targetChain = chainWith(dataStructureUtil.randomSlotAndBlockRoot());

private final SyncController syncController =
new SyncController(
eventThread, subscriberExecutor, recentChainData, syncTargetSelector, sync);
eventThread,
subscriberExecutor,
recentChainData,
syncTargetSelector,
syncReorgManager,
sync);
private static final UInt64 HEAD_SLOT = UInt64.valueOf(2338);

@BeforeEach
void setUp() {
when(recentChainData.getHeadSlot()).thenReturn(HEAD_SLOT);
verify(sync).subscribeToBlocksImportedEvent(any());
}

@Test
Expand Down Expand Up @@ -242,6 +251,36 @@ void shouldNotNotifySubscribersWhenRunningSpeculativeTarget() {
verify(subscriberExecutor, never()).execute(any());
}

@Test
void shouldForwardOnBlocksImportedWhenNonSpeculativeSync() {
final SafeFuture<SyncResult> syncResult = new SafeFuture<>();
when(syncTargetSelector.selectSyncTarget(any()))
.thenReturn(Optional.of(SyncTarget.speculativeTarget(targetChain)));
when(sync.syncToChain(targetChain)).thenReturn(syncResult);

onTargetChainsUpdated();

syncController.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock());

verifyNoInteractions(syncReorgManager);
}

@Test
void shouldForwardOnBlocksImported() {
when(syncTargetSelector.selectSyncTarget(Optional.empty()))
.thenReturn(Optional.of(SyncTarget.nonfinalizedTarget(targetChain)));

when(sync.syncToChain(targetChain)).thenReturn(new SafeFuture<>());

onTargetChainsUpdated();

final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock();

syncController.onBlocksImported(block);

verify(syncReorgManager).onBlocksImported(block);
}

private void assertSyncSubscriberNotified(
final SyncSubscriber subscriber, final boolean syncing) {
// Shouldn't notify on the event thread
Expand Down
Loading