Skip to content

Commit d09b63c

Browse files
committed
feat(dashboard_exporter)!: backfill for electra fork epoch events
1 parent bc03542 commit d09b63c

File tree

3 files changed

+197
-8
lines changed

3 files changed

+197
-8
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
-- +goose Up
-- +goose StatementBegin
-- Persistent sink table receiving the backfilled Electra-fork epoch events
-- (withdrawal amounts/counts per validator at the fork epoch).
CREATE TABLE IF NOT EXISTS _insert_sink_backfill_electra_fork_epoch_events
(
    `epoch_timestamp` DateTime COMMENT 'timestamp of the epoch',
    `validator_index` UInt64 COMMENT 'validator index',
    `withdrawals_amount` Int64 COMMENT 'sum of withdrawals_amount',
    `withdrawals_count` Int64 COMMENT 'sum of withdrawals_count',
    -- we will need something to fix the roi as well
    -- roi_dividend is `balance_end - deposits_amount + withdrawals_amount - consolidations_incoming_amount + consolidations_outgoing_amount`
    -- NOTE(review): the roi materialized view below ADDS withdrawals_amount to the
    -- roi_dividend and leaves the roi_divisor unchanged (0); the earlier wording
    -- ("subtract the withdrawals_amount from the roi_divisor") did not match that
    -- behavior — confirm the dividend-side correction is the intended fix.
)
ENGINE = ReplacingMergeTree() -- we need it persistent because we will use it to update the epoch table through the dictionary
ORDER BY (validator_index, epoch_timestamp)
SETTINGS index_granularity=128, non_replicated_deduplication_window = 2048, replicated_deduplication_window = 2048;
-- +goose StatementEnd
-- +goose StatementBegin
-- connect the sink to the _final_validator_dashboard_roi_hourly table
CREATE MATERIALIZED VIEW IF NOT EXISTS _mv_backfill_final_validator_dashboard_roi_hourly_electra_fork TO _final_validator_dashboard_roi_hourly
(
    `validator_index` UInt64,
    `t` DateTime,
    `roi_dividend` Int128,
    `roi_divisor` Int128
)
AS SELECT
    validator_index AS validator_index,
    toStartOfHour(foo.epoch_timestamp) AS t,
    SUM(CAST(foo.withdrawals_amount, 'Int128')) AS roi_dividend, -- i.e, we add the withdrawal amount to the roi_dividend
    0 AS roi_divisor -- i.e, we dont modify the divisor
FROM _insert_sink_backfill_electra_fork_epoch_events AS foo
GROUP BY
    t,
    validator_index
-- once it is in the hourly, it will be automatically propagated to the daily, weekly and monthly tables by the existing materialized views
-- +goose StatementEnd
-- +goose StatementBegin
-- connect the sink to the _final_validator_dashboard_data_hourly table
CREATE MATERIALIZED VIEW IF NOT EXISTS _mv_backfill_final_validator_dashboard_data_hourly_electra_fork TO _final_validator_dashboard_data_hourly
(
    `validator_index` UInt64,
    `t` DateTime,
    `withdrawals_amount` Int64,
    `withdrawals_count` Int64
)
AS SELECT
    validator_index AS validator_index,
    toStartOfHour(foo.epoch_timestamp) AS t,
    SUM(foo.withdrawals_amount) AS withdrawals_amount, -- i.e we add the withdrawal amount to the withdrawal amount
    SUM(foo.withdrawals_count) AS withdrawals_count -- i.e we add the withdrawal count to the withdrawal count
FROM _insert_sink_backfill_electra_fork_epoch_events AS foo
GROUP BY
    t,
    validator_index
