Skip to content

Commit e7e3566

Browse files
committed
feat: reference queue deposit request
1 parent 867063a commit e7e3566

File tree

3 files changed

+86
-45
lines changed

3 files changed

+86
-45
lines changed

backend/pkg/commons/db/db.go

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2522,54 +2522,20 @@ func CopyToTable[T []any](tableName string, columns []string, data []T) error {
25222522
return nil
25232523
}
25242524

2525-
func ClearAndCopyToTable[T []any](db *sqlx.DB, tableName string, columns []string, data []T) error {
2526-
conn, err := db.Conn(context.Background())
2525+
func ClearAndCopyToTable[T []any](tx pgx.Tx, tableName string, columns []string, data []T) error {
2526+
// clear
2527+
_, err := tx.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE %s", tableName))
25272528
if err != nil {
2528-
return fmt.Errorf("error retrieving raw sql connection: %w", err)
2529+
return fmt.Errorf("failed to truncate table %s: %w", tableName, err)
25292530
}
2530-
defer conn.Close()
2531-
err = conn.Raw(func(driverConn interface{}) error {
2532-
conn := driverConn.(*stdlib.Conn).Conn()
2533-
2534-
pgxdecimal.Register(conn.TypeMap())
2535-
tx, err := conn.Begin(context.Background())
2536-
2537-
if err != nil {
2538-
return err
2539-
}
2540-
defer func() {
2541-
err := tx.Rollback(context.Background())
2542-
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
2543-
log.Error(err, "error rolling back transaction", 0)
2544-
}
2545-
}()
25462531

2547-
// clear
2548-
_, err = tx.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE %s", tableName))
2549-
if err != nil {
2550-
return fmt.Errorf("failed to truncate table %s: %w", tableName, err)
2551-
}
2532+
// copy
2533+
_, err = tx.CopyFrom(context.Background(), pgx.Identifier{tableName}, columns,
2534+
pgx.CopyFromSlice(len(data), func(i int) ([]interface{}, error) {
2535+
return data[i], nil
2536+
}))
25522537

2553-
// copy
2554-
_, err = tx.CopyFrom(context.Background(), pgx.Identifier{tableName}, columns,
2555-
pgx.CopyFromSlice(len(data), func(i int) ([]interface{}, error) {
2556-
return data[i], nil
2557-
}))
2558-
2559-
if err != nil {
2560-
return err
2561-
}
2562-
2563-
err = tx.Commit(context.Background())
2564-
if err != nil {
2565-
return err
2566-
}
2567-
return nil
2568-
})
2569-
if err != nil {
2570-
return fmt.Errorf("error copying data to %s: %w", tableName, err)
2571-
}
2572-
return nil
2538+
return err
25732539
}
25742540

