Skip to content

Commit ef59637

Browse files
vgonkivsrenaynay
andauthored
bugfix(libs/header): store incoming head in syncer (#1724)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview Closes #1728 <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> ## Checklist <!-- Please complete the checklist to ensure that the PR is ready to be reviewed. IMPORTANT: PRs should be left in Draft until the below checklist is completed. --> - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [x] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords --------- Co-authored-by: rene <[email protected]>
1 parent 849ef45 commit ef59637

File tree

3 files changed

+27
-12
lines changed

3 files changed

+27
-12
lines changed

Diff for: libs/header/sync/sync.go

+17-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
logging "github.com/ipfs/go-log/v2"
@@ -42,7 +43,7 @@ type Syncer[H header.Header] struct {
4243
// signals to start syncing
4344
triggerSync chan struct{}
4445
// syncedHead is the latest synced header.
45-
syncedHead H
46+
syncedHead atomic.Pointer[H]
4647
// pending keeps ranges of valid new network headers awaiting to be appended to store
4748
pending ranges[H]
4849
// netReqLk ensures only one network head is requested at any moment
@@ -177,42 +178,48 @@ func (s *Syncer[H]) sync(ctx context.Context) {
177178
return
178179
}
179180

180-
if s.syncedHead.IsZero() {
181+
headPtr := s.syncedHead.Load()
182+
183+
var header H
184+
if headPtr == nil {
181185
head, err := s.store.Head(ctx)
182186
if err != nil {
183187
log.Errorw("getting head during sync", "err", err)
184188
return
185189
}
186-
s.syncedHead = head
190+
header = head
191+
} else {
192+
header = *headPtr
187193
}
188-
if s.syncedHead.Height() >= newHead.Height() {
194+
195+
if header.Height() >= newHead.Height() {
189196
log.Warnw("sync attempt to an already synced header",
190-
"synced_height", s.syncedHead.Height(),
197+
"synced_height", header.Height(),
191198
"attempted_height", newHead.Height(),
192199
)
193200
log.Warn("PLEASE REPORT THIS AS A BUG")
194201
return // should never happen, but just in case
195202
}
196203

197204
log.Infow("syncing headers",
198-
"from", s.syncedHead.Height(),
205+
"from", header.Height(),
199206
"to", newHead.Height())
200-
err := s.doSync(ctx, s.syncedHead, newHead)
207+
err := s.doSync(ctx, header, newHead)
201208
if err != nil {
202209
if errors.Is(err, context.Canceled) {
203210
// don't log this error as it is normal case of Syncer being stopped
204211
return
205212
}
206213

207214
log.Errorw("syncing headers",
208-
"from", s.syncedHead.Height(),
215+
"from", header.Height(),
209216
"to", newHead.Height(),
210217
"err", err)
211218
return
212219
}
213220

214221
log.Infow("finished syncing",
215-
"from", s.syncedHead.Height(),
222+
"from", header.Height(),
216223
"to", newHead.Height(),
217224
"elapsed time", s.state.End.Sub(s.state.Start))
218225
}
@@ -257,7 +264,7 @@ func (s *Syncer[H]) processHeaders(ctx context.Context, from, to uint64) (int, e
257264

258265
amount, err := s.store.Append(ctx, headers...)
259266
if err == nil && amount > 0 {
260-
s.syncedHead = headers[amount-1]
267+
s.syncedHead.Store(&headers[amount-1])
261268
}
262269
return amount, err
263270
}

Diff for: libs/header/sync/sync_head.go

+1
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func (s *Syncer[H]) incomingNetHead(ctx context.Context, netHead H) pubsub.Valid
112112
_, err := s.store.Append(ctx, netHead)
113113
if err == nil {
114114
// a happy case where we appended maybe head directly, so accept
115+
s.syncedHead.Store(&netHead)
115116
return pubsub.ValidationAccept
116117
}
117118
var nonAdj *header.ErrNonAdjacent

Diff for: libs/header/sync/sync_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ func TestSyncCatchUp(t *testing.T) {
9999
// 4. assert syncer caught-up
100100
have, err := localStore.Head(ctx)
101101
require.NoError(t, err)
102-
assert.Equal(t, syncer.syncedHead.Height(), incomingHead.Height())
102+
headerPtr := syncer.syncedHead.Load()
103+
require.NotNil(t, headerPtr)
104+
head = *headerPtr
105+
assert.Equal(t, head.Height(), incomingHead.Height())
103106
assert.Equal(t, exp.Height()+1, have.Height()) // plus one as we didn't add last header to remoteStore
104107
assert.Empty(t, syncer.pending.Head())
105108

@@ -157,7 +160,11 @@ func TestSyncPendingRangesWithMisses(t *testing.T) {
157160
require.NoError(t, err)
158161
_, err = localStore.GetByHeight(ctx, 43)
159162
require.NoError(t, err)
160-
require.Equal(t, syncer.syncedHead.Height(), int64(43))
163+
164+
headerPtr := syncer.syncedHead.Load()
165+
require.NotNil(t, headerPtr)
166+
lastHead := *headerPtr
167+
require.Equal(t, lastHead.Height(), int64(43))
161168
exp, err := remoteStore.Head(ctx)
162169
require.NoError(t, err)
163170

0 commit comments

Comments
 (0)