-- once it is in the hourly, it will be automatically propagated to the daily, weekly and monthly tables by the existing materialized views
-- +goose StatementEnd
-- +goose StatementBegin
SELECT 'the migration will now attempt to create a dictionary. this will only work if the migration is run as the default (root) user.';
-- +goose StatementEnd
-- +goose StatementBegin
-- Direct-lookup dictionary over the sink table; a later migration uses it to
-- mutate the epoch table in place.
CREATE DICTIONARY IF NOT EXISTS _dict_backfill_electra_fork_epoch_events (validator_index Int64, epoch_timestamp DateTime, withdrawals_amount Int64, withdrawals_count Int64)
PRIMARY KEY epoch_timestamp, validator_index SOURCE(CLICKHOUSE(TABLE _insert_sink_backfill_electra_fork_epoch_events DB currentDatabase()))
LAYOUT(complex_key_direct()); -- we dont use complex_key_cache here because that would require us to make sure that the cache is up to date when we trigger the mutation. it should only be at most 200k keys anyways
-- +goose StatementEnd
-- +goose StatementBegin
-- kickstart the backfill with the latest exported epoch
INSERT INTO _exporter_backfill_metadata
SELECT
    epoch,
    'electra_fork_epoch_events' as backfill_name,
    transfer_batch_id as backfill_batch_id,
    NULL AS successful_backfill
FROM
(
    SELECT *
    FROM _exporter_metadata
    WHERE successful_transfer IS NOT NULL
    ORDER BY epoch DESC
    LIMIT 1
)
-- +goose StatementEnd
-- +goose StatementBegin
SELECT 'successfully created the dictionary. now start the dashboard exporter again. once the backfill is done (i.e it shows up as 0 epochs left in the grafana dashboard), run the next migration version to update the epoch table.';
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP VIEW IF EXISTS _mv_backfill_final_validator_dashboard_roi_hourly_electra_fork;
-- +goose StatementEnd
-- +goose StatementBegin
DROP VIEW IF EXISTS _mv_backfill_final_validator_dashboard_data_hourly_electra_fork;
-- +goose StatementEnd
-- +goose StatementBegin
-- remove the kickstart row(s) so a re-run of the Up migration starts clean
ALTER TABLE _exporter_backfill_metadata
DELETE WHERE backfill_name = 'electra_fork_epoch_events';
-- +goose StatementEnd
-- +goose StatementBegin
DROP DICTIONARY IF EXISTS _dict_backfill_electra_fork_epoch_events;
-- +goose StatementEnd
-- +goose StatementBegin
-- dropped last: the dictionary and the views above depend on this table
DROP TABLE IF EXISTS _insert_sink_backfill_electra_fork_epoch_events;
-- +goose StatementEnd

