Skip to content

Commit e960399

Browse files
committed
Merge branch 'block-confirmations' into 'main'
Set configurable number of block confirmations See merge request flarenetwork/flare-system-c-chain-indexer!48
2 parents f664235 + 473d49a commit e960399

File tree

9 files changed

+353
-153
lines changed

9 files changed

+353
-153
lines changed

config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ var (
1818
CfgFlag = flag.String("config", "config.toml", "Configuration file (toml format)")
1919
)
2020

21+
const defaultConfirmations = 1
22+
2123
func init() {
2224
GlobalConfigCallback.AddCallback(func(config GlobalConfig) {
2325
tCfg := config.TimeoutConfig()
@@ -77,6 +79,7 @@ type IndexerConfig struct {
7779
NewBlockCheckMillis int `toml:"new_block_check_millis"`
7880
CollectTransactions []TransactionInfo `toml:"collect_transactions"`
7981
CollectLogs []LogInfo `toml:"collect_logs"`
82+
Confirmations uint64 `toml:"confirmations"`
8083
}
8184

8285
type TimeoutConfig struct {
@@ -99,7 +102,7 @@ type LogInfo struct {
99102
func BuildConfig() (*Config, error) {
100103
cfgFileName := *CfgFlag
101104

102-
cfg := new(Config)
105+
cfg := &Config{Indexer: IndexerConfig{Confirmations: defaultConfirmations}}
103106
err := parseConfigFile(cfg, cfgFileName)
104107
if err != nil {
105108
return nil, err

indexer/.snapshots/TestIndexer

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
(*database.DBStates)({
2+
States: (map[string]*database.State) (len=3) {
3+
(string) (len=20) "first_database_block": (*database.State)({
4+
BaseEntity: (database.BaseEntity) {
5+
ID: (uint64) 1
6+
},
7+
Name: (string) (len=20) "first_database_block",
8+
Index: (uint64) 1112,
9+
BlockTimestamp: (uint64) 1662557489,
10+
Updated: (time.Time) 0001-01-01 00:00:00 +0000 UTC
11+
}),
12+
(string) (len=16) "last_chain_block": (*database.State)({
13+
BaseEntity: (database.BaseEntity) {
14+
ID: (uint64) 3
15+
},
16+
Name: (string) (len=16) "last_chain_block",
17+
Index: (uint64) 10223878,
18+
BlockTimestamp: (uint64) 1721830576,
19+
Updated: (time.Time) 0001-01-01 00:00:00 +0000 UTC
20+
}),
21+
(string) (len=19) "last_database_block": (*database.State)({
22+
BaseEntity: (database.BaseEntity) {
23+
ID: (uint64) 2
24+
},
25+
Name: (string) (len=19) "last_database_block",
26+
Index: (uint64) 2400,
27+
BlockTimestamp: (uint64) 1662560050,
28+
Updated: (time.Time) 0001-01-01 00:00:00 +0000 UTC
29+
})
30+
},
31+
mu: (sync.RWMutex) {
32+
w: (sync.Mutex) {
33+
state: (int32) 0,
34+
sema: (uint32) 0
35+
},
36+
writerSem: (uint32) 0,
37+
readerSem: (uint32) 0,
38+
readerCount: (atomic.Int32) {
39+
_: (atomic.noCopy) {
40+
},
41+
v: (int32) 0
42+
},
43+
readerWait: (atomic.Int32) {
44+
_: (atomic.noCopy) {
45+
},
46+
v: (int32) 0
47+
}
48+
}
49+
})

indexer/blocks.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"flare-ftso-indexer/config"
66
"flare-ftso-indexer/database"
77
"flare-ftso-indexer/logger"
8-
"fmt"
98
"math/big"
109
"sync"
1110
"time"
@@ -26,10 +25,7 @@ func newBlockBatch(batchSize uint64) *blockBatch {
2625
}
2726

2827
func (ci *BlockIndexer) fetchBlock(ctx context.Context, index *uint64) (block *types.Block, err error) {
29-
var indexBigInt *big.Int
30-
if index != nil {
31-
indexBigInt = new(big.Int).SetUint64(*index)
32-
}
28+
indexBigInt := indexToBigInt(index)
3329

3430
bOff := backoff.NewExponentialBackOff()
3531
bOff.MaxElapsedTime = config.BackoffMaxElapsedTime
@@ -55,22 +51,63 @@ func (ci *BlockIndexer) fetchBlock(ctx context.Context, index *uint64) (block *t
5551
return block, nil
5652
}
5753

54+
func (ci *BlockIndexer) fetchBlockHeader(ctx context.Context, index *uint64) (header *types.Header, err error) {
55+
indexBigInt := indexToBigInt(index)
56+
57+
bOff := backoff.NewExponentialBackOff()
58+
bOff.MaxElapsedTime = config.BackoffMaxElapsedTime
59+
60+
err = backoff.RetryNotify(
61+
func() error {
62+
ctx, cancelFunc := context.WithTimeout(ctx, config.Timeout)
63+
defer cancelFunc()
64+
65+
header, err = ci.client.HeaderByNumber(ctx, indexBigInt)
66+
return err
67+
},
68+
bOff,
69+
func(err error, d time.Duration) {
70+
logger.Debug("HeaderByNumber error: %s. Will retry after %s", err, d)
71+
},
72+
)
73+
74+
if err != nil {
75+
return nil, errors.Wrap(err, "ci.client.HeaderByNumber")
76+
}
77+
78+
return header, nil
79+
}
80+
81+
func indexToBigInt(index *uint64) *big.Int {
82+
if index == nil {
83+
return nil
84+
}
85+
86+
return new(big.Int).SetUint64(*index)
87+
}
88+
5889
func (ci *BlockIndexer) fetchLastBlockIndex(ctx context.Context) (uint64, uint64, error) {
59-
lastBlock, err := ci.fetchBlock(ctx, nil)
90+
lastBlock, err := ci.fetchBlockHeader(ctx, nil)
91+
if err != nil {
92+
return 0, 0, errors.Wrap(err, "fetchBlockHeader last")
93+
}
94+
95+
latestConfirmedNumber := lastBlock.Number.Uint64() - ci.params.Confirmations
96+
latestConfirmedHeader, err := ci.fetchBlockHeader(ctx, &latestConfirmedNumber)
6097
if err != nil {
61-
return 0, 0, fmt.Errorf("fetchLastBlockIndex: %w", err)
98+
return 0, 0, errors.Wrap(err, "fetchBlockHeader latestConfirmed")
6299
}
63100

64-
return lastBlock.NumberU64(), lastBlock.Time(), nil
101+
return latestConfirmedNumber, latestConfirmedHeader.Time, nil
65102
}
66103

67104
func (ci *BlockIndexer) fetchBlockTimestamp(ctx context.Context, index uint64) (uint64, error) {
68-
lastBlock, err := ci.fetchBlock(ctx, &index)
105+
lastBlock, err := ci.fetchBlockHeader(ctx, &index)
69106
if err != nil {
70-
return 0, fmt.Errorf("fetchBlockTimestamp: %w", err)
107+
return 0, errors.Wrap(err, "fetchBlockHeader")
71108
}
72109

73-
return lastBlock.Time(), nil
110+
return lastBlock.Time, nil
74111
}
75112

76113
func (ci *BlockIndexer) processBlocks(

indexer/indexer_test.go

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,21 @@ import (
1111
"time"
1212

1313
"github.com/ava-labs/coreth/ethclient"
14+
"github.com/bradleyjkemp/cupaloy/v2"
1415
"github.com/caarlos0/env/v10"
15-
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
"gorm.io/gorm"
1618
)
1719

1820
type testConfig struct {
19-
DBHost string `env:"DB_HOST" envDefault:"localhost"`
20-
DBPort int `env:"DB_PORT" envDefault:"3306"`
21-
DBName string `env:"DB_NAME" envDefault:"flare_ftso_indexer_test"`
22-
DBUsername string `env:"DB_USERNAME" envDefault:"root"`
23-
DBPassword string `env:"DB_PASSWORD" envDefault:"root"`
24-
MockChainPort int `env:"MOCK_CHAIN_PORT" envDefault:"5500"`
21+
DBHost string `env:"DB_HOST" envDefault:"localhost"`
22+
DBPort int `env:"DB_PORT" envDefault:"3306"`
23+
DBName string `env:"DB_NAME" envDefault:"flare_ftso_indexer_test"`
24+
DBUsername string `env:"DB_USERNAME" envDefault:"root"`
25+
DBPassword string `env:"DB_PASSWORD" envDefault:"root"`
26+
MockChainPort int `env:"MOCK_CHAIN_PORT" envDefault:"5500"`
27+
RecorderNodeURL string `env:"RECORDER_NODE_URL"`
28+
ResponsesFile string `env:"RESPONSES_FILE" envDefault:"../testing/chain_copy/responses.json"`
2529
}
2630

2731
func TestIndexer(t *testing.T) {
@@ -33,21 +37,6 @@ func TestIndexer(t *testing.T) {
3337
t.Fatal("Config parse error:", err)
3438
}
3539

36-
// mock blockchain
37-
go func() {
38-
err := indexer_testing.MockChain(
39-
tCfg.MockChainPort,
40-
"../testing/chain_copy/blocks.json",
41-
"../testing/chain_copy/transactions.json",
42-
)
43-
if err != nil {
44-
logger.Fatal("Mock chain error: %s", err)
45-
}
46-
}()
47-
48-
time.Sleep(3 * time.Second)
49-
indexer_testing.ChainLastBlock = 2000
50-
5140
// set configuration parameters
5241
mockChainAddress := fmt.Sprintf("http://localhost:%d", tCfg.MockChainPort)
5342
cfgChain := config.ChainConfig{NodeURL: mockChainAddress}
@@ -98,12 +87,51 @@ func TestIndexer(t *testing.T) {
9887
cfg := config.Config{Indexer: cfgIndexer, Chain: cfgChain, Logger: cfgLog, DB: cfgDB}
9988
config.GlobalConfigCallback.Call(cfg)
10089

90+
// mock blockchain
91+
mockChain, err := indexer_testing.NewMockChain(
92+
tCfg.MockChainPort,
93+
tCfg.ResponsesFile,
94+
tCfg.RecorderNodeURL,
95+
)
96+
require.NoError(t, err)
97+
10198
// connect to the database
102-
db, err := database.ConnectAndInitialize(ctx, &cfgDB)
99+
db, err := database.ConnectAndInitialize(ctx, &cfg.DB)
103100
if err != nil {
104101
logger.Fatal("Database connect and initialize error: %s", err)
105102
}
106103

104+
err = runIndexer(ctx, mockChain, db, &cfg)
105+
require.NoError(t, err)
106+
107+
// correctness check
108+
states, err := database.UpdateDBStates(ctx, db)
109+
require.NoError(t, err)
110+
111+
// Set the update timestamps to zero for the snapshot as these will
112+
// vary with current system time.
113+
for _, state := range states.States {
114+
state.Updated = time.Time{}
115+
}
116+
117+
cupaloy.SnapshotT(t, states)
118+
}
119+
120+
func runIndexer(ctx context.Context, mockChain *indexer_testing.MockChain, db *gorm.DB, cfg *config.Config) error {
121+
go func() {
122+
if err := mockChain.Run(ctx); err != nil {
123+
logger.Fatal("Mock chain error: %s", err)
124+
}
125+
}()
126+
127+
defer func() {
128+
if err := mockChain.Stop(); err != nil {
129+
logger.Error("Mock chain stop error: %s", err)
130+
}
131+
}()
132+
133+
time.Sleep(3 * time.Second)
134+
107135
// set a new starting index based on the history drop interval
108136
historyDropIntervalSeconds := uint64(10000)
109137

@@ -113,7 +141,7 @@ func TestIndexer(t *testing.T) {
113141
}
114142

115143
// create the indexer
116-
cIndexer, err := CreateBlockIndexer(&cfg, db, ethClient)
144+
cIndexer, err := CreateBlockIndexer(cfg, db, ethClient)
117145
if err != nil {
118146
logger.Fatal("Create indexer error: %s", err)
119147
}
@@ -124,9 +152,6 @@ func TestIndexer(t *testing.T) {
124152
logger.Fatal("History run error: %s", err)
125153
}
126154

127-
// at the mock server add new blocks after some time
128-
go increaseLastBlockAndStop()
129-
130155
// turn on the function to delete in the database everything that
131156
// is older than the historyDrop interval
132157
go database.DropHistory(
@@ -139,19 +164,5 @@ func TestIndexer(t *testing.T) {
139164
logger.Fatal("Continuous run error: %s", err)
140165
}
141166

142-
// correctness check
143-
states, err := database.UpdateDBStates(ctx, db)
144-
assert.NoError(t, err)
145-
assert.Equal(t, 1112, int(states.States[database.FirstDatabaseIndexState].Index))
146-
assert.Equal(t, 2400, int(states.States[database.LastDatabaseIndexState].Index))
147-
assert.Equal(t, 8980059, int(states.States[database.LastChainIndexState].Index))
148-
}
149-
150-
func increaseLastBlockAndStop() {
151-
indexer_testing.ChainLastBlock = 2100
152-
time.Sleep(time.Second)
153-
indexer_testing.ChainLastBlock = 2200
154-
time.Sleep(time.Second)
155-
indexer_testing.ChainLastBlock = 2499
156-
time.Sleep(10 * time.Second)
167+
return nil
157168
}

testing/chain_copy/blocks.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

testing/chain_copy/responses.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

testing/chain_copy/transactions.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)