25752541
func HasEventsForEpoch(epoch uint64) (bool, error) {

backend/pkg/commons/db2/consensus.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package db2
22

33
import (
4+
"context"
45
"database/sql"
56
"fmt"
67
"strings"
@@ -10,6 +11,8 @@ import (
1011
"github.com/gobitfly/beaconchain/pkg/commons/log"
1112
"github.com/gobitfly/beaconchain/pkg/commons/types"
1213
"github.com/gobitfly/beaconchain/pkg/commons/utils"
14+
"github.com/jackc/pgx/v5"
15+
"github.com/jackc/pgx/v5/stdlib"
1316
"github.com/jmoiron/sqlx"
1417
"github.com/pkg/errors"
1518
)
@@ -403,10 +406,81 @@ func (c *ConsensusDB) SavePendingDepositsQueue(pendingDeposits []types.PendingDe
403406
dat[i] = []interface{}{r.ID, r.ValidatorIndex, utils.DBEncodeToHex(r.Pubkey), utils.DBEncodeToHex(r.WithdrawalCredentials), r.Amount, utils.DBEncodeToHex(r.Signature), r.Slot, r.QueuedBalanceAhead, r.EstClearEpoch}
404407
}
405408

406-
err := db.ClearAndCopyToTable(c.WriterDb, "pending_deposits_queue", []string{"id", "validator_index", "pubkey", "withdrawal_credentials", "amount", "signature", "slot", "queued_balance_ahead", "est_clear_epoch"}, dat)
409+
conn, err := c.WriterDb.Conn(context.Background())
410+
if err != nil {
411+
return fmt.Errorf("error retrieving raw sql connection: %w", err)
412+
}
413+
defer conn.Close()
414+
err = conn.Raw(func(driverConn interface{}) error {
415+
conn := driverConn.(*stdlib.Conn).Conn()
416+
tx, err := conn.Begin(context.Background())
417+
if err != nil {
418+
return err
419+
}
420+
421+
defer func() {
422+
err := tx.Rollback(context.Background())
423+
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
424+
log.Error(err, "error rolling back transaction", 0)
425+
}
426+
}()
427+
428+
err = db.ClearAndCopyToTable(tx, "pending_deposits_queue", []string{"id", "validator_index", "pubkey", "withdrawal_credentials", "amount", "signature", "slot", "queued_balance_ahead", "est_clear_epoch"}, dat)
429+
if err != nil {
430+
return errors.Wrap(err, "failed to save pending deposits queue")
431+
}
432+
433+
err = matchDepositRequests(tx)
434+
if err != nil {
435+
return fmt.Errorf("error matching data with blocks_deposit_requests_v2 table: %w", err)
436+
}
437+
438+
return tx.Commit(context.Background())
439+
})
440+
407441
return err
408442
}
409443

444+
func matchDepositRequests(tx pgx.Tx) error {
445+
// matching will be wrong for postponed system-deposits
446+
// but likelihood to ever occur for one pubkey, amount, slot combo is effectively 0
447+
query := `
448+
WITH pdq_ranked AS (
449+
SELECT *, ROW_NUMBER() OVER (
450+
PARTITION BY pubkey, amount, slot ORDER BY id
451+
) AS rn
452+
FROM pending_deposits_queue
453+
),
454+
bdr_ranked AS (
455+
SELECT *, ROW_NUMBER() OVER (
456+
PARTITION BY pubkey, amount, slot_queued ORDER BY index_queued ASC
457+
) AS rn
458+
FROM blocks_deposit_requests_v2
459+
WHERE status = 'queued' OR status = 'postponed'
460+
),
461+
matches AS (
462+
SELECT pdq.id AS pdq_id, bdr.id AS bdr_id
463+
FROM pdq_ranked pdq
464+
JOIN bdr_ranked bdr
465+
ON pdq.pubkey = bdr.pubkey
466+
AND pdq.amount = bdr.amount
467+
AND (
468+
pdq.slot = bdr.slot_queued AND pdq.rn = bdr.rn OR
469+
(pdq.slot = 0 AND bdr.index_queued < 0)
470+
)
471+
)
472+
UPDATE pending_deposits_queue
473+
SET request_id = matches.bdr_id
474+
FROM matches
475+
WHERE pending_deposits_queue.id = matches.pdq_id;`
476+
477+
_, err := tx.Exec(context.Background(), query)
478+
if err != nil && err != sql.ErrNoRows {
479+
return err
480+
}
481+
return nil
482+
}
483+
410484
func (c *ConsensusDB) GetLastIndexedConsensusLayerEventsEpoch() (count int64, err error) {
411485
err = c.WriterDb.Get(&count, "SELECT COALESCE(MAX(epoch), -1) FROM consensus_layer_events_indexer_metadata")
412486
return count, err

backend/pkg/commons/types/queues.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type PendingDeposit struct {
3333
Slot uint64 `db:"slot"`
3434
QueuedBalanceAhead uint64 `db:"queued_balance_ahead"`
3535
EstClearEpoch uint64 `db:"est_clear_epoch"` // approx epoch where validator deposit will be credited on beaconchain and validator getting assigned an index (happens in transition from est_clear_epoch-1 to est_clear_epoch)
36+
RequestId sql.NullInt64 `db:"request_id"` // foreign key to the deposit's CL request
3637
// eligible = est_clear_epoch + 1
3738
// activation = eligible + 2 + MAX_SEED_LOOKAHEAD
3839

0 commit comments

Comments
 (0)