Skip to content

Commit 6e4ea11

Browse files
committed
feat(exporter): add legacy transformers to new cl event indexer
1 parent 78612b0 commit 6e4ea11

File tree

7 files changed

+381
-153
lines changed

7 files changed

+381
-153
lines changed

backend/pkg/commons/db/db.go

Lines changed: 0 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -2593,115 +2593,3 @@ func HasEventsForEpoch(epoch uint64) (bool, error) {
25932593

25942594
return count > 0, nil
25952595
}
2596-
2597-
func TransformSwitchToCompoundingRequests(firstSlot, lastSlot uint64, tx *sqlx.Tx) (int64, error) {
2598-
res, err := tx.Exec(`
2599-
INSERT INTO blocks_switch_to_compounding_requests (block_slot, block_root, request_index, address, validator_index)
2600-
SELECT
2601-
cle.slot AS slot,
2602-
cle.block_root AS block_root,
2603-
cle.event_index AS request_index,
2604-
decode((cle.data->>'address'), 'base64') AS address,
2605-
vali.validatorindex AS validator_index
2606-
FROM consensus_layer_events cle
2607-
JOIN validators AS vali
2608-
ON vali.pubkey = decode(cle.data ->> 'pubkey', 'base64')
2609-
WHERE event_name = 'SwitchToCompoundingEvent' AND slot >= $1 AND slot <= $2 AND version = $3 ON CONFLICT DO NOTHING;
2610-
`, firstSlot, lastSlot, types.ConsensusLayerEventVersion)
2611-
if err != nil {
2612-
return 0, fmt.Errorf("error transforming consolidation requests: %w", err)
2613-
}
2614-
2615-
consolidationRequestsProcessed, err := res.RowsAffected()
2616-
if err != nil {
2617-
return 0, fmt.Errorf("error getting the amount of processed consolidation requests: %w", err)
2618-
}
2619-
2620-
return consolidationRequestsProcessed, nil
2621-
}
2622-
2623-
func TransformConsolidationRequests(firstSlot, lastSlot uint64, tx *sqlx.Tx) (int64, error) {
2624-
res, err := tx.Exec(`
2625-
INSERT INTO blocks_consolidation_requests (block_slot, block_root, request_index, source_index, target_index, amount_consolidated)
2626-
SELECT
2627-
cle.slot AS block_slot,
2628-
cle.block_root AS block_root,
2629-
cle.event_index AS request_index,
2630-
src.validatorindex AS source_index,
2631-
tgt.validatorindex AS target_index,
2632-
(data->>'amount')::bigint AS amount_consolidated
2633-
FROM consensus_layer_events cle
2634-
JOIN validators AS src
2635-
ON src.pubkey = decode(cle.data ->> 'source_pubkey', 'base64')
2636-
JOIN validators AS tgt
2637-
ON tgt.pubkey = decode(cle.data ->> 'target_pubkey', 'base64')
2638-
WHERE
2639-
event_name = 'ConsolidationProcessedEvent' AND
2640-
slot >= $1 AND slot <= $2 AND
2641-
version = $3
2642-
ON CONFLICT DO NOTHING;
2643-
`, firstSlot, lastSlot, types.ConsensusLayerEventVersion)
2644-
if err != nil {
2645-
return 0, fmt.Errorf("error transforming consolidation requests: %w", err)
2646-
}
2647-
2648-
consolidationRequestsProcessed, err := res.RowsAffected()
2649-
if err != nil {
2650-
return 0, fmt.Errorf("error getting the amount of processed consolidation requests: %w", err)
2651-
}
2652-
2653-
return consolidationRequestsProcessed, nil
2654-
}
2655-
2656-
func TransformDepositRequests(firstSlot, lastSlot uint64, tx *sqlx.Tx) (int64, error) {
2657-
res, err := tx.Exec(`
2658-
INSERT INTO blocks_deposit_requests (block_slot, block_root, request_index, pubkey, withdrawal_credentials, amount, signature)
2659-
SELECT
2660-
slot AS block_slot,
2661-
block_root AS block_root,
2662-
event_index AS request_index,
2663-
decode((data->>'pubkey'), 'base64') AS pubkey,
2664-
decode((data->>'withdrawal_credentials'), 'base64')::bytea AS withdrawal_credentials,
2665-
(data->>'amount')::bigint AS amount,
2666-
decode((data->>'signature'), 'base64')::bytea AS signature
2667-
FROM consensus_layer_events WHERE event_name = 'DepositProcessedEvent' AND slot >= $1 AND slot <= $2 AND version = $3 ON CONFLICT DO NOTHING;
2668-
`, firstSlot, lastSlot, types.ConsensusLayerEventVersion)
2669-
if err != nil {
2670-
return 0, fmt.Errorf("error transforming deposit requests: %w", err)
2671-
}
2672-
2673-
depositRequestsProcessed, err := res.RowsAffected()
2674-
if err != nil {
2675-
return 0, fmt.Errorf("error getting the amount of processed deposit requests: %w", err)
2676-
}
2677-
2678-
return depositRequestsProcessed, nil
2679-
}
2680-
2681-
func TransformRemovedExcessBalanceEvents(firstSlot, lastSlot uint64, tx *sqlx.Tx) (int64, error) {
2682-
// we offset by -20000 to avoid conflicts with normal withdrawals in the blocks
2683-
res, err := tx.Exec(`
2684-
INSERT INTO blocks_withdrawals (block_slot, block_root, withdrawalindex, validatorindex, address, amount)
2685-
SELECT
2686-
cle.slot AS block_slot,
2687-
cle.block_root AS block_root,
2688-
-20000 + cle.event_index AS withdrawalindex,
2689-
(vali.validatorindex)::int AS validatorindex,
2690-
''::bytea as address,
2691-
(cle.data->>'amount')::bigint AS amount
2692-
FROM consensus_layer_events cle
2693-
JOIN validators AS vali
2694-
ON vali.pubkey = decode(cle.data ->> 'pubkey', 'base64')
2695-
WHERE event_name = 'RemovedExcessBalance' AND slot >= $1 AND slot <= $2 AND version = $3 ON CONFLICT DO NOTHING;
2696-
`, firstSlot, lastSlot, types.ConsensusLayerEventVersion)
2697-
if err != nil {
2698-
return 0, fmt.Errorf("error transforming excess balance requests: %w", err)
2699-
}
2700-
2701-
excessBalanceRequestsProcessed, err := res.RowsAffected()
2702-
if err != nil {
2703-
return 0, fmt.Errorf("error getting the amount of processed excess balance requests: %w", err)
2704-
}
2705-
2706-
return excessBalanceRequestsProcessed, nil
2707-
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package cl_transformers
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/doug-martin/goqu/v9"
7+
"github.com/gobitfly/beaconchain/pkg/commons/types"
8+
"github.com/jmoiron/sqlx"
9+
"github.com/pkg/errors"
10+
11+
"encoding/json"
12+
)
13+
14+
type LegacySwitchToCompoundingEventTransformer struct{}
15+
16+
func (d *LegacySwitchToCompoundingEventTransformer) Transform(tx *sqlx.Tx, events []types.ConsensusLayerEvent) (int, error) {
17+
eventName := types.SwitchToCompoundingEventName
18+
// filter events for the relevant type
19+
events, err := FilterEventsByTypeAndSort(events, eventName)
20+
if err != nil {
21+
return 0, errors.Wrap(err, "error filtering events")
22+
}
23+
if len(events) == 0 {
24+
return 0, nil
25+
}
26+
// convert raw data from events into actual structs
27+
data := make([]types.SwitchToCompoundingEvent, len(events))
28+
for i, event := range events {
29+
// unmarshal from data field to DepositQueuedEvent using json
30+
err = json.Unmarshal(event.RawData, &data[i])
31+
if err != nil {
32+
return 0, errors.Wrap(err, "error unmarshalling event data")
33+
}
34+
}
35+
36+
filters := make([]types.ConsensusLayerEventFilter, 0)
37+
for _, event := range events {
38+
filters = append(filters, types.ConsensusLayerEventFilter{
39+
Slot: uint64(event.Slot),
40+
BlockRoot: event.BlockRoot,
41+
})
42+
}
43+
44+
var orConditions []goqu.Expression
45+
for _, filter := range filters {
46+
orConditions = append(orConditions, goqu.Ex{
47+
"slot": filter.Slot,
48+
"block_root": filter.BlockRoot,
49+
})
50+
}
51+
52+
ds := goqu.Dialect("postgres").Insert("blocks_switch_to_compounding_requests").
53+
Cols(
54+
"block_slot",
55+
"block_root",
56+
"request_index",
57+
"address",
58+
"validator_index").
59+
FromQuery(
60+
goqu.From("consensus_layer_events").As("cle").
61+
Select(
62+
"cle.slot",
63+
"cle.block_root",
64+
"cle.event_index",
65+
goqu.L("decode((cle.data->>'address'), 'base64') AS address"),
66+
goqu.L("vali.validatorindex AS validator_index"),
67+
).
68+
Join(goqu.T("validators"), goqu.On(goqu.Ex{
69+
"vali.pubkey": goqu.L(`decode(cle.data ->> 'pubkey', 'base64')`),
70+
})).As("vali").
71+
Where(goqu.And(
72+
goqu.Ex{"event_name": eventName},
73+
goqu.Or(orConditions...),
74+
goqu.Ex{"version": types.ConsensusLayerEventVersion},
75+
)),
76+
)
77+
query, args, err := ds.Prepared(true).ToSQL()
78+
if err != nil {
79+
return 0, fmt.Errorf("error preparing query: %w", err)
80+
}
81+
82+
res, err := tx.Exec(query, args...)
83+
if err != nil {
84+
return 0, fmt.Errorf("error executing query: %w", err)
85+
}
86+
c, err := res.RowsAffected()
87+
if err != nil {
88+
return 0, fmt.Errorf("error getting rows affected: %w", err)
89+
}
90+
91+
return int(c), nil
92+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package cl_transformers
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/doug-martin/goqu/v9"
7+
"github.com/gobitfly/beaconchain/pkg/commons/types"
8+
"github.com/jmoiron/sqlx"
9+
"github.com/pkg/errors"
10+
11+
"encoding/json"
12+
)
13+
14+
type LegacyRemovedExcessBalanceEventTransformer struct{}
15+
16+
func (d *LegacyRemovedExcessBalanceEventTransformer) Transform(tx *sqlx.Tx, events []types.ConsensusLayerEvent) (int, error) {
17+
eventName := types.RemovedExcessBalanceEventName
18+
// filter events for the relevant type
19+
events, err := FilterEventsByTypeAndSort(events, eventName)
20+
if err != nil {
21+
return 0, errors.Wrap(err, "error filtering events")
22+
}
23+
if len(events) == 0 {
24+
return 0, nil
25+
}
26+
// convert raw data from events into actual structs
27+
data := make([]types.RemovedExcessBalance, len(events))
28+
for i, event := range events {
29+
// unmarshal from data field to DepositQueuedEvent using json
30+
err = json.Unmarshal(event.RawData, &data[i])
31+
if err != nil {
32+
return 0, errors.Wrap(err, "error unmarshalling event data")
33+
}
34+
}
35+
36+
filters := make([]types.ConsensusLayerEventFilter, 0)
37+
for _, event := range events {
38+
filters = append(filters, types.ConsensusLayerEventFilter{
39+
Slot: uint64(event.Slot),
40+
BlockRoot: event.BlockRoot,
41+
})
42+
}
43+
44+
var orConditions []goqu.Expression
45+
for _, filter := range filters {
46+
orConditions = append(orConditions, goqu.Ex{
47+
"slot": filter.Slot,
48+
"block_root": filter.BlockRoot,
49+
})
50+
}
51+
52+
ds := goqu.Dialect("postgres").Insert("blocks_withdrawals").
53+
Cols(
54+
"block_slot",
55+
"block_root",
56+
"withdrawalindex",
57+
"validatorindex",
58+
"address",
59+
"amount").
60+
FromQuery(
61+
goqu.From("consensus_layer_events").As("cle").
62+
Select(
63+
"cle.slot AS block_slot",
64+
"cle.block_root",
65+
goqu.L("-20000 + cle.event_index AS withdrawalindex"),
66+
goqu.L("vali.validatorindex::int AS validatorindex"),
67+
goqu.L("''::bytea AS address"),
68+
goqu.L("(cle.data->>'amount')::bigint AS amount"),
69+
).
70+
Join(goqu.T("validators").As("vali"), goqu.On(goqu.Ex{
71+
"vali.pubkey": goqu.L(`decode(cle.data ->> 'pubkey', 'base64')`),
72+
})).
73+
Where(goqu.And(
74+
goqu.Ex{"event_name": eventName},
75+
goqu.Or(orConditions...),
76+
goqu.Ex{"version": types.ConsensusLayerEventVersion},
77+
)),
78+
).OnConflict(goqu.DoNothing())
79+
80+
query, args, err := ds.Prepared(true).ToSQL()
81+
if err != nil {
82+
return 0, fmt.Errorf("error preparing query: %w", err)
83+
}
84+
85+
res, err := tx.Exec(query, args...)
86+
if err != nil {
87+
return 0, fmt.Errorf("error executing query: %w", err)
88+
}
89+
c, err := res.RowsAffected()
90+
if err != nil {
91+
return 0, fmt.Errorf("error getting rows affected: %w", err)
92+
}
93+
94+
return int(c), nil
95+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package cl_transformers
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/doug-martin/goqu/v9"
7+
"github.com/gobitfly/beaconchain/pkg/commons/types"
8+
"github.com/jmoiron/sqlx"
9+
"github.com/pkg/errors"
10+
11+
"encoding/json"
12+
)
13+
14+
type LegacyDepositProcessedEventTransformer struct{}
15+
16+
func (d *LegacyDepositProcessedEventTransformer) Transform(tx *sqlx.Tx, events []types.ConsensusLayerEvent) (int, error) {
17+
eventName := types.DepositProcessedEventName
18+
// filter events for the relevant type
19+
events, err := FilterEventsByTypeAndSort(events, eventName)
20+
if err != nil {
21+
return 0, errors.Wrap(err, "error filtering events")
22+
}
23+
if len(events) == 0 {
24+
return 0, nil
25+
}
26+
// convert raw data from events into actual structs
27+
data := make([]types.DepositProcessedEvent, len(events))
28+
for i, event := range events {
29+
// unmarshal from data field to DepositQueuedEvent using json
30+
err = json.Unmarshal(event.RawData, &data[i])
31+
if err != nil {
32+
return 0, errors.Wrap(err, "error unmarshalling event data")
33+
}
34+
}
35+
36+
filters := make([]types.ConsensusLayerEventFilter, 0)
37+
for _, event := range events {
38+
filters = append(filters, types.ConsensusLayerEventFilter{
39+
Slot: uint64(event.Slot),
40+
BlockRoot: event.BlockRoot,
41+
})
42+
}
43+
44+
var orConditions []goqu.Expression
45+
for _, filter := range filters {
46+
orConditions = append(orConditions, goqu.Ex{
47+
"slot": filter.Slot,
48+
"block_root": filter.BlockRoot,
49+
})
50+
}
51+
52+
ds := goqu.Dialect("postgres").Insert("blocks_deposit_requests").
53+
Cols(
54+
"block_slot",
55+
"block_root",
56+
"request_index",
57+
"pubkey",
58+
"withdrawal_credentials",
59+
"amount",
60+
"signature").
61+
FromQuery(
62+
goqu.From("consensus_layer_events").As("cle").
63+
Select(
64+
"cle.slot",
65+
"cle.block_root",
66+
"cle.event_index",
67+
goqu.L("decode((cle.data->>'pubkey'), 'base64') AS pubkey"),
68+
goqu.L("decode((cle.data->>'withdrawal_credentials'), 'base64')::bytea AS withdrawal_credentials"),
69+
goqu.L("(cle.data->>'amount')::bigint AS amount"),
70+
goqu.L("decode((cle.data->>'signature'), 'base64')::bytea AS signature"),
71+
).
72+
Where(goqu.And(
73+
goqu.Ex{"event_name": eventName},
74+
goqu.Or(orConditions...),
75+
goqu.Ex{"version": types.ConsensusLayerEventVersion},
76+
)),
77+
).OnConflict(goqu.DoNothing())
78+
query, args, err := ds.Prepared(true).ToSQL()
79+
if err != nil {
80+
return 0, fmt.Errorf("error preparing query: %w", err)
81+
}
82+
83+
res, err := tx.Exec(query, args...)
84+
if err != nil {
85+
return 0, fmt.Errorf("error executing query: %w", err)
86+
}
87+
c, err := res.RowsAffected()
88+
if err != nil {
89+
return 0, fmt.Errorf("error getting rows affected: %w", err)
90+
}
91+
92+
return int(c), nil
93+
}

0 commit comments

Comments
 (0)