Skip to content

Commit 277f86c

Browse files
committed
feat(exporter): add functionality for a no downtime migration of legacy balances from clickhouse to bigtable
1 parent b6d1d75 commit 277f86c

File tree

5 files changed

+143
-2
lines changed

5 files changed

+143
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea/
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package commands
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
7+
"github.com/gobitfly/beaconchain/cmd/misc/misctypes"
8+
"github.com/gobitfly/beaconchain/pkg/commons/db"
9+
"github.com/gobitfly/beaconchain/pkg/commons/log"
10+
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
11+
"github.com/gobitfly/beaconchain/pkg/commons/types"
12+
)
13+
14+
type MigrateLegacyBalancesCommand struct {
15+
FlagSet *flag.FlagSet
16+
Config migrateLegacyBalancesCommandConfig
17+
}
18+
19+
type migrateLegacyBalancesCommandConfig struct {
20+
StartEpoch uint64
21+
EndEpoch uint64
22+
}
23+
24+
func (s *MigrateLegacyBalancesCommand) ParseCommandOptions() {
25+
s.FlagSet.Uint64Var(&s.Config.StartEpoch, "legacy-balances-start-epoch", 0, "Start epoch for the balance migration")
26+
s.FlagSet.Uint64Var(&s.Config.EndEpoch, "legacy-balances-end-epoch", 0, "End epoch for the balance migration")
27+
}
28+
29+
func (s *MigrateLegacyBalancesCommand) Requires() misctypes.Requires {
30+
return misctypes.Requires{
31+
ClickhouseDBs: true,
32+
ClNode: true,
33+
}
34+
}
35+
36+
func (s *MigrateLegacyBalancesCommand) Run(clClient *rpc.LighthouseClient) error {
37+
log.Infof("command: migrate-legacy-balances start-epoch=%d end-epoch=%d", s.Config.StartEpoch, s.Config.EndEpoch)
38+
39+
for epoch := s.Config.StartEpoch; epoch <= s.Config.EndEpoch; epoch++ {
40+
state, err := clClient.GetValidatorState(epoch)
41+
if err != nil {
42+
log.Error(err, fmt.Sprintf("error getting balances for epoch %d: %v", epoch, err), 0)
43+
continue
44+
}
45+
46+
validators := make([]*types.Validator, 0, len(state.Data))
47+
for _, validator := range state.Data {
48+
validators = append(validators, &types.Validator{
49+
Index: validator.Index,
50+
Balance: validator.Balance,
51+
EffectiveBalance: validator.Validator.EffectiveBalance,
52+
})
53+
}
54+
55+
log.Infof("writing epoch %d to clickhouse with %d validators", epoch, len(state.Data))
56+
err = db.SaveLegacyValidatorBalancesToClickhouse(uint32(epoch), validators)
57+
if err != nil {
58+
return fmt.Errorf("error saving legacy balances for epoch %d: %w", epoch, err)
59+
}
60+
}
61+
return nil
62+
}
63+
64+
func (s *MigrateLegacyBalancesCommand) showHelp() {
65+
log.Infof("Usage: migrate_legacy_balances [options]")
66+
log.Infof("Options:")
67+
log.Infof(" --legacy-balances-start-epoch int\tStart epoch for the balance migration")
68+
log.Infof(" --legacy-balances-end-epoch int\tEnd epoch for the balance migration")
69+
}

