Skip to content

Commit d4857ea

Browse files
committed
refactor(balance): refactor balance indexing + add test coverage
refactor(evm/batcher): create reusable batcher struct remove code
1 parent 060a32b commit d4857ea

File tree

19 files changed

+629
-632
lines changed

19 files changed

+629
-632
lines changed

backend/cmd/eth1indexer/main.go

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/gobitfly/beaconchain/pkg/commons/db"
2020
"github.com/gobitfly/beaconchain/pkg/commons/db2/data"
2121
"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
22+
"github.com/gobitfly/beaconchain/pkg/commons/db2/metadata"
2223
"github.com/gobitfly/beaconchain/pkg/commons/db2/metadataupdates"
2324
"github.com/gobitfly/beaconchain/pkg/commons/erc20"
2425
"github.com/gobitfly/beaconchain/pkg/commons/log"
@@ -29,6 +30,7 @@ import (
2930
"github.com/gobitfly/beaconchain/pkg/commons/utils"
3031
"github.com/gobitfly/beaconchain/pkg/commons/version"
3132
"github.com/gobitfly/beaconchain/pkg/executionlayer"
33+
"github.com/gobitfly/beaconchain/pkg/executionlayer/evm"
3234

3335
"github.com/coocood/freecache"
3436
"github.com/ethereum/go-ethereum/common"
@@ -145,8 +147,6 @@ func Run() {
145147

146148
chainId := strconv.FormatUint(utils.Config.Chain.ClConfig.DepositChainID, 10)
147149

148-
balanceUpdaterPrefix := chainId + ":B:"
149-
150150
nodeChainId, err := client.GetNativeClient().ChainID(context.Background())
151151
if err != nil {
152152
log.Fatal(err, "node chain id error", 0)
@@ -179,26 +179,36 @@ func Run() {
179179
go ImportEnsUpdatesLoop(bt, client, *ensBatchSize)
180180
}
181181

182-
if *enableFullBalanceUpdater {
183-
ProcessMetadataUpdates(bt, client, balanceUpdaterPrefix, *balanceUpdaterBatchSize, -1)
184-
return
185-
}
186-
187-
cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
188-
189182
bigtable, err := database.NewBigTable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, nil)
190183
if err != nil {
191184
log.Fatal(err, "error connecting to bigtable", 0)
192185
}
193186

187+
cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
188+
metadataUpdatesStore := metadataupdates.NewStore(database.Wrap(bigtable, metadataupdates.Table), cache)
189+
dataStore := data.NewStore(database.Wrap(bigtable, data.Table))
190+
metadataStore := metadata.NewStore(database.Wrap(bigtable, metadata.Table))
191+
194192
indexer := executionlayer.NewIndexer(
195193
executionlayer.NewAdaptorV1(
196-
data.NewStore(database.Wrap(bigtable, data.Table)),
197-
metadataupdates.NewStore(database.Wrap(bigtable, metadataupdates.Table), cache),
194+
dataStore,
195+
metadataUpdatesStore,
198196
),
199197
executionlayer.AllTransformers...,
200198
)
201199

200+
balanceUpdater := executionlayer.NewBalanceUpdater(
201+
chainId,
202+
metadataUpdatesStore,
203+
metadataStore,
204+
evm.NewBatcher(chainId, client.GetNativeClient()),
205+
)
206+
207+
if *enableFullBalanceUpdater {
208+
ProcessBalanceUpdates(balanceUpdater, *balanceUpdaterBatchSize, -1)
209+
return
210+
}
211+
202212
if *block != 0 {
203213
err = IndexFromNode(bt, client, *block, *block, *concurrencyBlocks, *traceMode)
204214
if err != nil {
@@ -361,7 +371,7 @@ func Run() {
361371
}
362372

363373
if *enableBalanceUpdater {
364-
ProcessMetadataUpdates(bt, client, balanceUpdaterPrefix, *balanceUpdaterBatchSize, 10)
374+
ProcessBalanceUpdates(balanceUpdater, *balanceUpdaterBatchSize, 10)
365375
}
366376

367377
log.Infof("index run completed")
@@ -545,55 +555,15 @@ func HandleChainReorgs(bt *db.Bigtable, client *rpc.ErigonClient, depth int) err
545555
return nil
546556
}
547557

548-
func ProcessMetadataUpdates(bt *db.Bigtable, client *rpc.ErigonClient, prefix string, batchSize int, iterations int) {
549-
lastKey := prefix
550-
551-
its := 0
552-
for {
558+
func ProcessBalanceUpdates(balanceUpdater executionlayer.BalanceUpdater, batchSize int, iterations int) {
559+
for its := 0; iterations == -1 || its < iterations; its++ {
553560
start := time.Now()
554-
keys, pairs, err := bt.GetMetadataUpdates(prefix, lastKey, batchSize)
561+
balances, err := balanceUpdater.UpdateBalances(int64(batchSize))
555562
if err != nil {
556-
log.Error(err, "error retrieving metadata updates from bigtable", 0)
557-
return
558-
}
559-
560-
if len(keys) == 0 {
561-
return
562-
}
563-
564-
balances := make([]*types.Eth1AddressBalance, 0, len(pairs))
565-
for b := 0; b < len(pairs); b += batchSize {
566-
start := b
567-
end := b + batchSize
568-
if len(pairs) < end {
569-
end = len(pairs)
570-
}
571-
572-
log.Infof("processing batch %v with start %v and end %v", b, start, end)
573-
574-
b, err := client.GetBalances(pairs[start:end], 2, 4)
575-
576-
if err != nil {
577-
log.Error(err, "error retrieving balances from node", 0)
578-
return
579-
}
580-
balances = append(balances, b...)
581-
}
582-
583-
err = bt.SaveBalances(balances, keys)
584-
if err != nil {
585-
log.Error(err, "error saving balances to bigtable", 0)
586-
return
587-
}
588-
589-
lastKey = keys[len(keys)-1]
590-
log.Infof("retrieved %v balances in %v, currently at %v", len(balances), time.Since(start), lastKey)
591-
592-
its++
593-
594-
if iterations != -1 && its > iterations {
563+
log.Error(err, "error updating balances", 0)
595564
return
596565
}
566+
log.Infof("retrieved %v balances in %v, currently at %s", len(balances), time.Since(start), balances[len(balances)-1].Address)
597567
}
598568
}
599569

backend/pkg/commons/db/bigtable_eth1.go

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2339,53 +2339,6 @@ func (bigtable *Bigtable) SaveContractMetadata(address []byte, metadata *types.C
23392339
return bigtable.tableMetadata.Apply(ctx, fmt.Sprintf("%s:%x", bigtable.chainId, address), mut)
23402340
}
23412341

2342-
func (bigtable *Bigtable) SaveBalances(balances []*types.Eth1AddressBalance, deleteKeys []string) error {
2343-
if len(balances) == 0 {
2344-
return nil
2345-
}
2346-
2347-
mutsWrite := &types.BulkMutations{
2348-
Keys: make([]string, 0, len(balances)),
2349-
Muts: make([]*gcp_bigtable.Mutation, 0, len(balances)),
2350-
}
2351-
2352-
for _, balance := range balances {
2353-
mutWrite := gcp_bigtable.NewMutation()
2354-
2355-
mutWrite.Set(ACCOUNT_METADATA_FAMILY, fmt.Sprintf("B:%x", balance.Token), gcp_bigtable.Timestamp(0), balance.Balance)
2356-
mutsWrite.Keys = append(mutsWrite.Keys, fmt.Sprintf("%s:%x", bigtable.chainId, balance.Address))
2357-
mutsWrite.Muts = append(mutsWrite.Muts, mutWrite)
2358-
}
2359-
2360-
err := bigtable.WriteBulk(mutsWrite, bigtable.tableMetadata, DEFAULT_BATCH_INSERTS)
2361-
2362-
if err != nil {
2363-
return err
2364-
}
2365-
2366-
if len(deleteKeys) == 0 {
2367-
return nil
2368-
}
2369-
mutsDelete := &types.BulkMutations{
2370-
Keys: make([]string, 0, len(balances)),
2371-
Muts: make([]*gcp_bigtable.Mutation, 0, len(balances)),
2372-
}
2373-
for _, key := range deleteKeys {
2374-
mutDelete := gcp_bigtable.NewMutation()
2375-
mutDelete.DeleteRow()
2376-
mutsDelete.Keys = append(mutsDelete.Keys, key)
2377-
mutsDelete.Muts = append(mutsDelete.Muts, mutDelete)
2378-
}
2379-
2380-
err = bigtable.WriteBulk(mutsDelete, bigtable.tableMetadataUpdates, DEFAULT_BATCH_INSERTS)
2381-
2382-
if err != nil {
2383-
return err
2384-
}
2385-
2386-
return nil
2387-
}
2388-
23892342
func (bigtable *Bigtable) SaveERC20TokenPrices(prices []*types.ERC20TokenPrice) error {
23902343
if len(prices) == 0 {
23912344
return nil

backend/pkg/commons/db2/database/bigtable.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ func (w TableWrapper) GetRowsWithKeys(keys []string) ([]Row, error) {
8282
return res, nil
8383
}
8484

85+
func (w TableWrapper) DeleteRowsWithKeys(keys []string, opts ...Option) error {
86+
return w.BigTable.DeleteRowsWithKeys(w.table, keys, opts...)
87+
}
88+
8589
// BigTable is a wrapper around Google Cloud Bigtable for storing and retrieving data
8690
type BigTable struct {
8791
client *bigtable.Client
@@ -162,10 +166,6 @@ func createTableAndFamilies(ctx context.Context, admin *bigtable.AdminClient, ta
162166
func (b BigTable) BulkAdd(table string, itemsByKey map[string][]Item, opts ...Option) error {
163167
options := apply(opts)
164168

165-
tbl := b.client.Open(table)
166-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
167-
defer cancel()
168-
169169
var keys []string
170170
var muts []*bigtable.Mutation
171171
for key, items := range itemsByKey {
@@ -180,9 +180,17 @@ func (b BigTable) BulkAdd(table string, itemsByKey map[string][]Item, opts ...Op
180180
keys = append(keys, key)
181181
muts = append(muts, mut)
182182
}
183+
return b.bulk(table, keys, muts, options)
184+
}
185+
186+
func (b BigTable) bulk(table string, keys []string, mutations []*bigtable.Mutation, options options) error {
187+
tbl := b.client.Open(table)
188+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
189+
defer cancel()
190+
183191
bulk := &bulkMutations{
184192
Keys: keys,
185-
Muts: muts,
193+
Muts: mutations,
186194
}
187195
sort.Sort(bulk)
188196
for i := int64(0); i < int64(bulk.Len()); i = i + options.BatchSize {
@@ -337,6 +345,23 @@ func (b BigTable) GetRowsWithKeys(table string, keys []string) ([]Row, error) {
337345
return data, nil
338346
}
339347

348+
func (b BigTable) DeleteRowsWithKeys(table string, source []string, opts ...Option) error {
349+
options := apply(opts)
350+
351+
var muts []*bigtable.Mutation
352+
var keys []string
353+
for _, key := range source {
354+
if key == "" {
355+
continue
356+
}
357+
mut := bigtable.NewMutation()
358+
mut.DeleteRow()
359+
muts = append(muts, mut)
360+
keys = append(keys, key)
361+
}
362+
return b.bulk(table, keys, muts, options)
363+
}
364+
340365
func (b BigTable) Clear() error {
341366
ctx, cancel := context.WithTimeout(context.Background(), timeout)
342367
defer cancel()

backend/pkg/commons/db2/database/bigtable_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,19 @@ func TestBigTable(t *testing.T) {
158158
}
159159
}
160160
})
161+
162+
t.Run("DeleteRowsWithKeys", func(t *testing.T) {
163+
if err := db.DeleteRowsWithKeys(maps.Keys(tt.items)); err != nil {
164+
t.Error(err)
165+
}
166+
res, err := db.Read("")
167+
if err != nil {
168+
t.Error(err)
169+
}
170+
if len(res) != 0 {
171+
t.Errorf("not empty after delete %v", res)
172+
}
173+
})
161174
})
162175
}
163176

backend/pkg/commons/db2/database/database.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ type Database interface {
66
GetRow(key string) (*Row, error)
77
GetRowsWithKeys(keys []string) ([]Row, error)
88
GetRowsRange(high, low string, opts ...Option) ([]Row, error)
9+
DeleteRowsWithKeys(keys []string, opts ...Option) error
910

1011
Close() error
1112
Clear() error

backend/pkg/commons/db2/database/option.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package database
22

33
const (
4-
defaultBatchSize = 10000
4+
defaultBatchSize = 10_000
55
defaultLimit = 100
66
)
77

backend/pkg/commons/db2/database/remote.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,10 @@ func (r RemoteClient) GetRowsWithKeys(keys []string) ([]Row, error) {
300300
return rows, nil
301301
}
302302

303+
func (r RemoteClient) DeleteRowsWithKeys(keys []string, opts ...Option) error {
304+
panic("implement me")
305+
}
306+
303307
func (r RemoteClient) Close() error {
304308
//TODO implement me
305309
panic("implement me")
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package metadata
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
7+
"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
8+
"github.com/gobitfly/beaconchain/pkg/commons/db2/metadataupdates"
9+
)
10+
11+
type Balance struct {
12+
metadataupdates.Pair
13+
Value *big.Int
14+
}
15+
type Store struct {
16+
db database.Database
17+
}
18+
19+
func NewStore(db database.Database) Store {
20+
return Store{
21+
db: db,
22+
}
23+
}
24+
25+
func (store Store) UpdateBalance(chainID string, balances []Balance) error {
26+
updates := make(map[string][]database.Item)
27+
for _, balance := range balances {
28+
key := fmt.Sprintf("%s:%x", chainID, balance.Address)
29+
updates[key] = append(updates[key], database.Item{
30+
Family: accountFamily,
31+
Column: fmt.Sprintf("B:%x", balance.Token),
32+
Data: balance.Value.Bytes(),
33+
})
34+
}
35+
return store.db.BulkAdd(updates)
36+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package metadata
2+
3+
const Table = "metadata"
4+
5+
var Schema = map[string][]string{
6+
Table: {
7+
accountFamily,
8+
},
9+
}
10+
11+
const (
12+
accountFamily = "a"
13+
)

backend/pkg/commons/db2/metadataupdates/keys.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ func ContractUpdate(blockNumber uint64, chainID string, updates []ContractUpdate
6767
func MarkBalanceUpdate(chainID string, address []byte, token []byte, cache Cache) map[string][]database.Item {
6868
items := make(map[string][]database.Item)
6969

70-
key := fmt.Sprintf("%s:B:%x", chainID, address) // format is B: for balance update as chainid:prefix:address (token id will be encoded as column name)
71-
keyCache := []byte(fmt.Sprintf("%s:B:%x:%x", chainID, address, token))
70+
key := fmt.Sprintf("%s:%s:%x", chainID, balanceKey, address) // format is B: for balance update as chainid:prefix:address (token id will be encoded as column name)
71+
keyCache := []byte(fmt.Sprintf("%s:%s:%x:%x", chainID, balanceKey, address, token))
7272
if _, err := cache.Get(keyCache); err != nil {
7373
items[key] = []database.Item{
7474
{

0 commit comments

Comments
 (0)