Skip to content

Commit 752953c

Browse files
committed
fix: propagate context through event listener goroutines and delayed queue
All background polling goroutines (5 listener loops across epoch and finalizer packages) now receive ctx and exit on cancellation instead of running forever. DelayedQueueManager.Add takes ctx so timer goroutines also respect cancellation. FetchSigningPolicies and processDelayedQueue no longer use context.Background().
1 parent 8146902 commit 752953c

8 files changed

Lines changed: 79 additions & 50 deletions

File tree

client/epoch/epoch_client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,20 @@ func (c *client) Run(ctx context.Context) error {
142142
var uptimeSignedListener <-chan *system.FlareSystemsManagerUptimeVoteSigned
143143

144144
if c.preregistrationEnabled {
145-
epochStartedListener = c.systemsManagerClient.RewardEpochStartedListener(c.db, rewardEpochTiming)
145+
epochStartedListener = c.systemsManagerClient.RewardEpochStartedListener(ctx, c.db, rewardEpochTiming)
146146
}
147147
if c.registrationEnabled {
148148
logger.Info("Waiting for VotePowerBlockSelected event to start registration")
149-
vpbsListener = c.systemsManagerClient.VotePowerBlockSelectedListener(c.db, rewardEpochTiming)
150-
policyListener = c.relayClient.SigningPolicyInitializedListener(c.db, rewardEpochTiming)
149+
vpbsListener = c.systemsManagerClient.VotePowerBlockSelectedListener(ctx, c.db, rewardEpochTiming)
150+
policyListener = c.relayClient.SigningPolicyInitializedListener(ctx, c.db, rewardEpochTiming)
151151
}
152152
if c.uptimeVotingEnabled {
153153
logger.Info("Waiting for SignUptimeVoteEnabled event to start uptime vote signing")
154-
uptimeEnabledListener = c.systemsManagerClient.SignUptimeVoteEnabledListener(c.db, rewardEpochTiming)
154+
uptimeEnabledListener = c.systemsManagerClient.SignUptimeVoteEnabledListener(ctx, c.db, rewardEpochTiming)
155155
}
156156
if c.rewardsSigningEnabled {
157157
logger.Info("Waiting for UptimeVoteSigned event to start rewards signing")
158-
uptimeSignedListener = c.systemsManagerClient.UptimeVoteSignedListener(c.db, rewardEpochTiming)
158+
uptimeSignedListener = c.systemsManagerClient.UptimeVoteSignedListener(ctx, c.db, rewardEpochTiming)
159159
}
160160

161161
for {

client/epoch/epoch_client_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (c testSystemsManagerClient) RewardEpochTimingFromChain() (*utils.EpochTimi
291291
}
292292

293293
func (c testSystemsManagerClient) VotePowerBlockSelectedListener(
294-
db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig,
294+
_ context.Context, db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig,
295295
) <-chan *system.FlareSystemsManagerVotePowerBlockSelected {
296296
return c.vpbsChan
297297
}
@@ -341,7 +341,7 @@ func (c testRelayClient) sendTestPolicy(policy *relay.RelaySigningPolicyInitiali
341341
}
342342

343343
func (c testRelayClient) SigningPolicyInitializedListener(
344-
db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig,
344+
_ context.Context, db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig,
345345
) <-chan *relay.RelaySigningPolicyInitialized {
346346
return c.policyChan
347347
}
@@ -381,7 +381,7 @@ func (c testRegistryClient) RegisterVoter(
381381
}, 1, 0)
382382
}
383383

