Skip to content

Commit c1ceb21

Browse files
committed
use delta strategy to use less data during rewards generation
delta from its persistent tables remove gold_staging remove on conflict move staging data to gold_table; remove filter id and optimize left join with gold table group by don't remove temp tables fix missing earners cutoffdate fix table 5 fix table 4 and 5 remove restrictive filter on table 5 fix fix fix temp file fix table 2 fix table 5 table 5 conservative table 5
1 parent 63cf9c5 commit c1ceb21

19 files changed

+610
-144
lines changed

pkg/pipeline/pipeline.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche
445445
zap.String("cutoffDate", cutoffDate),
446446
zap.Uint64("blockNumber", blockNumber),
447447
)
448-
accountTree, _, _, err := p.rewardsCalculator.MerkelizeRewardsForSnapshot(rewardsCalculationEnd)
448+
accountTree, _, _, err := p.rewardsCalculator.MerkelizeRewardsForSnapshot(cutoffDate)
449449
if err != nil {
450450
p.Logger.Sugar().Errorw("Failed to merkelize rewards for snapshot date",
451451
zap.String("cutoffDate", cutoffDate), zap.Error(err),

pkg/postgres/migrations/202508221218_migrateRewardsTables/up.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -559,33 +559,10 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
559559
END $$;`,
560560
},
561561
NewTableName: "gold_table",
562-
ExistingTablePattern: "^$",
562+
ExistingTablePattern: "gold_[0-9]+_staging_[0-9_]+$",
563563
},
564564
}
565565

566-
// Drop staging tables (no data migration needed)
567-
dropStagingTablesQuery := `
568-
SELECT table_name
569-
FROM information_schema.tables
570-
WHERE table_type='BASE TABLE'
571-
AND table_name ~* 'gold_(11|15)_staging_[0-9_]+$'
572-
AND table_schema = 'public'
573-
`
574-
var stagingTables []string
575-
res := grm.Raw(dropStagingTablesQuery).Scan(&stagingTables)
576-
if res.Error != nil {
577-
return fmt.Errorf("failed to find staging tables: %w", res.Error)
578-
}
579-
580-
for _, table := range stagingTables {
581-
dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s", table)
582-
dropRes := grm.Exec(dropQuery)
583-
if dropRes.Error != nil {
584-
fmt.Printf("Failed to drop staging table %s: %s\n", table, dropRes.Error)
585-
return dropRes.Error
586-
}
587-
}
588-
589566
_, err := helpers.WrapTxAndCommit(func(tx *gorm.DB) (interface{}, error) {
590567
for _, sm := range subMigrations {
591568
fmt.Printf("Running migration for table: %s\n", sm.NewTableName)

pkg/rewards/10_goldAvsODRewardAmounts.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package rewards
22

33
import (
4+
"fmt"
5+
6+
"github.com/Layr-Labs/sidecar/internal/config"
47
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
58
"go.uber.org/zap"
69
)
710

811
const _10_goldAvsODRewardAmountsQuery = `
9-
INSERT INTO {{.destTableName}} (reward_hash, snapshot, token, avs, operator, avs_tokens, generated_rewards_snapshot_id)
12+
create table {{.destTableName}} as
1013
1114
-- Step 1: Get the rows where operators have not registered for the AVS or if the AVS does not exist
1215
WITH reward_snapshot_operators AS (
@@ -58,7 +61,6 @@ operator_token_sums AS (
5861
5962
-- Step 4: Output the final table
6063
SELECT *, {{.generatedRewardsSnapshotId}} as generated_rewards_snapshot_id FROM operator_token_sums
61-
ON CONFLICT (reward_hash, avs, operator, snapshot) DO NOTHING
6264
`
6365

6466
func (rc *RewardsCalculator) GenerateGold10AvsODRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
@@ -72,10 +74,16 @@ func (rc *RewardsCalculator) GenerateGold10AvsODRewardAmountsTable(snapshotDate
7274
return nil
7375
}
7476

75-
destTableName := rewardsUtils.RewardsTable_10_AvsODRewardAmounts
77+
destTableName := rc.getTempAvsODRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
7678
activeOdRewardsTableName := rc.getTempActiveODRewardsTableName(snapshotDate, generatedRewardsSnapshotId)
7779

78-
rc.logger.Sugar().Infow("Generating Avs OD reward amounts",
80+
// Drop existing temp table
81+
if err := rc.DropTempAvsODRewardAmountsTable(snapshotDate, generatedRewardsSnapshotId); err != nil {
82+
rc.logger.Sugar().Errorw("Failed to drop existing temp avs OD reward amounts table", "error", err)
83+
return err
84+
}
85+
86+
rc.logger.Sugar().Infow("Generating temp Avs OD reward amounts",
7987
zap.String("cutoffDate", snapshotDate),
8088
zap.String("destTableName", destTableName),
8189
)
@@ -92,8 +100,30 @@ func (rc *RewardsCalculator) GenerateGold10AvsODRewardAmountsTable(snapshotDate
92100

93101
res := rc.grm.Exec(query)
94102
if res.Error != nil {
95-
rc.logger.Sugar().Errorw("Failed to create gold_avs_od_reward_amounts", "error", res.Error)
103+
rc.logger.Sugar().Errorw("Failed to create temp avs OD reward amounts", "error", res.Error)
96104
return res.Error
97105
}
98106
return nil
99107
}
108+
109+
// Helper functions for temp table management
110+
func (rc *RewardsCalculator) getTempAvsODRewardAmountsTableName(snapshotDate string, generatedRewardSnapshotId uint64) string {
111+
camelDate := config.KebabToSnakeCase(snapshotDate)
112+
return fmt.Sprintf("tmp_rewards_gold_10_avs_od_reward_amounts_%s_%d", camelDate, generatedRewardSnapshotId)
113+
}
114+
115+
func (rc *RewardsCalculator) DropTempAvsODRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
116+
tempTableName := rc.getTempAvsODRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
117+
118+
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tempTableName)
119+
res := rc.grm.Exec(query)
120+
if res.Error != nil {
121+
rc.logger.Sugar().Errorw("Failed to drop temp avs OD reward amounts table", "error", res.Error)
122+
return res.Error
123+
}
124+
rc.logger.Sugar().Infow("Successfully dropped temp avs OD reward amounts table",
125+
zap.String("tempTableName", tempTableName),
126+
zap.Uint64("generatedRewardsSnapshotId", generatedRewardsSnapshotId),
127+
)
128+
return nil
129+
}

pkg/rewards/11_goldActiveODOperatorSetRewards.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,21 @@ active_rewards_updated_end_timestamps AS (
4747
block_date AS reward_submission_date
4848
FROM active_rewards_modified
4949
),
50-
50+
-- Optimized: Get the latest snapshot for each reward hash
51+
reward_progress AS (
52+
SELECT
53+
reward_hash,
54+
MAX(snapshot) as last_snapshot
55+
FROM gold_table
56+
GROUP BY reward_hash
57+
),
5158
-- Step 3: For each reward hash, find the latest snapshot
5259
active_rewards_updated_start_timestamps AS (
5360
SELECT
5461
ap.avs,
5562
ap.operator_set_id,
5663
ap.operator,
57-
COALESCE(MAX(g.snapshot), ap.reward_start_exclusive) AS reward_start_exclusive,
64+
COALESCE(g.last_snapshot, ap.reward_start_exclusive) AS reward_start_exclusive,
5865
ap.reward_end_inclusive,
5966
ap.token,
6067
-- We use floor to ensure we are always underestimating total tokens per day
@@ -66,7 +73,7 @@ active_rewards_updated_start_timestamps AS (
6673
ap.global_end_inclusive,
6774
ap.reward_submission_date
6875
FROM active_rewards_updated_end_timestamps ap
69-
LEFT JOIN gold_table g
76+
LEFT JOIN reward_progress g
7077
ON g.reward_hash = ap.reward_hash
7178
GROUP BY
7279
ap.avs,
@@ -78,10 +85,11 @@ active_rewards_updated_start_timestamps AS (
7885
ap.multiplier,
7986
ap.strategy,
8087
ap.reward_hash,
81-
ap.duration,
88+
ap.duration,
8289
ap.global_end_inclusive,
8390
ap.reward_start_exclusive,
84-
ap.reward_submission_date
91+
ap.reward_submission_date,
92+
g.last_snapshot
8593
),
8694
8795
-- Step 4: Filter out invalid reward ranges
@@ -273,5 +281,6 @@ func (rc *RewardsCalculator) CopyTempActiveODOperatorSetRewardsToActiveODOperato
273281
return res.Error
274282
}
275283

276-
return rc.DropTempActiveODOperatorSetRewardsTable(snapshotDate, generatedRewardsSnapshotId)
284+
// return rc.DropTempActiveODOperatorSetRewardsTable(snapshotDate, generatedRewardsSnapshotId)
285+
return nil
277286
}

pkg/rewards/12_goldOperatorODOperatorSetRewardAmounts.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package rewards
22

33
import (
4+
"fmt"
5+
6+
"github.com/Layr-Labs/sidecar/internal/config"
47
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
58
"go.uber.org/zap"
69
)
710

811
const _12_goldOperatorODOperatorSetRewardAmountsQuery = `
9-
INSERT INTO {{.destTableName}} (reward_hash, snapshot, token, tokens_per_registered_snapshot_decimal, avs, operator_set_id, operator, strategy, multiplier, reward_submission_date, rn, split_pct, operator_tokens, generated_rewards_snapshot_id)
12+
create table {{.destTableName}} as
1013
1114
-- Step 1: Get the rows where operators have registered for the operator set
1215
WITH reward_snapshot_operators AS (
@@ -66,7 +69,6 @@ operator_splits AS (
6669
6770
-- Step 4: Output the final table with operator splits
6871
SELECT *, {{.generatedRewardsSnapshotId}} as generated_rewards_snapshot_id FROM operator_splits
69-
ON CONFLICT (reward_hash, operator_set_id, operator, strategy, snapshot) DO NOTHING
7072
`
7173

7274
func (rc *RewardsCalculator) GenerateGold12OperatorODOperatorSetRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
@@ -80,10 +82,16 @@ func (rc *RewardsCalculator) GenerateGold12OperatorODOperatorSetRewardAmountsTab
8082
return nil
8183
}
8284

83-
destTableName := rewardsUtils.RewardsTable_12_OperatorODOperatorSetRewardAmounts
85+
destTableName := rc.getTempOperatorODOperatorSetRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
8486
activeODRewardsTable := rc.getTempActiveODOperatorSetRewardsTableName(snapshotDate, generatedRewardsSnapshotId)
8587

86-
rc.logger.Sugar().Infow("Generating Operator OD operator set reward amounts",
88+
// Drop existing temp table
89+
if err := rc.DropTempOperatorODOperatorSetRewardAmountsTable(snapshotDate, generatedRewardsSnapshotId); err != nil {
90+
rc.logger.Sugar().Errorw("Failed to drop existing temp operator OD operator set reward amounts table", "error", err)
91+
return err
92+
}
93+
94+
rc.logger.Sugar().Infow("Generating temp Operator OD operator set reward amounts",
8795
zap.String("cutoffDate", snapshotDate),
8896
zap.String("destTableName", destTableName),
8997
)
@@ -100,8 +108,30 @@ func (rc *RewardsCalculator) GenerateGold12OperatorODOperatorSetRewardAmountsTab
100108

101109
res := rc.grm.Exec(query)
102110
if res.Error != nil {
103-
rc.logger.Sugar().Errorw("Failed to create gold_operator_od_operator_set_reward_amounts", "error", res.Error)
111+
rc.logger.Sugar().Errorw("Failed to create temp operator OD operator set reward amounts", "error", res.Error)
104112
return res.Error
105113
}
106114
return nil
107115
}
116+
117+
// Helper functions for temp table management
118+
func (rc *RewardsCalculator) getTempOperatorODOperatorSetRewardAmountsTableName(snapshotDate string, generatedRewardSnapshotId uint64) string {
119+
camelDate := config.KebabToSnakeCase(snapshotDate)
120+
return fmt.Sprintf("tmp_rewards_gold_12_operator_od_operator_set_reward_amounts_%s_%d", camelDate, generatedRewardSnapshotId)
121+
}
122+
123+
func (rc *RewardsCalculator) DropTempOperatorODOperatorSetRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
124+
tempTableName := rc.getTempOperatorODOperatorSetRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
125+
126+
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tempTableName)
127+
res := rc.grm.Exec(query)
128+
if res.Error != nil {
129+
rc.logger.Sugar().Errorw("Failed to drop temp operator OD operator set reward amounts table", "error", res.Error)
130+
return res.Error
131+
}
132+
rc.logger.Sugar().Infow("Successfully dropped temp operator OD operator set reward amounts table",
133+
zap.String("tempTableName", tempTableName),
134+
zap.Uint64("generatedRewardsSnapshotId", generatedRewardsSnapshotId),
135+
)
136+
return nil
137+
}

pkg/rewards/13_goldStakerODOperatorSetRewardAmounts.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package rewards
22

33
import (
4+
"fmt"
5+
6+
"github.com/Layr-Labs/sidecar/internal/config"
47
"github.com/Layr-Labs/sidecar/pkg/rewardsUtils"
58
"go.uber.org/zap"
69
)
710

811
const _13_goldStakerODOperatorSetRewardAmountsQuery = `
9-
INSERT INTO {{.destTableName}} (reward_hash, snapshot, token, tokens_per_registered_snapshot_decimal, avs, operator_set_id, operator, strategy, multiplier, reward_submission_date, staker_split, staker, shares, staker_weight, rn, total_weight, staker_proportion, staker_tokens, generated_rewards_snapshot_id)
12+
create table {{.destTableName}} as
1013
1114
-- Step 1: Get the rows where operators have registered for the operator set
1215
WITH reward_snapshot_operators AS (
@@ -128,7 +131,6 @@ staker_reward_amounts AS (
128131
)
129132
-- Output the final table
130133
SELECT *, {{.generatedRewardsSnapshotId}} as generated_rewards_snapshot_id FROM staker_reward_amounts
131-
ON CONFLICT (reward_hash, snapshot, operator_set_id, operator, strategy) DO NOTHING
132134
`
133135

134136
func (rc *RewardsCalculator) GenerateGold13StakerODOperatorSetRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
@@ -142,10 +144,16 @@ func (rc *RewardsCalculator) GenerateGold13StakerODOperatorSetRewardAmountsTable
142144
return nil
143145
}
144146

145-
destTableName := rewardsUtils.RewardsTable_13_StakerODOperatorSetRewardAmounts
147+
destTableName := rc.getTempStakerODOperatorSetRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
146148
activeODRewardsTable := rc.getTempActiveODOperatorSetRewardsTableName(snapshotDate, generatedRewardsSnapshotId)
147149

148-
rc.logger.Sugar().Infow("Generating Staker OD operator set reward amounts",
150+
// Drop existing temp table
151+
if err := rc.DropTempStakerODOperatorSetRewardAmountsTable(snapshotDate, generatedRewardsSnapshotId); err != nil {
152+
rc.logger.Sugar().Errorw("Failed to drop existing temp staker OD operator set reward amounts table", "error", err)
153+
return err
154+
}
155+
156+
rc.logger.Sugar().Infow("Generating temp Staker OD operator set reward amounts",
149157
zap.String("cutoffDate", snapshotDate),
150158
zap.String("destTableName", destTableName),
151159
)
@@ -162,8 +170,30 @@ func (rc *RewardsCalculator) GenerateGold13StakerODOperatorSetRewardAmountsTable
162170

163171
res := rc.grm.Exec(query)
164172
if res.Error != nil {
165-
rc.logger.Sugar().Errorw("Failed to create gold_staker_od_operator_set_reward_amounts", "error", res.Error)
173+
rc.logger.Sugar().Errorw("Failed to create temp staker OD operator set reward amounts", "error", res.Error)
166174
return res.Error
167175
}
168176
return nil
169177
}
178+
179+
// Helper functions for temp table management
180+
func (rc *RewardsCalculator) getTempStakerODOperatorSetRewardAmountsTableName(snapshotDate string, generatedRewardSnapshotId uint64) string {
181+
camelDate := config.KebabToSnakeCase(snapshotDate)
182+
return fmt.Sprintf("tmp_rewards_gold_13_staker_od_operator_set_reward_amounts_%s_%d", camelDate, generatedRewardSnapshotId)
183+
}
184+
185+
func (rc *RewardsCalculator) DropTempStakerODOperatorSetRewardAmountsTable(snapshotDate string, generatedRewardsSnapshotId uint64) error {
186+
tempTableName := rc.getTempStakerODOperatorSetRewardAmountsTableName(snapshotDate, generatedRewardsSnapshotId)
187+
188+
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tempTableName)
189+
res := rc.grm.Exec(query)
190+
if res.Error != nil {
191+
rc.logger.Sugar().Errorw("Failed to drop temp staker OD operator set reward amounts table", "error", res.Error)
192+
return res.Error
193+
}
194+
rc.logger.Sugar().Infow("Successfully dropped temp staker OD operator set reward amounts table",
195+
zap.String("tempTableName", tempTableName),
196+
zap.Uint64("generatedRewardsSnapshotId", generatedRewardsSnapshotId),
197+
)
198+
return nil
199+
}

0 commit comments

Comments
 (0)