Skip to content

Commit 00d80c4

Browse files
Wondertanrenaynayliamsi
authored
feat|refactor(header/sync): network head determination (#990)
* feat(params|header/sync): dirty integration of block time into Syncer Co-authored-by: Rene <[email protected]> * feat(header): new utility funcs for the ExtendedHeader in preparation for Syncer improvements Co-authored-by: Rene <[email protected]> * feat|refactor(header/sync): revision of Syncer's objective head determination logic The previous version of Syncer struggled with two issues: * On Syncer's start, synchronization didn't start and waited for a gossiped header to trigger sync and set a sync target (only when the subjective head was not expired) * This is why Node could wait up to block time seconds to start syncing * There was no way to request the most recent objective header of the network * I.e. if the user wanted to request the latest possible state, they weren't able to do that besides waiting for full sync to finish. The new reimplementation fixes these two problems and improves code readability and docs. Mainly, it splits the existing `trustedHead` into two methods, `subjectiveHead` and `objectiveHead`, where the latter now relies on the latest known header timestamp and block time to determine its recency. The new reimplementation fixes these two problems and improves code readability and docs. Mainly, it splits the existing trustedHead into two methods, subjectiveHead and objectiveHead, where the latter now relies on the latest known header timestamp and block time to determine its recency. If the header is not recent, we request it from the trusted peer(s), assuming it's always synced. * docs(header/sync): add TODO for potential optimization * refactor(header/sync): rework Syncer lifecycling Mainly, allow Start to error so that subsequent Stop does not panic. 
While also making lifecycle logic less confusing and less error-prone * fix(node): do not fail the Start for Syncer if it is not initialized, so that Node tests do not fail * fix(header/sync): use RWLock for sync ranges Going further, there will be multiple readers that should not block each other * fix(header/sync): ensure objective head is requested only once when there are many requests at any moment * chore(header/sync): cleanup syncing code and update the tests to use WaitSync * chore(header/sync): documentation, logging and code disposition improvements * Terminology change from the 'objective head' to the 'network head' consistent over docs and logs * More logs for unhappy cases + more information for existing logs * Extraction of head retrieval logic into a separate file * Apply docs suggestions from @renaynay and @liamsi Co-authored-by: rene <[email protected]> Co-authored-by: Ismail Khoffi <[email protected]> * Update header/header.go Co-authored-by: rene <[email protected]> Co-authored-by: Rene <[email protected]> Co-authored-by: rene <[email protected]> Co-authored-by: Ismail Khoffi <[email protected]>
1 parent 4618e2b commit 00d80c4

File tree

9 files changed

+328
-144
lines changed

9 files changed

+328
-144
lines changed

Diff for: header/header.go

+10
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ func (eh *ExtendedHeader) LastHeader() bts.HexBytes {
8282
return eh.RawHeader.LastBlockID.Hash
8383
}
8484

85+
// IsBefore returns whether the given header is of a higher height.
86+
func (eh *ExtendedHeader) IsBefore(h *ExtendedHeader) bool {
87+
return eh.Height < h.Height
88+
}
89+
90+
// Equals returns whether the hash and height of the given header match.
91+
func (eh *ExtendedHeader) Equals(header *ExtendedHeader) bool {
92+
return eh.Height == header.Height && bytes.Equal(eh.Hash(), header.Hash())
93+
}
94+
8595
// ValidateBasic performs *basic* validation to check for missed/incorrect fields.
8696
func (eh *ExtendedHeader) ValidateBasic() error {
8797
err := eh.RawHeader.ValidateBasic()

Diff for: header/sync/ranges.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ import (
99
// ranges keeps non-overlapping and non-adjacent header ranges which are used to cache headers (in ascending order).
1010
// This prevents unnecessary / duplicate network requests for additional headers during sync.
1111
type ranges struct {
12+
lk sync.RWMutex
1213
ranges []*headerRange
13-
lk sync.Mutex // no need for RWMutex as there is only one reader
1414
}
1515

1616
// Head returns the highest ExtendedHeader in all ranges if any.
1717
func (rs *ranges) Head() *header.ExtendedHeader {
18-
rs.lk.Lock()
19-
defer rs.lk.Unlock()
18+
rs.lk.RLock()
19+
defer rs.lk.RUnlock()
2020

2121
ln := len(rs.ranges)
2222
if ln == 0 {
@@ -97,9 +97,9 @@ func (rs *ranges) First() (*headerRange, bool) {
9797
}
9898

9999
type headerRange struct {
100-
start uint64
101-
lk sync.Mutex // no need for RWMutex as there is only one reader
100+
lk sync.RWMutex
102101
headers []*header.ExtendedHeader
102+
start uint64
103103
}
104104

105105
func newRange(h *header.ExtendedHeader) *headerRange {
@@ -118,15 +118,15 @@ func (r *headerRange) Append(h ...*header.ExtendedHeader) {
118118

119119
// Empty reports if range is empty.
120120
func (r *headerRange) Empty() bool {
121-
r.lk.Lock()
122-
defer r.lk.Unlock()
121+
r.lk.RLock()
122+
defer r.lk.RUnlock()
123123
return len(r.headers) == 0
124124
}
125125

126126
// Head reports the head of range if any.
127127
func (r *headerRange) Head() *header.ExtendedHeader {
128-
r.lk.Lock()
129-
defer r.lk.Unlock()
128+
r.lk.RLock()
129+
defer r.lk.RUnlock()
130130
ln := len(r.headers)
131131
if ln == 0 {
132132
return nil

Diff for: header/sync/sync.go

+48-125
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"time"
88

99
logging "github.com/ipfs/go-log/v2"
10-
11-
pubsub "github.com/libp2p/go-libp2p-pubsub"
1210
tmbytes "github.com/tendermint/tendermint/libs/bytes"
1311

1412
"github.com/celestiaorg/celestia-node/header"
@@ -17,56 +15,73 @@ import (
1715
var log = logging.Logger("header/sync")
1816

1917
// Syncer implements efficient synchronization for headers.
18+
//
19+
// Subjective header - the latest known header that is not expired (within trusting period)
20+
// Network header - the latest header received from the network
21+
2022
//
2123
// There are two main processes running in Syncer:
2224
// 1. Main syncing loop(s.syncLoop)
23-
// * Performs syncing from the subjective(local chain view) header up to the latest known trusted header
25+
// * Performs syncing from the subjective header up to the network head
2426
// * Syncs by requesting missing headers from Exchange or
25-
// * By accessing cache of pending and verified headers
26-
// 2. Receives new headers from PubSub subnetwork (s.processIncoming)
27-
// * Usually, a new header is adjacent to the trusted head and if so, it is simply appended to the local store,
28-
// incrementing the subjective height and making it the new latest known trusted header.
29-
// * Or, if it receives a header further in the future,
30-
// * verifies against the latest known trusted header
31-
// * adds the header to pending cache(making it the latest known trusted header)
27+
// * By accessing cache of pending network headers received from PubSub
28+
// 2. Receives new headers from PubSub subnetwork (s.incomingNetHead)
29+
// * Once received, tries to append it to the store
30+
// * Or, if not adjacent to head of the store,
31+
// * verifies against the latest known subjective header
32+
// * adds the header to pending cache, thereby making it the latest known subjective header
3233
// * and triggers syncing loop to catch up to that point.
3334
type Syncer struct {
3435
sub header.Subscriber
3536
exchange header.Exchange
3637
store header.Store
3738

39+
// blockTime provides a reference point for the Syncer to determine
40+
// whether its subjective head is outdated
41+
blockTime time.Duration
42+
3843
// stateLk protects state which represents the current or latest sync
3944
stateLk sync.RWMutex
4045
state State
4146
// signals to start syncing
4247
triggerSync chan struct{}
43-
// pending keeps ranges of valid headers received from the network awaiting to be appended to store
48+
// pending keeps ranges of valid new network headers awaiting to be appended to store
4449
pending ranges
45-
// cancel cancels syncLoop's context
50+
// netReqLk ensures only one network head is requested at any moment
51+
netReqLk sync.RWMutex
52+
53+
// controls lifecycle for syncLoop
54+
ctx context.Context
4655
cancel context.CancelFunc
4756
}
4857

4958
// NewSyncer creates a new instance of Syncer.
50-
func NewSyncer(exchange header.Exchange, store header.Store, sub header.Subscriber) *Syncer {
59+
func NewSyncer(exchange header.Exchange, store header.Store, sub header.Subscriber, blockTime time.Duration) *Syncer {
5160
return &Syncer{
5261
sub: sub,
5362
exchange: exchange,
5463
store: store,
64+
blockTime: blockTime,
5565
triggerSync: make(chan struct{}, 1), // should be buffered
5666
}
5767
}
5868

5969
// Start starts the syncing routine.
60-
func (s *Syncer) Start(context.Context) error {
61-
err := s.sub.AddValidator(s.processIncoming)
70+
func (s *Syncer) Start(ctx context.Context) error {
71+
s.ctx, s.cancel = context.WithCancel(context.Background())
72+
// register validator for header subscriptions
73+
// syncer does not subscribe itself and syncs headers together with validation
74+
err := s.sub.AddValidator(s.incomingNetHead)
6275
if err != nil {
6376
return err
6477
}
65-
66-
ctx, cancel := context.WithCancel(context.Background())
67-
go s.syncLoop(ctx)
68-
s.wantSync()
69-
s.cancel = cancel
78+
// get the latest head and set it as syncing target
79+
_, err = s.networkHead(ctx)
80+
if err != nil {
81+
return err
82+
}
83+
// start syncLoop only if Start is errorless
84+
go s.syncLoop()
7085
return nil
7186
}
7287

@@ -119,35 +134,6 @@ func (s *Syncer) State() State {
119134
return state
120135
}
121136

122-
// trustedHead returns the latest known trusted header that is within the trusting period.
123-
func (s *Syncer) trustedHead(ctx context.Context) (*header.ExtendedHeader, error) {
124-
// check pending for trusted header and return it if applicable
125-
// NOTE: Pending cannot be expired, guaranteed
126-
pendHead := s.pending.Head()
127-
if pendHead != nil {
128-
return pendHead, nil
129-
}
130-
131-
sbj, err := s.store.Head(ctx)
132-
if err != nil {
133-
return nil, err
134-
}
135-
136-
// check if our subjective header is not expired and use it
137-
if !sbj.IsExpired() {
138-
return sbj, nil
139-
}
140-
141-
// otherwise, request head from a trustedPeer or, in other words, do automatic subjective initialization
142-
objHead, err := s.exchange.Head(ctx)
143-
if err != nil {
144-
return nil, err
145-
}
146-
147-
s.pending.Add(objHead)
148-
return objHead, nil
149-
}
150-
151137
// wantSync will trigger the syncing loop (non-blocking).
152138
func (s *Syncer) wantSync() {
153139
select {
@@ -157,100 +143,37 @@ func (s *Syncer) wantSync() {
157143
}
158144

159145
// syncLoop controls syncing process.
160-
func (s *Syncer) syncLoop(ctx context.Context) {
146+
func (s *Syncer) syncLoop() {
161147
for {
162148
select {
163149
case <-s.triggerSync:
164-
s.sync(ctx)
165-
case <-ctx.Done():
150+
s.sync(s.ctx)
151+
case <-s.ctx.Done():
166152
return
167153
}
168154
}
169155
}
170156

171-
// sync ensures we are synced up to any trusted header.
157+
// sync ensures we are synced from the Store's head up to the new subjective head
172158
func (s *Syncer) sync(ctx context.Context) {
173-
trstHead, err := s.trustedHead(ctx)
174-
if err != nil {
175-
log.Errorw("getting trusted head", "err", err)
159+
newHead := s.pending.Head()
160+
if newHead == nil {
176161
return
177162
}
178163

179-
s.syncTo(ctx, trstHead)
180-
}
181-
182-
// processIncoming processes new processIncoming Headers, validates them and stores/caches if applicable.
183-
func (s *Syncer) processIncoming(ctx context.Context, maybeHead *header.ExtendedHeader) pubsub.ValidationResult {
184-
// 1. Try to append. If header is not adjacent/from future - try it for pending cache below
185-
_, err := s.store.Append(ctx, maybeHead)
186-
switch err {
187-
case nil:
188-
// a happy case where we append adjacent header correctly
189-
return pubsub.ValidationAccept
190-
case header.ErrNonAdjacent:
191-
// not adjacent, so try to cache it after verifying
192-
default:
193-
var verErr *header.VerifyError
194-
if errors.As(err, &verErr) {
195-
return pubsub.ValidationReject
196-
}
197-
198-
log.Errorw("appending header",
199-
"height", maybeHead.Height,
200-
"hash", maybeHead.Hash().String(),
201-
"err", err)
202-
// might be a storage error or something else, but we can still try to continue processing 'maybeHead'
203-
}
204-
205-
// 2. Get known trusted head, so we can verify maybeHead
206-
trstHead, err := s.trustedHead(ctx)
207-
if err != nil {
208-
log.Errorw("getting trusted head", "err", err)
209-
return pubsub.ValidationIgnore // we don't know if header is invalid so ignore
210-
}
211-
212-
// 3. Filter out maybeHead if behind trusted
213-
if maybeHead.Height <= trstHead.Height {
214-
log.Warnw("received known header",
215-
"height", maybeHead.Height,
216-
"hash", maybeHead.Hash())
217-
return pubsub.ValidationIgnore // we don't know if header is invalid so ignore
218-
}
219-
220-
// 4. Verify maybeHead against trusted
221-
err = trstHead.VerifyNonAdjacent(maybeHead)
222-
var verErr *header.VerifyError
223-
if errors.As(err, &verErr) {
224-
log.Errorw("invalid header",
225-
"height_of_invalid", maybeHead.Height,
226-
"hash_of_invalid", maybeHead.Hash(),
227-
"height_of_trusted", trstHead.Height,
228-
"hash_of_trusted", trstHead.Hash(),
229-
"reason", verErr.Reason)
230-
return pubsub.ValidationReject
231-
}
232-
233-
// 5. Save verified header to pending cache
234-
// NOTE: Pending cache can't be DOSed as we verify above each header against a trusted one.
235-
s.pending.Add(maybeHead)
236-
// and trigger sync to catch-up
237-
s.wantSync()
238-
log.Infow("pending head",
239-
"height", maybeHead.Height,
240-
"hash", maybeHead.Hash())
241-
return pubsub.ValidationAccept
242-
}
243-
244-
// syncTo requests headers from locally stored head up to the new head.
245-
func (s *Syncer) syncTo(ctx context.Context, newHead *header.ExtendedHeader) {
246164
head, err := s.store.Head(ctx)
247165
if err != nil {
248166
log.Errorw("getting head during sync", "err", err)
249167
return
250168
}
251169

252-
if head.Height == newHead.Height {
253-
return
170+
if head.Height >= newHead.Height {
171+
log.Warnw("sync attempt to an already synced header",
172+
"synced_height", head.Height,
173+
"attempted_height", newHead.Height,
174+
)
175+
log.Warn("PLEASE REPORT THIS AS A BUG")
176+
return // should never happen, but just in case
254177
}
255178

256179
log.Infow("syncing headers",

0 commit comments

Comments
 (0)