384-
func (c testSystemsManagerClient) RewardEpochStartedListener(db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted {
384+
func (c testSystemsManagerClient) RewardEpochStartedListener(_ context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted {
385385
return make(chan *system.FlareSystemsManagerRewardEpochStarted)
386386
}
387387

@@ -391,7 +391,7 @@ func (c testRegistryClient) PreregisterVoter(_ context.Context, nextRewardEpochI
391391
}, 1, 0)
392392
}
393393

394-
func (c testSystemsManagerClient) SignUptimeVoteEnabledListener(db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled {
394+
func (c testSystemsManagerClient) SignUptimeVoteEnabledListener(_ context.Context, db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled {
395395
return make(chan *system.FlareSystemsManagerSignUptimeVoteEnabled)
396396
}
397397

@@ -401,7 +401,7 @@ func (c testSystemsManagerClient) SignUptimeVote(_ context.Context, b *big.Int)
401401
}, 1, 0)
402402
}
403403

404-
func (c testSystemsManagerClient) UptimeVoteSignedListener(db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned {
404+
func (c testSystemsManagerClient) UptimeVoteSignedListener(_ context.Context, db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned {
405405
return make(chan *system.FlareSystemsManagerUptimeVoteSigned)
406406
}
407407

client/epoch/relay_utils.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
type relayContractClient interface {
21-
SigningPolicyInitializedListener(epochClientDB, *utils.EpochTimingConfig) <-chan *relay.RelaySigningPolicyInitialized
21+
SigningPolicyInitializedListener(ctx context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *relay.RelaySigningPolicyInitialized
2222
}
2323

2424
type relayContractClientImpl struct {
@@ -42,7 +42,7 @@ func NewRelayContractClient(
4242
}, nil
4343
}
4444

45-
func (r *relayContractClientImpl) SigningPolicyInitializedListener(db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *relay.RelaySigningPolicyInitialized {
45+
func (r *relayContractClientImpl) SigningPolicyInitializedListener(ctx context.Context, db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *relay.RelaySigningPolicyInitialized {
4646
topic0, err := chain.EventIDFromMetadata(relay.RelayMetaData, "SigningPolicyInitialized")
4747
if err != nil {
4848
// panic, this error is fatal
@@ -55,9 +55,13 @@ func (r *relayContractClientImpl) SigningPolicyInitializedListener(db epochClien
5555
ticker := time.NewTicker(shared.EventListenerInterval)
5656
eventRangeStart := rewardEpochTiming.StartTime(rewardEpochTiming.EpochIndex(time.Now()) - 1).Unix()
5757
for {
58-
<-ticker.C
58+
select {
59+
case <-ctx.Done():
60+
return
61+
case <-ticker.C:
62+
}
5963
now := time.Now().Unix()
60-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), r.address, topic0, eventRangeStart, now)
64+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(ctx, r.address, topic0, eventRangeStart, now)
6165
if err != nil {
6266
logger.Errorf("Error fetching logs %v", err)
6367
continue

client/epoch/system_manager_utils.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ func init() {
5252
type systemsManagerContractClient interface {
5353
RewardEpochTimingFromChain() (*utils.EpochTimingConfig, error)
5454

55-
RewardEpochStartedListener(epochClientDB, *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted
55+
RewardEpochStartedListener(ctx context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted
5656

57-
VotePowerBlockSelectedListener(epochClientDB, *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerVotePowerBlockSelected
57+
VotePowerBlockSelectedListener(ctx context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerVotePowerBlockSelected
5858
SignNewSigningPolicy(ctx context.Context, epochID *big.Int, policy []byte) <-chan shared.ExecuteStatus[any]
5959

60-
SignUptimeVoteEnabledListener(epochClientDB, *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled
60+
SignUptimeVoteEnabledListener(ctx context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled
6161
SignUptimeVote(ctx context.Context, epochID *big.Int) <-chan shared.ExecuteStatus[any]
6262

63-
UptimeVoteSignedListener(epochClientDB, *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned
63+
UptimeVoteSignedListener(ctx context.Context, db epochClientDB, config *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned
6464
SignRewards(ctx context.Context, epochID *big.Int, rewardHash *common.Hash, weightClaims int) <-chan shared.ExecuteStatus[any]
6565
IsRewardHashSigned(*big.Int) bool
6666

@@ -190,7 +190,7 @@ func (s *systemsManagerContractClientImpl) GetCurrentRewardEpochID() <-chan shar
190190
}, shared.MaxTxSendRetries, shared.TxRetryInterval)
191191
}
192192

193-
func (s *systemsManagerContractClientImpl) RewardEpochStartedListener(db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted {
193+
func (s *systemsManagerContractClientImpl) RewardEpochStartedListener(ctx context.Context, db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerRewardEpochStarted {
194194
out := make(chan *system.FlareSystemsManagerRewardEpochStarted)
195195
topic0, err := chain.EventIDFromMetadata(system.FlareSystemsManagerMetaData, "RewardEpochStarted")
196196
if err != nil {
@@ -202,9 +202,13 @@ func (s *systemsManagerContractClientImpl) RewardEpochStartedListener(db epochCl
202202
ticker := time.NewTicker(shared.EventListenerInterval)
203203
eventRangeStart := rewardEpochTiming.StartTime(rewardEpochTiming.EpochIndex(time.Now())).Unix() - 60*60 // Expected epoch start - 1h
204204
for {
205-
<-ticker.C
205+
select {
206+
case <-ctx.Done():
207+
return
208+
case <-ticker.C:
209+
}
206210
now := time.Now().Unix()
207-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), s.address, topic0, eventRangeStart, now)
211+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(ctx, s.address, topic0, eventRangeStart, now)
208212
if err != nil {
209213
logger.Errorf("Error fetching logs %v", err)
210214
continue
@@ -231,7 +235,7 @@ func (s *systemsManagerContractClientImpl) parseRewardEpochStartedEvent(dbLog da
231235
return s.flareSystemsManager.ParseRewardEpochStarted(*contractLog)
232236
}
233237

234-
func (s *systemsManagerContractClientImpl) VotePowerBlockSelectedListener(db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerVotePowerBlockSelected {
238+
func (s *systemsManagerContractClientImpl) VotePowerBlockSelectedListener(ctx context.Context, db epochClientDB, rewardEpochTiming *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerVotePowerBlockSelected {
235239
out := make(chan *system.FlareSystemsManagerVotePowerBlockSelected)
236240
topic0, err := chain.EventIDFromMetadata(system.FlareSystemsManagerMetaData, "VotePowerBlockSelected")
237241
if err != nil {
@@ -243,9 +247,13 @@ func (s *systemsManagerContractClientImpl) VotePowerBlockSelectedListener(db epo
243247
ticker := time.NewTicker(shared.EventListenerInterval)
244248
eventRangeStart := rewardEpochTiming.StartTime(rewardEpochTiming.EpochIndex(time.Now()) - 1).Unix()
245249
for {
246-
<-ticker.C
250+
select {
251+
case <-ctx.Done():
252+
return
253+
case <-ticker.C:
254+
}
247255
now := time.Now().Unix()
248-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), s.address, topic0, eventRangeStart, now)
256+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(ctx, s.address, topic0, eventRangeStart, now)
249257
if err != nil {
250258
logger.Errorf("Error fetching logs %v", err)
251259
continue
@@ -276,7 +284,7 @@ func (s *systemsManagerContractClientImpl) RewardEpochTimingFromChain() (*utils.
276284
return shared.RewardEpochTimingFromChain(s.flareSystemsManager)
277285
}
278286

279-
func (s *systemsManagerContractClientImpl) SignUptimeVoteEnabledListener(db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled {
287+
func (s *systemsManagerContractClientImpl) SignUptimeVoteEnabledListener(ctx context.Context, db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerSignUptimeVoteEnabled {
280288
out := make(chan *system.FlareSystemsManagerSignUptimeVoteEnabled)
281289
topic0, err := chain.EventIDFromMetadata(system.FlareSystemsManagerMetaData, "SignUptimeVoteEnabled")
282290
if err != nil {
@@ -289,11 +297,15 @@ func (s *systemsManagerContractClientImpl) SignUptimeVoteEnabledListener(db epoc
289297
startEpoch := epoch.EpochIndex(time.Now())
290298
eventRangeStart := epoch.StartTime(startEpoch).Unix()
291299
for {
292-
<-ticker.C
300+
select {
301+
case <-ctx.Done():
302+
return
303+
case <-ticker.C:
304+
}
293305
now := time.Now()
294306
currentEpoch := epoch.EpochIndex(now)
295307

296-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), s.address, topic0, eventRangeStart, now.Unix())
308+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(ctx, s.address, topic0, eventRangeStart, now.Unix())
297309
if err != nil {
298310
logger.Errorf("Error fetching logs %v", err)
299311
continue
@@ -377,7 +389,7 @@ func (s *systemsManagerContractClientImpl) sendSignUptimeVote(ctx context.Contex
377389
return nil
378390
}
379391

380-
func (s *systemsManagerContractClientImpl) UptimeVoteSignedListener(db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned {
392+
func (s *systemsManagerContractClientImpl) UptimeVoteSignedListener(ctx context.Context, db epochClientDB, epoch *utils.EpochTimingConfig) <-chan *system.FlareSystemsManagerUptimeVoteSigned {
381393
out := make(chan *system.FlareSystemsManagerUptimeVoteSigned)
382394
topic0, err := chain.EventIDFromMetadata(system.FlareSystemsManagerMetaData, "UptimeVoteSigned")
383395
if err != nil {
@@ -390,11 +402,15 @@ func (s *systemsManagerContractClientImpl) UptimeVoteSignedListener(db epochClie
390402
startEpoch := epoch.EpochIndex(time.Now())
391403
eventRangeStart := epoch.StartTime(startEpoch).Unix()
392404
for {
393-
<-ticker.C
405+
select {
406+
case <-ctx.Done():
407+
return
408+
case <-ticker.C:
409+
}
394410
now := time.Now()
395411
currentEpoch := epoch.EpochIndex(now)
396412

397-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), s.address, topic0, eventRangeStart, now.Unix())
413+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(ctx, s.address, topic0, eventRangeStart, now.Unix())
398414
if err != nil {
399415
logger.Errorf("Error fetching logs %v", err)
400416
continue

client/finalizer/finalizer_client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func (c *client) Run(ctx context.Context) error {
124124
}
125125

126126
func (c *client) fetchExistingSigningPolicies(
127-
_ context.Context, startTime time.Time,
127+
ctx context.Context, startTime time.Time,
128128
) (time.Time, error) {
129129
// Read current signing policies from the database and add them to the storage
130-
spList, err := c.relayClient.FetchSigningPolicies(c.db, startTime.Unix(), time.Now().Unix())
130+
spList, err := c.relayClient.FetchSigningPolicies(ctx, c.db, startTime.Unix(), time.Now().Unix())
131131
if err != nil {
132132
return startTime, err
133133
}
@@ -150,7 +150,7 @@ func (c *client) fetchExistingSigningPolicies(
150150
}
151151

152152
func (c *client) runSigningPolicyInitializedListener(ctx context.Context, startTime time.Time) error {
153-
spListener := c.relayClient.SigningPolicyInitializedListener(c.db, startTime)
153+
spListener := c.relayClient.SigningPolicyInitializedListener(ctx, c.db, startTime)
154154
for {
155155
var dbPolicy signingPolicyListenerResponse
156156
select {

client/finalizer/finalizer_queue.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (p *finalizerQueueProcessor) Run(ctx context.Context) error {
147147
logger.Debugf("Finalizer will send now for voting round %v for protocol %v", item.votingRoundID, item.protocolID)
148148
p.processItem(ctx, item, true)
149149
}
150-
p.delayedQueues.Add(st, item)
150+
p.delayedQueues.Add(ctx, st, item)
151151
} else {
152152
logger.Errorf("Finalizer missing finalization data for protocol %v in votingRound %v", item.protocolID, item.votingRoundID)
153153
}
@@ -201,12 +201,12 @@ func (p *finalizerQueueProcessor) processItem(ctx context.Context, item *queueIt
201201
p.relayClient.SubmitPayloads(ctx, txInput, isDelayed, item.protocolID)
202202
}
203203

204-
func (p *finalizerQueueProcessor) processDelayedQueue(items []*queueItem) error {
204+
func (p *finalizerQueueProcessor) processDelayedQueue(ctx context.Context, items []*queueItem) error {
205205
now := time.Now()
206206
currentEpoch := p.finalizerContext.votingRoundTiming.EpochIndex(now)
207207
startTime := p.finalizerContext.votingRoundTiming.StartTime(currentEpoch)
208208

209-
relayedItems, err := p.relayClient.ProtocolMessageRelayed(context.Background(), p.db, startTime, now)
209+
relayedItems, err := p.relayClient.ProtocolMessageRelayed(ctx, p.db, startTime, now)
210210
if err != nil {
211211
return err
212212
}
@@ -216,7 +216,7 @@ func (p *finalizerQueueProcessor) processDelayedQueue(items []*queueItem) error
216216
continue
217217
}
218218
logger.Infof("Finalizer processes delayed queue item for round %v for protocol %v", item.votingRoundID, item.protocolID)
219-
p.processItem(context.TODO(), item, true)
219+
p.processItem(ctx, item, true)
220220
}
221221
return nil
222222
}

client/finalizer/relay_client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ func NewRelayContractClient(
111111
}
112112

113113
// FetchSigningPolicies fetches signing policies emitted by in SigningPolicyInitialized events from Relay smart contract with timestamps in the interval (from,to].
114-
func (r *relayContractClient) FetchSigningPolicies(db finalizerDB, from, to int64) ([]signingPolicyListenerResponse, error) {
115-
logs, err := db.FetchLogsByAddressAndTopic0(context.Background(), r.address, r.topic0SPI, from, to)
114+
func (r *relayContractClient) FetchSigningPolicies(ctx context.Context, db finalizerDB, from, to int64) ([]signingPolicyListenerResponse, error) {
115+
logs, err := db.FetchLogsByAddressAndTopic0(ctx, r.address, r.topic0SPI, from, to)
116116
if err != nil {
117117
return nil, err
118118
}
@@ -129,16 +129,20 @@ func (r *relayContractClient) FetchSigningPolicies(db finalizerDB, from, to int6
129129
return result, nil
130130
}
131131

132-
func (r *relayContractClient) SigningPolicyInitializedListener(db finalizerDB, startTime time.Time) <-chan signingPolicyListenerResponse {
132+
func (r *relayContractClient) SigningPolicyInitializedListener(ctx context.Context, db finalizerDB, startTime time.Time) <-chan signingPolicyListenerResponse {
133133
out := make(chan signingPolicyListenerResponse, listenerBufferSize)
134134
go func() {
135135
ticker := time.NewTicker(shared.EventListenerInterval)
136136
eventRangeStart := startTime.Unix()
137137
for {
138-
<-ticker.C
138+
select {
139+
case <-ctx.Done():
140+
return
141+
case <-ticker.C:
142+
}
139143
now := time.Now().Unix()
140144

141-
logs, err := db.FetchLogsByAddressAndTopic0(context.Background(), r.address, r.topic0SPI, eventRangeStart, now)
145+
logs, err := db.FetchLogsByAddressAndTopic0(ctx, r.address, r.topic0SPI, eventRangeStart, now)
142146
if err != nil {
143147
logger.Errorf("Error fetching logs %v", err)
144148
continue

utils/queue.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package utils
22

33
import (
4+
"context"
45
"sync"
56
"time"
67

78
"github.com/flare-foundation/go-flare-common/pkg/logger"
89
)
910

10-
type QueueProcessorFunc[T any] func([]T) error
11+
type QueueProcessorFunc[T any] func(context.Context, []T) error
1112

1213
type DelayedQueueManager[T any] struct {
1314
timeMap map[time.Time][]T
@@ -24,7 +25,7 @@ func NewDelayedQueueManager[T any](processor QueueProcessorFunc[T]) *DelayedQueu
2425
}
2526
}
2627

27-
func (l *DelayedQueueManager[T]) Add(t time.Time, item T) {
28+
func (l *DelayedQueueManager[T]) Add(ctx context.Context, t time.Time, item T) {
2829
if t.Before(time.Now()) {
2930
return
3031
}
@@ -33,7 +34,7 @@ func (l *DelayedQueueManager[T]) Add(t time.Time, item T) {
3334
defer l.Unlock()
3435

3536
if _, ok := l.timeMap[t]; !ok {
36-
l.createTimer(t)
37+
l.createTimer(ctx, t)
3738
}
3839
l.timeMap[t] = append(l.timeMap[t], item)
3940
}
@@ -47,13 +48,17 @@ func (l *DelayedQueueManager[T]) Get(t time.Time) []T {
4748
return items
4849
}
4950

50-
func (l *DelayedQueueManager[T]) createTimer(t time.Time) {
51+
func (l *DelayedQueueManager[T]) createTimer(ctx context.Context, t time.Time) {
5152
go func() {
5253
timer := time.NewTimer(time.Until(t))
53-
<-timer.C
54-
items := l.Get(t)
55-
if err := l.processor(items); err != nil {
56-
logger.Errorf("DelayedQueueManager processor error: %s", err)
54+
select {
55+
case <-timer.C:
56+
items := l.Get(t)
57+
if err := l.processor(ctx, items); err != nil {
58+
logger.Errorf("DelayedQueueManager processor error: %s", err)
59+
}
60+
case <-ctx.Done():
61+
l.Get(t) // remove from map
5762
}
5863
}()
5964
}

0 commit comments

Comments
 (0)