@@ -19,6 +19,7 @@ import (
1919 "github.com/gobitfly/beaconchain/pkg/commons/db"
2020 "github.com/gobitfly/beaconchain/pkg/commons/db2/data"
2121 "github.com/gobitfly/beaconchain/pkg/commons/db2/database"
22+ "github.com/gobitfly/beaconchain/pkg/commons/db2/metadata"
2223 "github.com/gobitfly/beaconchain/pkg/commons/db2/metadataupdates"
2324 "github.com/gobitfly/beaconchain/pkg/commons/erc20"
2425 "github.com/gobitfly/beaconchain/pkg/commons/log"
@@ -29,6 +30,7 @@ import (
2930 "github.com/gobitfly/beaconchain/pkg/commons/utils"
3031 "github.com/gobitfly/beaconchain/pkg/commons/version"
3132 "github.com/gobitfly/beaconchain/pkg/executionlayer"
33+ "github.com/gobitfly/beaconchain/pkg/executionlayer/evm"
3234
3335 "github.com/coocood/freecache"
3436 "github.com/ethereum/go-ethereum/common"
@@ -145,8 +147,6 @@ func Run() {
145147
146148 chainId := strconv .FormatUint (utils .Config .Chain .ClConfig .DepositChainID , 10 )
147149
148- balanceUpdaterPrefix := chainId + ":B:"
149-
150150 nodeChainId , err := client .GetNativeClient ().ChainID (context .Background ())
151151 if err != nil {
152152 log .Fatal (err , "node chain id error" , 0 )
@@ -179,26 +179,36 @@ func Run() {
179179 go ImportEnsUpdatesLoop (bt , client , * ensBatchSize )
180180 }
181181
182- if * enableFullBalanceUpdater {
183- ProcessMetadataUpdates (bt , client , balanceUpdaterPrefix , * balanceUpdaterBatchSize , - 1 )
184- return
185- }
186-
187- cache := freecache .NewCache (100 * 1024 * 1024 ) // 100 MB limit
188-
189182 bigtable , err := database .NewBigTable (utils .Config .Bigtable .Project , utils .Config .Bigtable .Instance , nil )
190183 if err != nil {
191184 log .Fatal (err , "error connecting to bigtable" , 0 )
192185 }
193186
187+ cache := freecache .NewCache (100 * 1024 * 1024 ) // 100 MB limit
188+ metadataUpdatesStore := metadataupdates .NewStore (database .Wrap (bigtable , metadataupdates .Table ), cache )
189+ dataStore := data .NewStore (database .Wrap (bigtable , data .Table ))
190+ metadataStore := metadata .NewStore (database .Wrap (bigtable , metadata .Table ))
191+
194192 indexer := executionlayer .NewIndexer (
195193 executionlayer .NewAdaptorV1 (
196- data . NewStore ( database . Wrap ( bigtable , data . Table )) ,
197- metadataupdates . NewStore ( database . Wrap ( bigtable , metadataupdates . Table ), cache ) ,
194+ dataStore ,
195+ metadataUpdatesStore ,
198196 ),
199197 executionlayer .AllTransformers ... ,
200198 )
201199
200+ balanceUpdater := executionlayer .NewBalanceUpdater (
201+ chainId ,
202+ metadataUpdatesStore ,
203+ metadataStore ,
204+ evm .NewBatcher (chainId , client .GetNativeClient ()),
205+ )
206+
207+ if * enableFullBalanceUpdater {
208+ ProcessBalanceUpdates (balanceUpdater , * balanceUpdaterBatchSize , - 1 )
209+ return
210+ }
211+
202212 if * block != 0 {
203213 err = IndexFromNode (bt , client , * block , * block , * concurrencyBlocks , * traceMode )
204214 if err != nil {
@@ -361,7 +371,7 @@ func Run() {
361371 }
362372
363373 if * enableBalanceUpdater {
364- ProcessMetadataUpdates ( bt , client , balanceUpdaterPrefix , * balanceUpdaterBatchSize , 10 )
374+ ProcessBalanceUpdates ( balanceUpdater , * balanceUpdaterBatchSize , 10 )
365375 }
366376
367377 log .Infof ("index run completed" )
@@ -545,55 +555,15 @@ func HandleChainReorgs(bt *db.Bigtable, client *rpc.ErigonClient, depth int) err
545555 return nil
546556}
547557
548- func ProcessMetadataUpdates (bt * db.Bigtable , client * rpc.ErigonClient , prefix string , batchSize int , iterations int ) {
549- lastKey := prefix
550-
551- its := 0
552- for {
558+ func ProcessBalanceUpdates (balanceUpdater executionlayer.BalanceUpdater , batchSize int , iterations int ) {
559+ for its := 0 ; iterations == - 1 || its < iterations ; its ++ {
553560 start := time .Now ()
554- keys , pairs , err := bt . GetMetadataUpdates ( prefix , lastKey , batchSize )
561+ balances , err := balanceUpdater . UpdateBalances ( int64 ( batchSize ) )
555562 if err != nil {
556- log .Error (err , "error retrieving metadata updates from bigtable" , 0 )
557- return
558- }
559-
560- if len (keys ) == 0 {
561- return
562- }
563-
564- balances := make ([]* types.Eth1AddressBalance , 0 , len (pairs ))
565- for b := 0 ; b < len (pairs ); b += batchSize {
566- start := b
567- end := b + batchSize
568- if len (pairs ) < end {
569- end = len (pairs )
570- }
571-
572- log .Infof ("processing batch %v with start %v and end %v" , b , start , end )
573-
574- b , err := client .GetBalances (pairs [start :end ], 2 , 4 )
575-
576- if err != nil {
577- log .Error (err , "error retrieving balances from node" , 0 )
578- return
579- }
580- balances = append (balances , b ... )
581- }
582-
583- err = bt .SaveBalances (balances , keys )
584- if err != nil {
585- log .Error (err , "error saving balances to bigtable" , 0 )
586- return
587- }
588-
589- lastKey = keys [len (keys )- 1 ]
590- log .Infof ("retrieved %v balances in %v, currently at %v" , len (balances ), time .Since (start ), lastKey )
591-
592- its ++
593-
594- if iterations != - 1 && its > iterations {
563+ log .Error (err , "error updating balances" , 0 )
595564 return
596565 }
566+ log .Infof ("retrieved %v balances in %v, currently at %s" , len (balances ), time .Since (start ), balances [len (balances )- 1 ].Address )
597567 }
598568}
599569
0 commit comments