Skip to content

Commit 569e5b3

Browse files
authored
refactor: migrate dunning worker to WorkerLifecycle (#916)
* refactor: migrate dunning worker to WorkerLifecycle and redislock Replace hand-rolled lifecycle management (done channel, WaitGroup, mutex, running flag) in DunningWorker with shared scheduler.WorkerLifecycle. Replace inline WaitGroup tracking with lifecycle.ExecuteGuarded for in-flight work during graceful shutdown. Add per-item distributed locking via redislock.Lock to prevent duplicate processing of the same billing run across replicas. Add ShutdownTimeout to DunningWorkerConfig for configurable graceful shutdown (default 30s). Add comprehensive test suite covering validation, start/stop lifecycle, schedule-and-process flows, cancel retry, per-item locking, graceful shutdown, invalid ZSET members, and transient error retention. * fix: address CodeRabbit review feedback Move lock.ReleaseAll after lifecycle.Stop to prevent releasing distributed locks while in-flight work is still running. Use atomic.Bool for callbackCalled in test to prevent data race between worker goroutine and test assertion. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 671205e commit 569e5b3

2 files changed

Lines changed: 667 additions & 49 deletions

File tree

services/payment-order/worker/dunning_worker.go

Lines changed: 73 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -6,12 +6,13 @@ import (
66
"fmt"
77
"log/slog"
88
"strconv"
9-
"sync"
109
"time"
1110

1211
"github.com/google/uuid"
1312
"github.com/meridianhub/meridian/services/payment-order/adapters/persistence"
1413
"github.com/meridianhub/meridian/services/payment-order/domain"
14+
"github.com/meridianhub/meridian/shared/platform/redislock"
15+
"github.com/meridianhub/meridian/shared/platform/scheduler"
1516
"github.com/redis/go-redis/v9"
1617
)
1718

@@ -30,6 +31,9 @@ type DunningWorkerConfig struct {
3031
PollInterval time.Duration
3132
// MaxDunningLevel is the level at which accounts should be frozen.
3233
MaxDunningLevel int
34+
// ShutdownTimeout is the maximum time to wait for in-flight work during shutdown.
35+
// Default: 30 seconds.
36+
ShutdownTimeout time.Duration
3337
}
3438

3539
// DunningCallback is called when a dunning escalation is due.
@@ -39,18 +43,19 @@ type DunningCallback func(ctx context.Context, run *domain.BillingRun) error
3943
// DunningWorker polls a Redis sorted set for due dunning retries and triggers
4044
// escalation. It uses ZADD to schedule retries with the due timestamp as score
4145
// and ZRANGEBYSCORE to find items whose due time has passed.
46+
//
47+
// Lifecycle management is delegated to scheduler.WorkerLifecycle.
48+
// Per-item distributed locking uses redislock.Lock to prevent duplicate
49+
// processing across replicas.
4250
type DunningWorker struct {
43-
repo persistence.BillingRepository
44-
redis *redis.Client
45-
config DunningWorkerConfig
46-
callback DunningCallback
47-
logger *slog.Logger
48-
metrics *BillingMetrics
49-
50-
done chan struct{}
51-
wg sync.WaitGroup
52-
mu sync.Mutex
53-
running bool
51+
lifecycle *scheduler.WorkerLifecycle
52+
repo persistence.BillingRepository
53+
redis *redis.Client
54+
lock *redislock.Lock
55+
config DunningWorkerConfig
56+
callback DunningCallback
57+
logger *slog.Logger
58+
metrics *BillingMetrics
5459
}
5560

5661
// NewDunningWorker creates a new dunning retry worker.
@@ -80,72 +85,66 @@ func NewDunningWorker(
8085
if config.MaxDunningLevel == 0 {
8186
config.MaxDunningLevel = domain.MaxDunningLevel
8287
}
88+
if config.ShutdownTimeout == 0 {
89+
config.ShutdownTimeout = 30 * time.Second
90+
}
8391
if metrics == nil {
8492
metrics = NewBillingMetrics()
8593
}
8694

95+
workerLogger := logger.With("component", "dunning_worker")
96+
8797
return &DunningWorker{
88-
repo: repo,
89-
redis: redisClient,
98+
lifecycle: scheduler.NewWorkerLifecycle(workerLogger),
99+
repo: repo,
100+
redis: redisClient,
101+
lock: redislock.NewLock(redisClient, redislock.Config{
102+
KeyPrefix: "dunning:lock",
103+
LockTTL: 30 * time.Second,
104+
RenewEvery: 10 * time.Second,
105+
}, workerLogger),
90106
config: config,
91107
callback: callback,
92-
logger: logger.With("component", "dunning_worker"),
108+
logger: workerLogger,
93109
metrics: metrics,
94-
done: make(chan struct{}),
95110
}, nil
96111
}
97112

98113
// Start begins the dunning worker polling loop.
99114
func (w *DunningWorker) Start(ctx context.Context) error {
100-
w.mu.Lock()
101-
if w.running {
102-
w.mu.Unlock()
103-
return ErrSchedulerRunning
104-
}
105-
w.running = true
106-
w.mu.Unlock()
107-
108115
w.logger.Info("dunning worker starting",
109116
"poll_interval", w.config.PollInterval,
110117
"max_dunning_level", w.config.MaxDunningLevel)
111118

119+
return w.lifecycle.Start(ctx, func(ctx context.Context) error {
120+
return w.pollLoop(ctx)
121+
})
122+
}
123+
124+
// Stop signals the dunning worker to shut down gracefully and waits for
125+
// in-flight work to complete up to the configured shutdown timeout.
126+
func (w *DunningWorker) Stop() {
127+
w.lifecycle.Stop(w.config.ShutdownTimeout)
128+
w.lock.ReleaseAll(context.Background())
129+
}
130+
131+
// pollLoop runs the ticker-based polling loop. It blocks until the context
132+
// is cancelled (via lifecycle.Stop).
133+
func (w *DunningWorker) pollLoop(ctx context.Context) error {
112134
ticker := time.NewTicker(w.config.PollInterval)
113135
defer ticker.Stop()
114136

115137
for {
116138
select {
117139
case <-ctx.Done():
118140
w.logger.Info("dunning worker stopping: context cancelled")
119-
w.mu.Lock()
120-
w.running = false
121-
w.mu.Unlock()
122-
w.wg.Wait()
123-
return nil
124-
case <-w.done:
125-
w.logger.Info("dunning worker stopping: explicit shutdown")
126-
w.mu.Lock()
127-
w.running = false
128-
w.mu.Unlock()
129-
w.wg.Wait()
130141
return nil
131142
case <-ticker.C:
132143
w.processDueRetries(ctx)
133144
}
134145
}
135146
}
136147

137-
// Stop signals the dunning worker to shut down gracefully.
138-
func (w *DunningWorker) Stop() {
139-
w.mu.Lock()
140-
defer w.mu.Unlock()
141-
142-
select {
143-
case <-w.done:
144-
default:
145-
close(w.done)
146-
}
147-
}
148-
149148
// ScheduleDunningRetry adds a billing run to the sorted set with a score
150149
// equal to the Unix timestamp when the retry becomes due.
151150
func (w *DunningWorker) ScheduleDunningRetry(ctx context.Context, billingRunID uuid.UUID, delay time.Duration) error {
@@ -214,10 +213,35 @@ func (w *DunningWorker) processDueRetries(ctx context.Context) {
214213
// was handled (success or permanently resolved) and the ZSET member should be
215214
// removed. Returns false on transient errors so the member is retained for the
216215
// next poll cycle.
216+
//
217+
// Uses redislock.Lock for per-item locking to prevent duplicate processing
218+
// across replicas. Uses lifecycle.ExecuteGuarded to track in-flight work
219+
// for graceful shutdown.
217220
func (w *DunningWorker) processRetry(ctx context.Context, billingRunID uuid.UUID) bool {
218-
w.wg.Add(1)
219-
defer w.wg.Done()
221+
// Acquire a per-item lock to prevent duplicate processing across replicas.
222+
acquired, release, err := w.lock.Acquire(ctx, "dunning", billingRunID.String())
223+
if err != nil {
224+
w.logger.Error("failed to acquire dunning lock",
225+
"billing_run_id", billingRunID,
226+
"error", err)
227+
return false
228+
}
229+
if !acquired {
230+
w.logger.Debug("dunning retry already being processed by another replica",
231+
"billing_run_id", billingRunID)
232+
return false
233+
}
234+
defer release()
235+
236+
var result bool
237+
w.lifecycle.ExecuteGuarded(func() {
238+
result = w.executeRetry(ctx, billingRunID)
239+
})
240+
return result
241+
}
220242

243+
// executeRetry performs the actual retry logic for a single billing run.
244+
func (w *DunningWorker) executeRetry(ctx context.Context, billingRunID uuid.UUID) bool {
221245
// Load billing run from database
222246
run, err := w.repo.FindBillingRunByID(ctx, billingRunID)
223247
if err != nil {

0 commit comments

Comments
 (0)