Skip to content

Commit c69312c

Browse files
committed
refactor: move deposits-aggregation from exporter to statistics and run only once per day
1 parent 4dd7dac commit c69312c

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed

backend/cmd/statistics/main.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package statistics
22

33
import (
4+
"database/sql"
45
"errors"
56
"flag"
67
"fmt"
@@ -33,6 +34,7 @@ type options struct {
3334
statisticsValidatorToggle bool
3435
statisticsChartToggle bool
3536
statisticsGraffitiToggle bool
37+
statisticsDepositsToggle bool
3638
resetStatus bool
3739
}
3840

@@ -46,6 +48,7 @@ func Run() {
4648
fs.BoolVar(&opt.statisticsValidatorToggle, "validators.enabled", false, "Toggle exporting validator statistics")
4749
fs.BoolVar(&opt.statisticsChartToggle, "charts.enabled", false, "Toggle exporting chart series")
4850
fs.BoolVar(&opt.statisticsGraffitiToggle, "graffiti.enabled", false, "Toggle exporting graffiti statistics")
51+
fs.BoolVar(&opt.statisticsDepositsToggle, "deposits.enabled", false, "Toggle aggregating deposits")
4952
fs.BoolVar(&opt.resetStatus, "validators.reset", false, "Export stats independent if they have already been exported previously")
5053

5154
versionFlag := fs.Bool("version", false, "Show version and exit")
@@ -349,6 +352,12 @@ func statisticsLoop(client rpc.Client) {
349352
}
350353
}
351354

355+
if opt.statisticsDepositsToggle {
356+
err := aggregateDeposits()
357+
log.Error(err, fmt.Errorf("error aggregating deposits"), 0)
358+
loopError = err
359+
}
360+
352361
if loopError == nil {
353362
services.ReportStatus("statistics", "Running", nil)
354363
} else {
@@ -365,3 +374,47 @@ func clearStatsStatusTable(day uint64) {
365374
log.Fatal(err, "error resetting status for day", 0, log.Fields{"day": day})
366375
}
367376
}
377+
378+
func aggregateDeposits() error {
379+
start := time.Now()
380+
defer func() {
381+
metrics.TaskDuration.WithLabelValues("statistics_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
382+
}()
383+
_, err := db.WriterDb.Exec(`
384+
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
385+
SELECT
386+
eth1.from_address,
387+
SUM(eth1.amount) as amount,
388+
SUM(eth1.validcount) AS validcount,
389+
SUM(eth1.invalidcount) AS invalidcount,
390+
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
391+
COUNT(v.pubkey) AS totalcount,
392+
COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount,
393+
COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount,
394+
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
395+
FROM (
396+
SELECT
397+
from_address,
398+
publickey,
399+
SUM(amount) AS amount,
400+
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
401+
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
402+
FROM eth1_deposits
403+
GROUP BY from_address, publickey
404+
) eth1
405+
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
406+
GROUP BY eth1.from_address
407+
ON CONFLICT (from_address) DO UPDATE SET
408+
amount = excluded.amount,
409+
validcount = excluded.validcount,
410+
invalidcount = excluded.invalidcount,
411+
slashedcount = excluded.slashedcount,
412+
totalcount = excluded.totalcount,
413+
activecount = excluded.activecount,
414+
pendingcount = excluded.pendingcount,
415+
voluntary_exit_count = excluded.voluntary_exit_count`)
416+
if err != nil && err != sql.ErrNoRows {
417+
return nil
418+
}
419+
return err
420+
}

0 commit comments

Comments
 (0)