Skip to content

Commit 4dc802f

Browse files
domiwei and claude authored
cl/caplin: fix intermittent 0-peer sync failures on Gnosis (#19946)
## Summary

Fixes #19858

The Gnosis tip-tracking test intermittently fails because the stage pipeline enters sync stages before peer discovery completes (Gnosis has 5s slots vs mainnet 12s, narrowing the race window).

- **Add `WaitForPeers` stage**: 1s retry loop when peers=0, avoids entering ForwardSync/ChainTipSync prematurely without sleeping a full slot
- **Fix tight retry loop**: `fetchBlocksFromReqResp` now checks context cancellation and backs off 2s on `ErrNoPeers`
- **Don't ban empty peerId**: `BackwardBeaconDownloader.sendBlockRequest` no longer calls `BanPeer("")` on `ErrNoPeers`
- **Improve observability**: `ErrNoPeers` log level raised from Trace to Debug

## Test plan

- [x] `go build ./cl/phase1/stages/ ./cl/phase1/network/` — compiles clean
- [x] Dispatch `qa-tip-tracking-gnosis` workflow on this branch to verify fix https://github.com/erigontech/erigon/actions/runs/23181300043
- [ ] Monitor that Debug logs appear when peers are temporarily unavailable

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2d8450f commit 4dc802f

4 files changed

Lines changed: 56 additions & 3 deletions

File tree

cl/phase1/network/backward_beacon_downloader.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package network
1818

1919
import (
2020
"context"
21+
"errors"
2122
"math"
2223
"sync"
2324
"sync/atomic"
@@ -28,6 +29,7 @@ import (
2829
"github.com/erigontech/erigon/cl/persistence/beacon_indicies"
2930
"github.com/erigontech/erigon/cl/phase1/execution_client"
3031
"github.com/erigontech/erigon/cl/rpc"
32+
"github.com/erigontech/erigon/cl/sentinel/peers"
3133
"github.com/erigontech/erigon/common"
3234
"github.com/erigontech/erigon/common/log/v3"
3335
"github.com/erigontech/erigon/db/kv"
@@ -186,7 +188,17 @@ func (b *BackwardBeaconDownloader) sendBlockRequest(
186188
requestSent *atomic.Bool,
187189
) {
188190
blocks, peerId, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, start, count)
189-
if err != nil || blocks == nil || len(blocks) == 0 {
191+
if err != nil {
192+
// Don't ban when the error is due to no peers being available.
193+
if !errors.Is(err, peers.ErrNoPeers) {
194+
b.rpc.BanPeer(peerId)
195+
} else {
196+
log.Debug("[Caplin] no peers available for backward beacon block request", "start", start, "count", count)
197+
}
198+
requestSent.Store(false)
199+
return
200+
}
201+
if blocks == nil || len(blocks) == 0 {
190202
b.rpc.BanPeer(peerId)
191203
requestSent.Store(false)
192204
return

cl/phase1/network/beacon_downloader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ Loop:
113113
responses, peerId, err := f.rpc.SendBeaconBlocksByRangeReq(ctx, reqSlot, reqCount)
114114
if err != nil {
115115
if errors.Is(err, peers.ErrNoPeers) {
116-
log.Trace("No peers available for beacon blocks by range request", "err", err, "peer", peerId, "slot", reqSlot, "reqCount", reqCount)
116+
log.Debug("[Caplin] no peers available for beacon blocks by range request", "slot", reqSlot, "reqCount", reqCount)
117117
} else {
118118
log.Debug("Failed to send beacon blocks by range request", "err", err, "peer", peerId, "slot", reqSlot, "reqCount", reqCount)
119119
}

cl/phase1/stages/chain_tip_sync.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package stages
22

33
import (
44
"context"
5+
"errors"
56
"sort"
67
"time"
78

@@ -57,9 +58,23 @@ func waitForExecutionEngineToBeFinished(ctx context.Context, cfg *Cfg) (ready bo
5758
// It sends a request to fetch the blocks, verifies the associated blobs, and inserts them into the blob store.
5859
// It returns a PeeredObject containing the blocks and the peer ID, or an error if something goes wrong.
5960
func fetchBlocksFromReqResp(ctx context.Context, cfg *Cfg, from uint64, count uint64) (*peers.PeeredObject[[]*cltypes.SignedBeaconBlock], error) {
60-
// spam requests to fetch blocks by range from the execution client
6161
blocks, pid, err := cfg.rpc.SendBeaconBlocksByRangeReq(ctx, from, count)
6262
for err != nil {
63+
// Respect context cancellation to avoid infinite loops.
64+
select {
65+
case <-ctx.Done():
66+
return nil, ctx.Err()
67+
default:
68+
}
69+
if errors.Is(err, peers.ErrNoPeers) {
70+
// Back off when no peers are available to avoid CPU-burning tight loops.
71+
log.Debug("[Caplin] no peers available, backing off before retrying block request", "from", from, "count", count)
72+
select {
73+
case <-time.After(2 * time.Second):
74+
case <-ctx.Done():
75+
return nil, ctx.Err()
76+
}
77+
}
6378
blocks, pid, err = cfg.rpc.SendBeaconBlocksByRangeReq(ctx, from, count)
6479
}
6580

cl/phase1/stages/clstages.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,21 @@ const (
142142
ForkChoice StageName = "ForkChoice"
143143
CleanupAndPruning StageName = "CleanupAndPruning"
144144
SleepForSlot StageName = "SleepForSlot"
145+
WaitForPeers StageName = "WaitForPeers"
145146
DownloadHistoricalBlocks StageName = "DownloadHistoricalBlocks"
146147
)
147148

148149
func MetaCatchingUp(args Args) StageName {
149150
if !args.hasDownloaded {
150151
return DownloadHistoricalBlocks
151152
}
153+
// If we have no peers, sleep until the next slot rather than entering sync
154+
// stages that will fail. This avoids CPU-burning retry loops when peer
155+
// discovery has not completed yet (common on Gnosis with 5-second slots).
156+
if args.peers == 0 {
157+
log.Debug("[Caplin] no peers available, waiting for peer discovery before syncing")
158+
return WaitForPeers
159+
}
152160
if args.seenEpoch < args.targetEpoch {
153161
return ForwardSync
154162
}
@@ -314,6 +322,24 @@ func ConsensusClStages(ctx context.Context,
314322
},
315323
ActionFunc: cleanupAndPruning,
316324
},
325+
WaitForPeers: {
326+
Description: `brief wait for peer discovery when no peers are available`,
327+
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
328+
if x := MetaCatchingUp(args); x != "" {
329+
return x
330+
}
331+
return ChainTipSync
332+
},
333+
ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error {
334+
// Wait 1 second before re-checking peers. Short enough to react
335+
// quickly once peers appear, long enough to avoid busy-looping.
336+
select {
337+
case <-time.After(1 * time.Second):
338+
case <-ctx.Done():
339+
}
340+
return nil
341+
},
342+
},
317343
SleepForSlot: {
318344
Description: `sleep until the next slot`,
319345
TransitionFunc: func(cfg *Cfg, args Args, err error) string {

0 commit comments

Comments
 (0)