backend/pkg/exporter/db/db.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,7 +1791,10 @@ func GetPendingBackfillEpochs(t BackfillType, limit int64) ([]BackfillMetadata,
17911791
return nil, fmt.Errorf("error fetching min assigned epoch: %w", err)
17921792
}
17931793
if minAssignedEpoch == -1 {
1794-
return nil, fmt.Errorf("no epochs found with transfer batch id")
1794+
// this isn't necessarily an error, because on a fresh export this backfill never gets assigned
1795+
// so log a debug message and return nil
1796+
log.Debugf("no epochs found with transfer batch id %s. this is expected if the export was started after the backfill was implemented", t)
1797+
return nil, nil
17951798
}
17961799
if minAssignedEpoch == 0 {
17971800
// this means we have assigned epochs till the first one, no more pending epochs
@@ -1983,6 +1986,7 @@ func (b BackfillType) Value() (driver.Value, error) {
19831986
}
19841987

19851988
// Backfill job identifiers. Each value is persisted in
// _exporter_backfill_metadata.backfill_name, so it must match the name used by
// the corresponding ClickHouse migration exactly.
const (
	BackfillTypeRoi                    BackfillType = "roi"
	BackfillTypeEBLookup               BackfillType = "eb_lookup"
	BackfillTypeElectraForkEpochEvents BackfillType = "electra_fork_epoch_events"
)

backend/pkg/exporter/modules/dashboard_data_backfills.go

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/gobitfly/beaconchain/pkg/commons/db"
78
"github.com/gobitfly/beaconchain/pkg/commons/log"
89
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
910
"github.com/gobitfly/beaconchain/pkg/commons/utils"
10-
"github.com/gobitfly/beaconchain/pkg/exporter/db"
1111
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
12+
"github.com/gobitfly/beaconchain/pkg/exporter/types"
1213
"github.com/google/uuid"
1314
"github.com/pkg/errors"
1415
"golang.org/x/sync/errgroup"
@@ -20,6 +21,7 @@ func (d *dashboardData) roiBackfillTask() {
2021
jobs := []edb.BackfillType{
2122
edb.BackfillTypeRoi,
2223
edb.BackfillTypeEBLookup,
24+
edb.BackfillTypeElectraForkEpochEvents,
2325
}
2426
for _, backfillType := range jobs {
2527
// fork for every backfill we have to do
@@ -52,7 +54,7 @@ func (d *dashboardData) roiBackfillTask() {
5254
}()
5355
}
5456
}
55-
func (d *dashboardData) handleIncompleteBackfills(t db.BackfillType) error {
57+
func (d *dashboardData) handleIncompleteBackfills(t edb.BackfillType) error {
5658
incomplete, err := edb.GetIncompleteBackfillEpochs(t)
5759
if err != nil {
5860
return errors.Wrap(err, "failed to get incomplete backfill epochs")
@@ -70,7 +72,7 @@ func (d *dashboardData) handleIncompleteBackfills(t db.BackfillType) error {
7072
return nil
7173
}
7274

73-
func (d *dashboardData) backfillEpochs(t db.BackfillType, epochs []edb.BackfillMetadata) error {
75+
func (d *dashboardData) backfillEpochs(t edb.BackfillType, epochs []edb.BackfillMetadata) error {
7476
now := time.Now()
7577
defer func() {
7678
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("dashboard_data_exporter_backfill_%s_overall", t)).Observe(time.Since(now).Seconds())
@@ -97,6 +99,8 @@ func (d *dashboardData) backfillEpochs(t db.BackfillType, epochs []edb.BackfillM
9799
err = edb.BackfillRoi(epochs)
98100
case edb.BackfillTypeEBLookup:
99101
err = edb.BackfillEBLookup(epochs)
102+
case edb.BackfillTypeElectraForkEpochEvents:
103+
err = d.BackfillElectraForkEpochEvents(epochs)
100104
default:
101105
return fmt.Errorf("unknown backfill type %s", t)
102106
}
@@ -108,6 +112,12 @@ func (d *dashboardData) backfillEpochs(t db.BackfillType, epochs []edb.BackfillM
108112

109113
now := time.Now()
110114
for i := range epochs {
115+
if epochs[i].BackfillName != t {
116+
return fmt.Errorf("backfill name %s does not match backfill type %s", epochs[i].BackfillName, t)
117+
}
118+
if epochs[i].BackfillBatchId == nil {
119+
return fmt.Errorf("backfill batch id is nil for epoch %v", epochs[i])
120+
}
111121
epochs[i].SuccessfulBackfill = &now
112122
}
113123
err = edb.PushBackfillMetadata(epochs)
@@ -125,13 +135,13 @@ func (d *dashboardData) backfillEpochs(t db.BackfillType, epochs []edb.BackfillM
125135
return nil
126136
}
127137

128-
func (d *dashboardData) handlePendingBackfills(t db.BackfillType) error {
138+
func (d *dashboardData) handlePendingBackfills(t edb.BackfillType) error {
129139
pending, err := edb.GetPendingBackfillEpochs(t, utils.Config.DashboardExporter.BackfillAtOnce*utils.Config.DashboardExporter.BackfillInParallel)
130140
if err != nil {
131141
return errors.Wrap(err, "failed to get pending backfill epochs")
132142
}
133143
if len(pending) == 0 {
134-
d.log.Debugf("handlePendingBackfills, no pending backfill epochs")
144+
d.log.Debugf("handlePendingBackfills, no pending backfill epochs") // this log messages, and basically all others, should include the backfill type
135145
return nil
136146
}
137147
d.log.InfoWithFields(log.Fields{"backfillType": t, "pendingCount": len(pending)}, "handlePendingBackfills, found pending backfill epochs")
@@ -157,3 +167,77 @@ func (d *dashboardData) handlePendingBackfills(t db.BackfillType) error {
157167
}
158168
return nil
159169
}
170+
171+
func (d *dashboardData) BackfillElectraForkEpochEvents(epochs []edb.BackfillMetadata) (err error) {
172+
metricPrefix := string("dashboard_data_exporter_backfill_" + edb.BackfillTypeElectraForkEpochEvents)
173+
start := time.Now()
174+
defer func() {
175+
metrics.TaskDuration.WithLabelValues(metricPrefix + "_batch").Observe(time.Since(start).Seconds())
176+
}()
177+
// filter out epoch that arent the fork epoch
178+
forkEpoch := utils.Config.Chain.ClConfig.ElectraForkEpoch
179+
var forkEpochs []edb.BackfillMetadata
180+
for _, e := range epochs {
181+
if e.Epoch == forkEpoch {
182+
forkEpochs = append(forkEpochs, e)
183+
}
184+
}
185+
if len(forkEpochs) == 0 {
186+
log.Debugf("no epochs to backfill for %s", edb.BackfillTypeElectraForkEpochEvents)
187+
return nil
188+
}
189+
if len(forkEpochs) > 1 {
190+
return fmt.Errorf("more than one epoch to backfill for %s", edb.BackfillTypeElectraForkEpochEvents)
191+
}
192+
backfillBatchID := epochs[0].BackfillBatchId
193+
epoch := forkEpochs[0].Epoch
194+
195+
// reuse the existing stuff to do this, tho obv could be leaner
196+
nodeData := NewMultiEpochData(1)
197+
// prefill epochBasedData.epochs
198+
nodeData.epochBasedData.epochs = []uint64{forkEpochs[0].Epoch}
199+
eg := &errgroup.Group{}
200+
eg.Go(func() error {
201+
return d.fetchEpochValidatorStates(epoch, epoch, &nodeData)
202+
})
203+
eg.Go(func() error {
204+
return d.fetchElectraRemovedExcessBalances(epoch, epoch, &nodeData)
205+
})
206+
if err := eg.Wait(); err != nil {
207+
return fmt.Errorf("failed to fetch validator states for epoch %d: %w", epoch, err)
208+
}
209+
210+
// prepare the target data. i hate this as much as you do, but i dont have the time to refactor this rn
211+
// was never really intended to be used like this anyway
212+
processedData := make([]types.VDBDataEpochColumns, 1)
213+
nodeData.epochBasedData.tarIndices = []int{0}
214+
nodeData.epochBasedData.tarOffsets = []int{0}
215+
valiCount := len(nodeData.epochBasedData.validatorStates[int64(epoch)].Data)
216+
processedData[0], err = types.NewVDBDataEpochColumns(valiCount)
217+
if err != nil {
218+
return fmt.Errorf("failed to create new VDBDataEpochColumns: %w", err)
219+
}
220+
221+
processedData[0].EpochsContained = []uint64{epoch}
222+
eg = &errgroup.Group{} // docs say we should not reuse a group for different tasks
223+
eg.Go(func() error {
224+
// sets epochTimestamp and validatorIndex
225+
return d.processValidatorStates(&nodeData, &processedData)
226+
})
227+
eg.Go(func() error {
228+
// sets withdrawalAmount and withdrawalCount
229+
return d.processElectraRemovedExcessBalanceEvents(&nodeData, &processedData)
230+
})
231+
if err := eg.Wait(); err != nil {
232+
return fmt.Errorf("failed to process electra removed excess balance events: %w", err)
233+
}
234+
235+
// now write it to the insert sink
236+
err = db.UltraFastDumpToClickhouse(&processedData[0], "_insert_sink_backfill_electra_fork_epoch_events", backfillBatchID.String())
237+
if err != nil {
238+
d.log.Error(err, "failed to insert epochs", 0, log.Fields{"epochs": forkEpochs})
239+
return errors.Wrap(err, "failed to insert epochs")
240+
}
241+
242+
return nil
243+
}

0 commit comments

Comments
 (0)