Skip to content

Commit b369b52

Browse files
committed
refactor: move deposits-aggregation from exporter to statistics and run only once per day
1 parent 4dd7dac commit b369b52

File tree

4 files changed

+88
-53
lines changed

4 files changed

+88
-53
lines changed

backend/cmd/statistics/main.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package statistics
22

33
import (
4+
"database/sql"
45
"errors"
56
"flag"
67
"fmt"
@@ -33,6 +34,7 @@ type options struct {
3334
statisticsValidatorToggle bool
3435
statisticsChartToggle bool
3536
statisticsGraffitiToggle bool
37+
statisticsDepositsToggle bool
3638
resetStatus bool
3739
}
3840

@@ -46,6 +48,7 @@ func Run() {
4648
fs.BoolVar(&opt.statisticsValidatorToggle, "validators.enabled", false, "Toggle exporting validator statistics")
4749
fs.BoolVar(&opt.statisticsChartToggle, "charts.enabled", false, "Toggle exporting chart series")
4850
fs.BoolVar(&opt.statisticsGraffitiToggle, "graffiti.enabled", false, "Toggle exporting graffiti statistics")
51+
fs.BoolVar(&opt.statisticsDepositsToggle, "deposits.enabled", false, "Toggle aggregating deposits")
4952
fs.BoolVar(&opt.resetStatus, "validators.reset", false, "Export stats independent if they have already been exported previously")
5053

5154
versionFlag := fs.Bool("version", false, "Show version and exit")
@@ -349,6 +352,26 @@ func statisticsLoop(client rpc.Client) {
349352
}
350353
}
351354

355+
if opt.statisticsDepositsToggle {
356+
lastDepositsAggregationTime := sql.NullTime{}
357+
err := db.WriterDb.Get(&lastDepositsAggregationTime, "select time from eth1_deposits_aggregated_status order by time desc limit 1")
358+
if err != nil && err != sql.ErrNoRows {
359+
log.Error(err, fmt.Errorf("error aggregating deposits: getting lastDepositsAggregationTime: %w", err), 0)
360+
}
361+
if !lastDepositsAggregationTime.Valid || time.Since(lastDepositsAggregationTime.Time) > time.Hour {
362+
start := time.Now()
363+
err := db.AggregateDeposits()
364+
if err != nil {
365+
log.Error(err, fmt.Errorf("error aggregating deposits: %w", err), 0)
366+
loopError = err
367+
}
368+
log.InfoWithFields(log.Fields{
369+
"lastDepositsAggregationTime": lastDepositsAggregationTime.Time,
370+
"duration": time.Since(start),
371+
}, "aggregated deposits")
372+
}
373+
}
374+
352375
if loopError == nil {
353376
services.ReportStatus("statistics", "Running", nil)
354377
} else {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
CREATE TABLE IF NOT EXISTS eth1_deposits_aggregated_status (
5+
id SERIAL NOT NULL UNIQUE,
6+
time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
7+
PRIMARY KEY (id)
8+
);
9+
10+
-- +goose StatementEnd
11+
12+
-- +goose Down
13+
-- +goose StatementBegin
14+
15+
DROP TABLE IF EXISTS eth1_deposits_aggregated_status;
16+
17+
-- +goose StatementEnd

backend/pkg/commons/db/statistics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1843,3 +1843,51 @@ func CheckIfDayIsFinalized(day uint64) error {
18431843

18441844
return nil
18451845
}
1846+
1847+
func AggregateDeposits() error {
1848+
start := time.Now()
1849+
defer func() {
1850+
metrics.TaskDuration.WithLabelValues("statistics_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
1851+
}()
1852+
_, err := WriterDb.Exec(`
1853+
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
1854+
SELECT
1855+
eth1.from_address,
1856+
SUM(eth1.amount) as amount,
1857+
SUM(eth1.validcount) AS validcount,
1858+
SUM(eth1.invalidcount) AS invalidcount,
1859+
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
1860+
COUNT(v.pubkey) AS totalcount,
1861+
COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount,
1862+
COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount,
1863+
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
1864+
FROM (
1865+
SELECT
1866+
from_address,
1867+
publickey,
1868+
SUM(amount) AS amount,
1869+
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
1870+
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
1871+
FROM eth1_deposits
1872+
GROUP BY from_address, publickey
1873+
) eth1
1874+
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
1875+
GROUP BY eth1.from_address
1876+
ON CONFLICT (from_address) DO UPDATE SET
1877+
amount = excluded.amount,
1878+
validcount = excluded.validcount,
1879+
invalidcount = excluded.invalidcount,
1880+
slashedcount = excluded.slashedcount,
1881+
totalcount = excluded.totalcount,
1882+
activecount = excluded.activecount,
1883+
pendingcount = excluded.pendingcount,
1884+
voluntary_exit_count = excluded.voluntary_exit_count`)
1885+
if err != nil && err != sql.ErrNoRows {
1886+
return err
1887+
}
1888+
_, err = WriterDb.Exec(`INSERT INTO eth1_deposits_aggregated_status (id, time) VALUES (1, NOW())`)
1889+
if err != nil {
1890+
return err
1891+
}
1892+
return nil
1893+
}

backend/pkg/exporter/modules/execution_deposits_exporter.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
2727
"github.com/gobitfly/beaconchain/pkg/commons/db"
2828
"github.com/gobitfly/beaconchain/pkg/commons/log"
29-
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
3029
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
3130
"github.com/gobitfly/beaconchain/pkg/commons/services"
3231
"github.com/gobitfly/beaconchain/pkg/commons/types"
@@ -282,13 +281,6 @@ func (d *executionDepositsExporter) export() (err error) {
282281

283282
log.Debugf("updating cached deposits view took %v", time.Since(start))
284283

285-
if len(depositsToSave) > 0 {
286-
err = d.aggregateDeposits()
287-
if err != nil {
288-
return err
289-
}
290-
}
291-
292284
return nil
293285
}
294286

@@ -676,51 +668,6 @@ func (d *executionDepositsExporter) getDepositTraces(txsToTrace []string) (filte
676668
return filteredTraces, nil
677669
}
678670

679-
func (d *executionDepositsExporter) aggregateDeposits() error {
680-
/// this could be a materialized view
681-
start := time.Now()
682-
defer func() {
683-
metrics.TaskDuration.WithLabelValues("exporter_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
684-
}()
685-
_, err := db.WriterDb.Exec(`
686-
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
687-
SELECT
688-
eth1.from_address,
689-
SUM(eth1.amount) as amount,
690-
SUM(eth1.validcount) AS validcount,
691-
SUM(eth1.invalidcount) AS invalidcount,
692-
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
693-
COUNT(v.pubkey) AS totalcount,
694-
COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount,
695-
COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount,
696-
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
697-
FROM (
698-
SELECT
699-
from_address,
700-
publickey,
701-
SUM(amount) AS amount,
702-
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
703-
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
704-
FROM eth1_deposits
705-
GROUP BY from_address, publickey
706-
) eth1
707-
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
708-
GROUP BY eth1.from_address
709-
ON CONFLICT (from_address) DO UPDATE SET
710-
amount = excluded.amount,
711-
validcount = excluded.validcount,
712-
invalidcount = excluded.invalidcount,
713-
slashedcount = excluded.slashedcount,
714-
totalcount = excluded.totalcount,
715-
activecount = excluded.activecount,
716-
pendingcount = excluded.pendingcount,
717-
voluntary_exit_count = excluded.voluntary_exit_count`)
718-
if err != nil && err != sql.ErrNoRows {
719-
return nil
720-
}
721-
return err
722-
}
723-
724671
func (d *executionDepositsExporter) updateCachedView() error {
725672
err := db.CacheQuery(`
726673
SELECT

0 commit comments

Comments
 (0)