backend/cmd/misc/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ var REQUIRES_LIST = map[string]misctypes.Requires{
8787
"clear-raw-bigtable": {
8888
RawBigtable: true,
8989
},
90-
"app-bundle": (&commands.AppBundleCommand{}).Requires(),
90+
"app-bundle": (&commands.AppBundleCommand{}).Requires(),
91+
"migrate-legacy-balances": (&commands.MigrateLegacyBalancesCommand{}).Requires(),
9192
"update-highest-active-validatorindex": {
9293
Bigtable: true,
9394
ClNode: true,
@@ -105,8 +106,12 @@ func Run() {
105106
FlagSet: fs,
106107
}
107108

109+
migrateLegacyBalancesCommand := commands.MigrateLegacyBalancesCommand{
110+
FlagSet: fs,
111+
}
112+
108113
configPath := fs.String("config", "config/default.config.yml", "Path to the config file")
109-
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, clear-raw-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications, verify-fcm-tokens, app-bundle, update-highest-active-validatorindex")
114+
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, clear-raw-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications, verify-fcm-tokens, app-bundle, update-highest-active-validatorindex, migrate-legacy-balances")
110115
fs.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
111116
fs.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
112117
fs.Uint64Var(&opts.User, "user", 0, "user id")
@@ -133,6 +138,8 @@ func Run() {
133138

134139
statsPartitionCommand.ParseCommandOptions()
135140
appBundleCommand.ParseCommandOptions()
141+
migrateLegacyBalancesCommand.ParseCommandOptions()
142+
136143
_ = fs.Parse(os.Args[2:])
137144

138145
if *versionFlag {
@@ -221,6 +228,9 @@ func Run() {
221228
db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&cfg.ClickHouse.WriterDatabase, &cfg.ClickHouse.ReaderDatabase, "clickhouse", "clickhouse")
222229
defer db.ClickHouseReader.Close()
223230
defer db.ClickHouseWriter.Close()
231+
232+
db.ClickHouseNativeWriter = db.MustInitClickhouseNative(&cfg.ClickHouse.WriterDatabase)
233+
defer db.ClickHouseNativeWriter.Close()
224234
}
225235

226236
// Initialize the persistent redis client
@@ -487,6 +497,8 @@ func Run() {
487497
case "app-bundle":
488498
appBundleCommand.Config.DryRun = opts.DryRun
489499
err = appBundleCommand.Run()
500+
case "migrate-legacy-balances":
501+
err = migrateLegacyBalancesCommand.Run(rpcClient)
490502
case "fix-ens":
491503
err = fixEns(erigonClient)
492504
case "fix-ens-addresses":
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package db
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/gobitfly/beaconchain/pkg/commons/types"
7+
"github.com/google/uuid"
8+
)
9+
10+
type LegacyValidatorBalanceTable struct {
11+
ValidatorIndex []uint32 `db:"validator_index"`
12+
Epoch []uint32 `db:"epoch"`
13+
Balance []uint64 `db:"balance"`
14+
EffectiveBalance []uint64 `db:"effective_balance"`
15+
}
16+
17+
func (c LegacyValidatorBalanceTable) Get(str string) any {
18+
switch str {
19+
case "validator_index":
20+
return c.ValidatorIndex
21+
case "epoch":
22+
return c.Epoch
23+
case "balance":
24+
return c.Balance
25+
case "effective_balance":
26+
return c.EffectiveBalance
27+
default:
28+
return nil
29+
}
30+
}
31+
func (c LegacyValidatorBalanceTable) Extend(cOther UltraFastClickhouseStruct) error {
32+
return nil
33+
}
34+
35+
func SaveLegacyValidatorBalancesToClickhouse(epoch uint32, state []*types.Validator) error {
36+
epochData := LegacyValidatorBalanceTable{}
37+
for _, validator := range state {
38+
epochData.ValidatorIndex = append(epochData.ValidatorIndex, uint32(validator.Index))
39+
epochData.Epoch = append(epochData.Epoch, epoch)
40+
epochData.Balance = append(epochData.Balance, validator.Balance)
41+
epochData.EffectiveBalance = append(epochData.EffectiveBalance, validator.EffectiveBalance)
42+
}
43+
44+
err := UltraFastDumpToClickhouse(epochData, "legacy_validator_epoch_balances", uuid.New().String())
45+
if err != nil {
46+
return fmt.Errorf("error writing legacy validator epoch balances to clickhouse: %w", err)
47+
}
48+
49+
return nil
50+
}

backend/pkg/exporter/modules/slot_exporter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,15 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e
502502
return nil
503503
})
504504

505+
// save the validator balances to clickhouse
506+
g.Go(func() error {
507+
err := db.SaveLegacyValidatorBalancesToClickhouse(uint32(epoch), block.Validators)
508+
if err != nil {
509+
return fmt.Errorf("error exporting validator balances to bigtable for slot %v: %w", block.Slot, err)
510+
}
511+
return nil
512+
})
513+
505514
// if we are exporting the head epoch, update the validator db table
506515
if isHeadEpoch {
507516
// this function sets exports the validator status into the db

0 commit comments

Comments
 (0)