Skip to content

Commit 136845a

Browse files
committed
fix(dashboard_exporter): use new indexed events table instead of raw events
1 parent 6f641c3 commit 136845a

File tree

3 files changed

+53
-41
lines changed

3 files changed

+53
-41
lines changed

backend/pkg/consapi/types/debug.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ type ElectraDeposit struct {
5656
}
5757

5858
type ElectraConsolidation struct {
59-
SourceValidatorIndex uint64 `json:"source_index,string" db:"source_index"`
60-
TargetValidatorIndex uint64 `json:"target_index,string" db:"target_index"`
61-
Amount uint64 `db:"amount"`
59+
SourcePubkey []byte `db:"source_pubkey"`
60+
TargetPubkey []byte `db:"target_pubkey"`
61+
Amount uint64 `db:"amount"`
6262
}
6363

6464
type ElectraExcessBalance struct {
65-
ValidatorIndex uint64 `json:"validator_index,string" db:"validator_index"`
66-
Amount uint64 `json:"amount,string" db:"amount"`
65+
ValidatorPubkey []byte `db:"validator_pubkey"`
66+
Amount uint64 `json:"amount,string" db:"amount"`
6767
}

backend/pkg/exporter/db/db.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,11 +1859,10 @@ func ElectraGetEpochProcessedHashes(epoch uint64) ([][]byte, error) {
18591859
`, (utils.Config.ClConfig.SlotsPerEpoch*epoch)-1))
18601860
*/
18611861
// use goqu
1862-
q := goqu.Dialect("postgres").Select("block_root").
1863-
From("consensus_layer_events").
1862+
q := goqu.Dialect("postgres").Select("transition_block_hash").
1863+
From("consensus_layer_events_indexer_metadata").
18641864
Where(
1865-
goqu.I("event_name").Eq("EpochProcessedEvent"),
1866-
goqu.I("slot").Eq((utils.Config.ClConfig.SlotsPerEpoch*epoch)-1),
1865+
goqu.I("epoch").Eq(epoch - 1), // it is what it is
18671866
)
18681867
sql, args, err := q.Prepared(true).ToSQL()
18691868
if err != nil {
@@ -1881,18 +1880,17 @@ func ElectraGetProcessedDeposits(epoch uint64) ([]constypes.ElectraDeposit, erro
18811880
endSlot := (epoch+1)*utils.Config.ClConfig.SlotsPerEpoch - 1
18821881
var result []constypes.ElectraDeposit
18831882
q := goqu.Dialect("postgres").Select(
1884-
goqu.L("data->>'amount'").As("amount"),
1885-
goqu.L("data->>'pubkey'").As("pubkey"),
1886-
// SignatureValid field, assume true if not present
1887-
goqu.COALESCE(goqu.L("data->>'signature_valid'"), goqu.L("'true'")).As("signature_valid"),
1883+
goqu.L("amount").As("amount"),
1884+
goqu.L("pubkey").As("pubkey"),
1885+
goqu.L("true").As("signature_valid"),
18881886
).
1889-
From("consensus_layer_events").
1887+
From("blocks_deposit_requests_v2").
18901888
Where(
1891-
goqu.I("event_name").Eq("DepositProcessedEvent"),
1892-
goqu.I("slot").Gte(startSlot),
1893-
goqu.I("slot").Lt(endSlot),
1889+
goqu.I("status").Eq("completed"),
1890+
goqu.I("slot_processed").Gte(startSlot),
1891+
goqu.I("slot_processed").Lt(endSlot),
18941892
).
1895-
Order(goqu.I("slot").Asc(), goqu.I("event_index").Asc())
1893+
Order(goqu.I("slot_processed").Asc(), goqu.I("index_processed").Asc())
18961894
sql, args, err := q.Prepared(true).ToSQL()
18971895
if err != nil {
18981896
return nil, fmt.Errorf("error fetching electra deposits for epoch %v: %w", epoch, err)
@@ -1910,16 +1908,17 @@ func ElectraGetProcessedConsolidations(epoch uint64) ([]constypes.ElectraConsoli
19101908
endSlot := (epoch+1)*utils.Config.ClConfig.SlotsPerEpoch - 1
19111909
var consolidations []constypes.ElectraConsolidation
19121910
q := goqu.Dialect("postgres").Select(
1913-
goqu.L("data->>'amount'").As("amount"),
1914-
goqu.L("data->>'source_index'").As("source_index"),
1915-
goqu.L("data->>'target_index'").As("target_index"),
1911+
goqu.L("amount_consolidated").As("amount"),
1912+
goqu.L("source_pubkey").As("source_pubkey"),
1913+
goqu.L("target_pubkey").As("target_pubkey"),
19161914
).
1917-
From("consensus_layer_events").
1915+
From("blocks_consolidation_requests_v2").
19181916
Where(
1919-
goqu.I("event_name").Eq("ConsolidationProcessedEvent"),
1920-
goqu.I("slot").Gte(startSlot),
1921-
goqu.I("slot").Lt(endSlot),
1922-
)
1917+
goqu.I("status").Eq("completed"),
1918+
goqu.I("slot_processed").Gte(startSlot),
1919+
goqu.I("slot_processed").Lt(endSlot),
1920+
).
1921+
Order(goqu.I("slot_processed").Asc(), goqu.I("index_processed").Asc())
19231922
sql, args, err := q.Prepared(true).ToSQL()
19241923
if err != nil {
19251924
return nil, fmt.Errorf("error fetching electra consolidations for epoch %v: %w", epoch, err)
@@ -1936,15 +1935,16 @@ func ElectraGetRemovedExcessBalanceEvents(epoch uint64) ([]constypes.ElectraExce
19361935
endSlot := (epoch+1)*utils.Config.ClConfig.SlotsPerEpoch - 1
19371936
var excessBalanceEvents []constypes.ElectraExcessBalance
19381937
q := goqu.Dialect("postgres").Select(
1939-
goqu.L("data->>'validator_index'").As("validator_index"),
1940-
goqu.L("data->>'amount'").As("amount"),
1938+
goqu.L("validator_pubkey").As("validator_pubkey"),
1939+
goqu.L("amount").As("amount"),
19411940
).
1942-
From("consensus_layer_events").
1941+
From("blocks_removed_excess_balance_events").
19431942
Where(
1944-
goqu.I("event_name").Eq("RemovedExcessBalanceEvent"),
1945-
goqu.I("slot").Gte(startSlot),
1946-
goqu.I("slot").Lt(endSlot),
1947-
)
1943+
goqu.I("status").Eq("completed"),
1944+
goqu.I("slot_processed").Gte(startSlot),
1945+
goqu.I("slot_processed").Lt(endSlot),
1946+
).
1947+
Order(goqu.I("slot_processed").Asc(), goqu.I("index_processed").Asc())
19481948
sql, args, err := q.Prepared(true).ToSQL()
19491949
if err != nil {
19501950
return nil, fmt.Errorf("error fetching electra excess balance events for epoch %v: %w", epoch, err)

backend/pkg/exporter/modules/dashboard_data_process.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -573,12 +573,20 @@ func (d *dashboardData) processElectraConsolidations(data *MultiEpochData, tar *
573573
return nil
574574
}
575575
for _, consolidation := range data.epochBasedData.electraConsolidations[epoch] {
576-
(*tar)[tI].ConsolidationsIncomingAmount[uint64(tO)+consolidation.TargetValidatorIndex] += int64(consolidation.Amount)
577-
(*tar)[tI].ConsolidationsIncomingCount[uint64(tO)+consolidation.TargetValidatorIndex]++
578-
(*tar)[tI].ConsolidationsOutgoingAmount[uint64(tO)+consolidation.SourceValidatorIndex] += int64(consolidation.Amount)
579-
(*tar)[tI].ConsolidationsOutgoingCount[uint64(tO)+consolidation.SourceValidatorIndex]++
576+
targetIndex, ok := data.validatorBasedData.validatorIndices[string(consolidation.TargetPubkey)]
577+
if !ok {
578+
return fmt.Errorf("target validator %s not found in validator indices map", string(consolidation.TargetPubkey))
579+
}
580+
sourceIndex, ok := data.validatorBasedData.validatorIndices[string(consolidation.SourcePubkey)]
581+
if !ok {
582+
return fmt.Errorf("source validator %s not found in validator indices map", string(consolidation.SourcePubkey))
583+
}
584+
(*tar)[tI].ConsolidationsIncomingAmount[uint64(tO)+targetIndex] += int64(consolidation.Amount)
585+
(*tar)[tI].ConsolidationsIncomingCount[uint64(tO)+targetIndex]++
586+
(*tar)[tI].ConsolidationsOutgoingAmount[uint64(tO)+sourceIndex] += int64(consolidation.Amount)
587+
(*tar)[tI].ConsolidationsOutgoingCount[uint64(tO)+sourceIndex]++
580588
// source => target
581-
d.log.Tracef("processed electra consolidation: %d => %d %d GWEI in epoch %d", consolidation.SourceValidatorIndex, consolidation.TargetValidatorIndex, consolidation.Amount, epoch)
589+
d.log.Tracef("processed electra consolidation: %d => %d %d GWEI in epoch %d", consolidation.SourcePubkey, consolidation.TargetPubkey, consolidation.Amount, epoch)
582590
}
583591
return nil
584592
})
@@ -603,9 +611,13 @@ func (d *dashboardData) processElectraRemovedExcessBalanceEvents(data *MultiEpoc
603611
return nil
604612
}
605613
for _, event := range data.epochBasedData.electraRemovedExcessBalanceEvents[epoch] {
606-
(*tar)[tI].WithdrawalsAmount[uint64(tO)+event.ValidatorIndex] += int64(event.Amount)
607-
(*tar)[tI].WithdrawalsCount[uint64(tO)+event.ValidatorIndex]++
608-
d.log.Tracef("processed electra removed excess balance event: %d %d GWEI in epoch %d", event.ValidatorIndex, event.Amount, epoch)
614+
validatorIndex, ok := data.validatorBasedData.validatorIndices[string(event.ValidatorPubkey)]
615+
if !ok {
616+
return fmt.Errorf("validator %s not found in validator indices map", string(event.ValidatorPubkey))
617+
}
618+
(*tar)[tI].WithdrawalsAmount[uint64(tO)+validatorIndex] += int64(event.Amount)
619+
(*tar)[tI].WithdrawalsCount[uint64(tO)+validatorIndex]++
620+
d.log.Tracef("processed electra removed excess balance event: %d %d GWEI in epoch %d", event.ValidatorPubkey, event.Amount, epoch)
609621
}
610622
return nil
611623
})

0 commit comments

Comments
 (0)