Skip to content

Commit 50b8139

Browse files
committed
refactor: move deposits-aggregation from exporter to statistics and run only once per day
1 parent 4dd7dac commit 50b8139

File tree

3 files changed

+78
-53
lines changed

3 files changed

+78
-53
lines changed

backend/cmd/statistics/main.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212
"time"
1313

14+
"github.com/Sirupsen/logrus"
1415
"github.com/gobitfly/beaconchain/pkg/commons/cache"
1516
"github.com/gobitfly/beaconchain/pkg/commons/db"
1617
"github.com/gobitfly/beaconchain/pkg/commons/log"
@@ -33,6 +34,7 @@ type options struct {
3334
statisticsValidatorToggle bool
3435
statisticsChartToggle bool
3536
statisticsGraffitiToggle bool
37+
statisticsDepositsToggle bool
3638
resetStatus bool
3739
}
3840

@@ -46,6 +48,7 @@ func Run() {
4648
fs.BoolVar(&opt.statisticsValidatorToggle, "validators.enabled", false, "Toggle exporting validator statistics")
4749
fs.BoolVar(&opt.statisticsChartToggle, "charts.enabled", false, "Toggle exporting chart series")
4850
fs.BoolVar(&opt.statisticsGraffitiToggle, "graffiti.enabled", false, "Toggle exporting graffiti statistics")
51+
fs.BoolVar(&opt.statisticsDepositsToggle, "deposits.enabled", false, "Toggle aggregating deposits")
4952
fs.BoolVar(&opt.resetStatus, "validators.reset", false, "Export stats independent if they have already been exported previously")
5053

5154
versionFlag := fs.Bool("version", false, "Show version and exit")
@@ -248,6 +251,10 @@ func Run() {
248251

249252
go statisticsLoop(rpcClient)
250253

254+
if opt.statisticsDepositsToggle {
255+
go depositsLoop()
256+
}
257+
251258
utils.WaitForCtrlC()
252259

253260
log.Infof("exiting...")
@@ -287,6 +294,18 @@ func statisticsLoop(client rpc.Client) {
287294
if lastExportedDayValidator != 0 {
288295
lastExportedDayValidator++
289296
}
297+
298+
if lastExportedDayValidator <= previousDay {
299+
start := time.Now()
300+
err = db.AggregateDeposits()
301+
if err != nil {
302+
logrus.Errorf("error aggregating deposits: %v", err)
303+
loopError = err
304+
} else {
305+
logrus.WithFields(logrus.Fields{"duration": time.Since(start)}).Infof("aggregated deposits")
306+
}
307+
}
308+
290309
if lastExportedDayValidator <= previousDay || lastExportedDayValidator == 0 {
291310
for day := lastExportedDayValidator; day <= previousDay; day++ {
292311
err := db.WriteValidatorStatisticsForDay(day, client)
@@ -358,6 +377,21 @@ func statisticsLoop(client rpc.Client) {
358377
}
359378
}
360379

380+
func depositsLoop() {
381+
for {
382+
start := time.Now()
383+
err := db.AggregateDeposits()
384+
if err != nil {
385+
log.Error(err, "error aggregating deposits", 0)
386+
services.ReportStatus("deposits_aggregator", err.Error(), nil)
387+
} else {
388+
log.InfoWithFields(log.Fields{"duration": time.Since(start)}, "aggregated deposits")
389+
services.ReportStatus("deposits_aggregator", "Running", nil)
390+
}
391+
time.Sleep(time.Hour * 24)
392+
}
393+
}
394+
361395
func clearStatsStatusTable(day uint64) {
362396
log.Infof("deleting validator_stats_status for day %v", day)
363397
_, err := db.WriterDb.Exec("DELETE FROM validator_stats_status WHERE day = $1", day)

backend/pkg/commons/db/statistics.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1843,3 +1843,47 @@ func CheckIfDayIsFinalized(day uint64) error {
18431843

18441844
return nil
18451845
}
1846+
1847+
func AggregateDeposits() error {
1848+
start := time.Now()
1849+
defer func() {
1850+
metrics.TaskDuration.WithLabelValues("statistics_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
1851+
}()
1852+
_, err := WriterDb.Exec(`
1853+
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
1854+
SELECT
1855+
eth1.from_address,
1856+
SUM(eth1.amount) as amount,
1857+
SUM(eth1.validcount) AS validcount,
1858+
SUM(eth1.invalidcount) AS invalidcount,
1859+
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
1860+
COUNT(v.pubkey) AS totalcount,
1861+
COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount,
1862+
COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount,
1863+
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
1864+
FROM (
1865+
SELECT
1866+
from_address,
1867+
publickey,
1868+
SUM(amount) AS amount,
1869+
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
1870+
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
1871+
FROM eth1_deposits
1872+
GROUP BY from_address, publickey
1873+
) eth1
1874+
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
1875+
GROUP BY eth1.from_address
1876+
ON CONFLICT (from_address) DO UPDATE SET
1877+
amount = excluded.amount,
1878+
validcount = excluded.validcount,
1879+
invalidcount = excluded.invalidcount,
1880+
slashedcount = excluded.slashedcount,
1881+
totalcount = excluded.totalcount,
1882+
activecount = excluded.activecount,
1883+
pendingcount = excluded.pendingcount,
1884+
voluntary_exit_count = excluded.voluntary_exit_count`)
1885+
if err != nil && err != sql.ErrNoRows {
1886+
return err
1887+
}
1888+
return nil
1889+
}

backend/pkg/exporter/modules/execution_deposits_exporter.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
2727
"github.com/gobitfly/beaconchain/pkg/commons/db"
2828
"github.com/gobitfly/beaconchain/pkg/commons/log"
29-
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
3029
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
3130
"github.com/gobitfly/beaconchain/pkg/commons/services"
3231
"github.com/gobitfly/beaconchain/pkg/commons/types"
@@ -282,13 +281,6 @@ func (d *executionDepositsExporter) export() (err error) {
282281

283282
log.Debugf("updating cached deposits view took %v", time.Since(start))
284283

285-
if len(depositsToSave) > 0 {
286-
err = d.aggregateDeposits()
287-
if err != nil {
288-
return err
289-
}
290-
}
291-
292284
return nil
293285
}
294286

@@ -676,51 +668,6 @@ func (d *executionDepositsExporter) getDepositTraces(txsToTrace []string) (filte
676668
return filteredTraces, nil
677669
}
678670

679-
func (d *executionDepositsExporter) aggregateDeposits() error {
680-
/// this could be a materialized view
681-
start := time.Now()
682-
defer func() {
683-
metrics.TaskDuration.WithLabelValues("exporter_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
684-
}()
685-
_, err := db.WriterDb.Exec(`
686-
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
687-
SELECT
688-
eth1.from_address,
689-
SUM(eth1.amount) as amount,
690-
SUM(eth1.validcount) AS validcount,
691-
SUM(eth1.invalidcount) AS invalidcount,
692-
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
693-
COUNT(v.pubkey) AS totalcount,
694-
COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount,
695-
COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount,
696-
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
697-
FROM (
698-
SELECT
699-
from_address,
700-
publickey,
701-
SUM(amount) AS amount,
702-
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
703-
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
704-
FROM eth1_deposits
705-
GROUP BY from_address, publickey
706-
) eth1
707-
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
708-
GROUP BY eth1.from_address
709-
ON CONFLICT (from_address) DO UPDATE SET
710-
amount = excluded.amount,
711-
validcount = excluded.validcount,
712-
invalidcount = excluded.invalidcount,
713-
slashedcount = excluded.slashedcount,
714-
totalcount = excluded.totalcount,
715-
activecount = excluded.activecount,
716-
pendingcount = excluded.pendingcount,
717-
voluntary_exit_count = excluded.voluntary_exit_count`)
718-
if err != nil && err != sql.ErrNoRows {
719-
return nil
720-
}
721-
return err
722-
}
723-
724671
func (d *executionDepositsExporter) updateCachedView() error {
725672
err := db.CacheQuery(`
726673
SELECT

0 commit comments

Comments
 (0)