Skip to content

Commit 8604840

Browse files
committed
Merge branch 'ryan/fix-init-db-states' into 'main'
Simplify calculation of starting index within indexer See merge request flarenetwork/FSP/flare-system-c-chain-indexer!74
2 parents 3bbc551 + 2040771 commit 8604840

File tree

6 files changed

+49
-71
lines changed

6 files changed

+49
-71
lines changed

benchmarks/songbird_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func BenchmarkBlockRequests(b *testing.B) {
5353
logger.Fatal("Indexer create error: %s", err)
5454
}
5555

56-
err = cIndexer.IndexHistory(ctx)
56+
_, err = cIndexer.IndexHistory(ctx)
5757
if err != nil {
5858
logger.Fatal("History run error: %s", err)
5959
}

database/states.go

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package database
22

33
import (
44
"context"
5-
"flare-ftso-indexer/logger"
65
"sync"
76
"time"
87

@@ -64,7 +63,13 @@ func (s *DBStates) updateStates(newStates map[string]*State) {
6463

6564
func (s *DBStates) updateIndex(name string, newIndex, blockTimestamp uint64) {
6665
s.mu.Lock()
67-
s.States[name].updateIndex(newIndex, blockTimestamp)
66+
state := s.States[name]
67+
if state == nil {
68+
state = &State{Name: name}
69+
s.States[name] = state
70+
}
71+
72+
state.updateIndex(newIndex, blockTimestamp)
6873
s.mu.Unlock()
6974
}
7075

@@ -76,50 +81,32 @@ func (s *DBStates) updateDB(db *gorm.DB, name string) error {
7681
}
7782

7883
func (s *DBStates) Update(db *gorm.DB, name string, newIndex, blockTimestamp uint64) error {
79-
s.mu.RLock()
80-
state := s.States[name]
81-
s.mu.RUnlock()
82-
83-
if state == nil {
84-
return errors.Errorf("state %s not found", name)
85-
}
86-
8784
s.updateIndex(name, newIndex, blockTimestamp)
8885
return s.updateDB(db, name)
8986
}
9087

9188
func (s *DBStates) UpdateAtStart(
92-
db *gorm.DB, startIndex, startBlockTimestamp, lastChainIndex, lastBlockTimestamp, stopIndex uint64,
93-
) (uint64, uint64, error) {
94-
var err error
95-
89+
db *gorm.DB, startIndex, startBlockTimestamp, lastChainIndex, lastBlockTimestamp uint64,
90+
) error {
9691
s.mu.RLock()
97-
firstIndex := s.States[FirstDatabaseIndexState].Index
98-
lastIndex := s.States[LastDatabaseIndexState].Index
92+
_, firstDatabaseIndexSet := s.States[FirstDatabaseIndexState]
9993
s.mu.RUnlock()
10094

101-
if startIndex >= firstIndex && startIndex <= lastIndex {
102-
logger.Info("Data from blocks %d to %d already in the database", startIndex, lastIndex)
103-
startIndex = lastIndex + 1
104-
} else {
105-
logger.Warn("Data from blocks %d to %d not in the database, starting from %d", startIndex, lastIndex, firstIndex)
106-
107-
// if startIndex is set before existing data in the DB or a break among saved blocks
108-
// in the DB is created, then we change the guaranties about the starting block
109-
err = s.Update(db, FirstDatabaseIndexState, startIndex, startBlockTimestamp)
95+
// Set the first database index state only if it does not exist yet
96+
if !firstDatabaseIndexSet {
97+
err := s.Update(db, FirstDatabaseIndexState, startIndex, startBlockTimestamp)
11098
if err != nil {
111-
return 0, 0, errors.Wrap(err, "states.Update")
99+
return errors.Wrap(err, "states.Update(FirstDatabaseIndexState)")
112100
}
113101
}
114102

115-
err = s.Update(db, LastChainIndexState, lastChainIndex, lastBlockTimestamp)
103+
// Set the state for the current latest chain index
104+
err := s.Update(db, LastChainIndexState, lastChainIndex, lastBlockTimestamp)
116105
if err != nil {
117-
return 0, 0, errors.Wrap(err, "states.Update")
106+
return errors.Wrap(err, "states.Update(LastChainIndexState)")
118107
}
119108

120-
lastIndex = min(stopIndex, lastChainIndex)
121-
122-
return startIndex, lastIndex, nil
109+
return nil
123110
}
124111

125112
func UpdateDBStates(ctx context.Context, db *gorm.DB) (*DBStates, error) {
@@ -148,12 +135,7 @@ func getDBStates(ctx context.Context, db *gorm.DB) (map[string]*State, error) {
148135
return errors.Wrap(err, "db.Where")
149136
}
150137

151-
// If the state is not found, create a new one.
152-
state = &State{Name: name, Updated: time.Now()}
153-
err := db.Create(state).Error
154-
if err != nil {
155-
return errors.Wrap(err, "db.Create")
156-
}
138+
return nil
157139
}
158140

159141
mu.Lock()

indexer/indexer.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -143,35 +143,35 @@ func parseTransactionAddress(address string) common.Address {
143143
return common.HexToAddress(address)
144144
}
145145

146-
func (ci *BlockIndexer) IndexHistory(ctx context.Context) error {
146+
func (ci *BlockIndexer) IndexHistory(ctx context.Context) (uint64, error) {
147147
states, err := database.UpdateDBStates(ctx, ci.db)
148148
if err != nil {
149-
return errors.Wrap(err, "database.UpdateDBStates")
149+
return 0, errors.Wrap(err, "database.UpdateDBStates")
150150
}
151151

152-
ixRange, err := ci.getIndexRange(ctx, states)
152+
ixRange, err := ci.getIndexRange(ctx, states, ci.params.StartIndex)
153153
if err != nil {
154-
return err
154+
return 0, err
155155
}
156156

157157
logger.Info("Starting to index blocks from %d to %d", ixRange.start, ixRange.end)
158158

159159
for i := ixRange.start; i <= ixRange.end; i = i + ci.params.BatchSize {
160160
if err := ci.indexBatch(ctx, states, i, ixRange); err != nil {
161-
return err
161+
return 0, err
162162
}
163163

164164
// in the second to last run of the loop update lastIndex to get the blocks
165165
// that were produced during the run of the algorithm
166166
if ci.shouldUpdateLastIndex(ixRange, i) {
167167
ixRange, err = ci.updateLastIndexHistory(ctx, states, ixRange)
168168
if err != nil {
169-
return err
169+
return 0, err
170170
}
171171
}
172172
}
173173

174-
return nil
174+
return ixRange.end, nil
175175
}
176176

177177
func (ci *BlockIndexer) indexBatch(
@@ -340,24 +340,27 @@ type indexRange struct {
340340
}
341341

342342
func (ci *BlockIndexer) getIndexRange(
343-
ctx context.Context, states *database.DBStates,
343+
ctx context.Context, states *database.DBStates, startIndex uint64,
344344
) (*indexRange, error) {
345345
lastChainIndex, lastChainTimestamp, err := ci.fetchLastBlockIndex(ctx)
346346
if err != nil {
347347
return nil, errors.Wrap(err, "ci.fetchLastBlockIndex")
348348
}
349349

350-
startTimestamp, err := ci.fetchBlockTimestamp(ctx, ci.params.StartIndex)
350+
startTimestamp, err := ci.fetchBlockTimestamp(ctx, startIndex)
351351
if err != nil {
352352
return nil, errors.Wrap(err, "ci.fetchBlockTimestamp")
353353
}
354354

355-
startIndex, lastIndex, err := states.UpdateAtStart(ci.db, ci.params.StartIndex,
356-
startTimestamp, lastChainIndex, lastChainTimestamp, ci.params.StopIndex)
355+
err = states.UpdateAtStart(
356+
ci.db, startIndex, startTimestamp, lastChainIndex, lastChainTimestamp,
357+
)
357358
if err != nil {
358359
return nil, errors.Wrap(err, "states.UpdateAtStart")
359360
}
360361

362+
lastIndex := min(lastChainIndex, ci.params.StopIndex)
363+
361364
return &indexRange{start: startIndex, end: lastIndex}, nil
362365
}
363366

@@ -448,13 +451,13 @@ func (ci *BlockIndexer) updateLastIndexHistory(
448451
return ixRange, nil
449452
}
450453

451-
func (ci *BlockIndexer) IndexContinuous(ctx context.Context) error {
454+
func (ci *BlockIndexer) IndexContinuous(ctx context.Context, startIndex uint64) error {
452455
states, err := database.UpdateDBStates(ctx, ci.db)
453456
if err != nil {
454457
return errors.Wrap(err, "database.UpdateDBStates")
455458
}
456459

457-
ixRange, err := ci.getIndexRange(ctx, states)
460+
ixRange, err := ci.getIndexRange(ctx, states, startIndex)
458461
if err != nil {
459462
return errors.Wrap(err, "ci.getIndexRange")
460463
}
@@ -493,7 +496,7 @@ func (ci *BlockIndexer) IndexContinuous(ctx context.Context) error {
493496
blockNum++
494497
}
495498

496-
logger.Debug("Stopping the indexer at block %d", states.States[database.LastDatabaseIndexState].Index)
499+
logger.Debug("Stopping the indexer at block %d", blockNum)
497500

498501
return nil
499502
}

indexer/indexer_test.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,6 @@ func runIndexer(ctx context.Context, mockChain *indexer_testing.MockChain, db *g
165165

166166
time.Sleep(3 * time.Second)
167167

168-
// set a new starting index based on the history drop interval
169-
historyDropIntervalSeconds := uint64(10000)
170-
171168
nodeURL, err := cfg.Chain.FullNodeURL()
172169
if err != nil {
173170
logger.Fatal("Invalid node URL in config: %s", err)
@@ -185,21 +182,16 @@ func runIndexer(ctx context.Context, mockChain *indexer_testing.MockChain, db *g
185182
}
186183

187184
// index history with parallel processing
188-
err = cIndexer.IndexHistory(ctx)
185+
historyLastIndex, err := cIndexer.IndexHistory(ctx)
189186
if err != nil {
190187
logger.Fatal("History run error: %s", err)
191188
}
192189

193-
// turn on the function to delete in the database everything that
194-
// is older than the historyDrop interval
195-
go database.DropHistory(
196-
ctx, db, historyDropIntervalSeconds, database.HistoryDropIntervalCheck, ethClient, 0,
197-
)
198-
199-
// run indexer
200-
err = cIndexer.IndexContinuous(ctx)
201-
if err != nil {
202-
logger.Fatal("Continuous run error: %s", err)
190+
if historyLastIndex != cfg.Indexer.StopIndex {
191+
logger.Fatal(
192+
"History indexing did not reach the stop index: last %d, stop %d",
193+
historyLastIndex, cfg.Indexer.StopIndex,
194+
)
203195
}
204196

205197
return nil

main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ func runIndexer(
155155
return err
156156
}
157157

158-
err = boff.RetryNoReturn(
158+
historyLastIndex, err := boff.Retry(
159159
ctx,
160-
func() error {
160+
func() (uint64, error) {
161161
return cIndexer.IndexHistory(ctx)
162162
},
163163
"IndexHistory",
@@ -180,7 +180,7 @@ func runIndexer(
180180
err = boff.RetryNoReturn(
181181
ctx,
182182
func() error {
183-
return cIndexer.IndexContinuous(ctx)
183+
return cIndexer.IndexContinuous(ctx, historyLastIndex+1)
184184
},
185185
"IndexContinuous",
186186
)

main_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,14 @@ func TestIntegrationIndexHistory(t *testing.T) {
7474
}
7575

7676
func (suite *IntegrationIndexContinuousSuite) SetupSuite() {
77-
err := suite.indexer.IndexContinuous(suite.ctx)
77+
err := suite.indexer.IndexContinuous(suite.ctx, startBlock)
7878
require.NoError(suite.T(), err, "Could not run the indexer")
7979
}
8080

8181
func (suite *IntegrationIndexHistorySuite) SetupSuite() {
82-
err := suite.indexer.IndexHistory(suite.ctx)
82+
lastIndex, err := suite.indexer.IndexHistory(suite.ctx)
8383
require.NoError(suite.T(), err, "Could not run the indexer")
84+
require.Equal(suite.T(), uint64(endBlockHistory), lastIndex, "Last indexed block does not match expected value")
8485
}
8586

8687
func (suite *IntegrationIndex) TestCheckBlocks() {

0 commit comments

Comments
 (0)