Skip to content

Commit 0645756

Browse files
committed
feat(dash-exporter): use muttex to prevent concurrent rolling gen while transfering data
1 parent 041dd18 commit 0645756

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

backend/pkg/exporter/modules/dashboard_data.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@ import (
2727

2828
type dashboardData struct {
2929
ModuleContext
30-
log ModuleLog
31-
signingDomain []byte
32-
phase0HotfixMutex sync.Mutex
33-
latestSafeEpoch atomic.Int64
34-
heavySemaphore *semaphore.Weighted
35-
mediumSemaphore *semaphore.Weighted
36-
lightSemaphore *semaphore.Weighted
30+
log ModuleLog
31+
signingDomain []byte
32+
phase0HotfixMutex sync.Mutex
33+
sharedRollingGenMutex *sync.Mutex
34+
latestSafeEpoch atomic.Int64
35+
heavySemaphore *semaphore.Weighted
36+
mediumSemaphore *semaphore.Weighted
37+
lightSemaphore *semaphore.Weighted
3738
}
3839

3940
func NewDashboardDataModule(moduleContext ModuleContext) ModuleInterface {

backend/pkg/exporter/modules/dashboard_data_maintenance.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func (d *dashboardData) handleIncompleteTransfers() error {
5454
}
5555
d.log.Infof("handleIncompleteTransfers, found %d incomplete transfer epochs", len(incomplete))
5656

57+
d.log.Tracef("acquiring shared rolling gen mutex for incomplete transfers")
58+
d.sharedRollingGenMutex.Lock()
59+
d.log.Tracef("acquired shared rolling gen mutex for incomplete transfers")
60+
defer d.sharedRollingGenMutex.Unlock()
5761
err = d.transferEpochs(incomplete)
5862
if err != nil {
5963
return errors.Wrap(err, "failed to transfer incomplete epochs")
@@ -72,6 +76,11 @@ func (d *dashboardData) handlePendingTransfers() error {
7276
}
7377
d.log.Infof("handlePendingTransfers, found %d pending transfer epochs", len(pending))
7478

79+
d.log.Tracef("acquiring shared rolling gen mutex for pending transfers")
80+
d.sharedRollingGenMutex.Lock()
81+
d.log.Tracef("acquired shared rolling gen mutex for pending transfers")
82+
defer d.sharedRollingGenMutex.Unlock()
83+
7584
// allocate transfer batch ids
7685
batchIds := make([]uuid.UUID, 0)
7786
for i := range pending {

backend/pkg/exporter/modules/dashboard_data_rollings.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ func (d *dashboardData) handleRollings() error {
3636
edb.Rolling90d,
3737
edb.RollingTotal,
3838
}
39+
d.log.Tracef("acquiring shared rolling gen mutex for rollings")
40+
d.sharedRollingGenMutex.Lock()
41+
d.log.Tracef("acquired shared rolling gen mutex for rollings")
42+
defer d.sharedRollingGenMutex.Unlock()
43+
3944
eg := errgroup.Group{}
40-
//eg.SetLimit(int(utils.Config.DashboardExporter.RollingsInParallel))
45+
eg.SetLimit(int(utils.Config.DashboardExporter.RollingsInParallel))
4146
for _, rolling := range rollings {
4247
rolling := rolling
4348
eg.Go(func() error {

0 commit comments

Comments
 (0)