Skip to content

Commit 24fc2b3

Browse files
authored
fix: scope dunning retry ZSET key by tenant ID (#920)
1 parent b7ec18d commit 24fc2b3

6 files changed

Lines changed: 369 additions & 68 deletions

File tree

services/payment-order/adapters/http/stripe_event_processor.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
var (
1515
ErrNilRedisClient = errors.New("redis client cannot be nil")
1616
ErrEventAlreadyProcessed = errors.New("stripe event already processed")
17+
ErrDunningMissingTenant = errors.New("tenant ID is required for dunning scheduling")
1718
)
1819

1920
// Stripe event processor constants.
@@ -24,9 +25,10 @@ const (
2425
// processedWebhookTTL is how long processed event IDs are retained in Redis (48 hours).
2526
processedWebhookTTL = 48 * time.Hour
2627

27-
// dunningRetryZSet is the Redis sorted set key for dunning retry scheduling.
28-
// This matches the key used by DunningWorker.
29-
dunningRetryZSet = "dunning:retries"
28+
// dunningRetryZSetPrefix is the Redis sorted set key prefix for dunning retry scheduling.
29+
// The full key is "dunning:retries:{tenantID}" for tenant isolation.
30+
// This matches the key pattern used by DunningWorker.
31+
dunningRetryZSetPrefix = "dunning:retries:"
3032

3133
// defaultDunningDelay is the default delay before the first dunning retry.
3234
defaultDunningDelay = 24 * time.Hour
@@ -99,31 +101,39 @@ func (p *StripeEventProcessor) PreProcess(ctx context.Context, eventID string) e
99101
return nil
100102
}
101103

102-
// ScheduleDunning adds a payment order to the dunning retry sorted set.
104+
// ScheduleDunning adds a payment order to the tenant-scoped dunning retry sorted set.
103105
// Called when a payment_intent.payment_failed event is received.
104106
// The dunning worker will pick up the entry and trigger escalation.
105-
func (p *StripeEventProcessor) ScheduleDunning(ctx context.Context, paymentOrderID string) error {
107+
func (p *StripeEventProcessor) ScheduleDunning(ctx context.Context, tenantID, paymentOrderID string) error {
106108
if paymentOrderID == "" {
107109
p.logger.Warn("cannot schedule dunning: empty payment order ID")
108110
return nil
109111
}
112+
if tenantID == "" {
113+
p.logger.Error("cannot schedule dunning: empty tenant ID",
114+
"payment_order_id", paymentOrderID)
115+
return ErrDunningMissingTenant
116+
}
110117

111118
dueAt := time.Now().Add(defaultDunningDelay)
112119
member := redis.Z{
113120
Score: float64(dueAt.Unix()),
114121
Member: fmt.Sprintf("stripe:%s", paymentOrderID),
115122
}
116123

117-
err := p.redis.ZAdd(ctx, dunningRetryZSet, member).Err()
124+
key := dunningRetryZSetPrefix + tenantID
125+
err := p.redis.ZAdd(ctx, key, member).Err()
118126
if err != nil {
119127
p.logger.Error("failed to schedule dunning retry",
120128
"payment_order_id", paymentOrderID,
129+
"tenant_id", tenantID,
121130
"error", err)
122131
return fmt.Errorf("failed to schedule dunning retry: %w", err)
123132
}
124133

125134
p.logger.Info("dunning retry scheduled for failed stripe payment",
126135
"payment_order_id", paymentOrderID,
136+
"tenant_id", tenantID,
127137
"due_at", dueAt)
128138

129139
return nil

services/payment-order/adapters/http/stripe_event_processor_test.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,17 @@ func TestStripeEventProcessor_PreProcess(t *testing.T) {
120120

121121
func TestStripeEventProcessor_ScheduleDunning(t *testing.T) {
122122
ctx := context.Background()
123+
const testTenantID = "test_tenant"
124+
zsetKey := dunningRetryZSetPrefix + testTenantID
123125

124126
t.Run("schedules dunning for payment order", func(t *testing.T) {
125127
proc, client, _ := setupTestEventProcessor(t)
126128

127-
err := proc.ScheduleDunning(ctx, "po-123")
129+
err := proc.ScheduleDunning(ctx, testTenantID, "po-123")
128130
assert.NoError(t, err)
129131

130-
// Verify the entry was added to the ZSET
131-
members, err := client.ZRangeByScore(ctx, dunningRetryZSet, &redis.ZRangeBy{
132+
// Verify the entry was added to the tenant-scoped ZSET
133+
members, err := client.ZRangeByScore(ctx, zsetKey, &redis.ZRangeBy{
132134
Min: "-inf",
133135
Max: "+inf",
134136
}).Result()
@@ -140,25 +142,32 @@ func TestStripeEventProcessor_ScheduleDunning(t *testing.T) {
140142
t.Run("empty payment order ID is a no-op", func(t *testing.T) {
141143
proc, client, _ := setupTestEventProcessor(t)
142144

143-
err := proc.ScheduleDunning(ctx, "")
145+
err := proc.ScheduleDunning(ctx, testTenantID, "")
144146
assert.NoError(t, err)
145147

146148
// Verify nothing was added to the ZSET
147-
count, err := client.ZCard(ctx, dunningRetryZSet).Result()
149+
count, err := client.ZCard(ctx, zsetKey).Result()
148150
require.NoError(t, err)
149151
assert.Equal(t, int64(0), count)
150152
})
151153

154+
t.Run("empty tenant ID returns error", func(t *testing.T) {
155+
proc, _, _ := setupTestEventProcessor(t)
156+
157+
err := proc.ScheduleDunning(ctx, "", "po-123")
158+
assert.ErrorIs(t, err, ErrDunningMissingTenant)
159+
})
160+
152161
t.Run("multiple dunning entries are independent", func(t *testing.T) {
153162
proc, client, _ := setupTestEventProcessor(t)
154163

155-
err := proc.ScheduleDunning(ctx, "po-aaa")
164+
err := proc.ScheduleDunning(ctx, testTenantID, "po-aaa")
156165
assert.NoError(t, err)
157166

158-
err = proc.ScheduleDunning(ctx, "po-bbb")
167+
err = proc.ScheduleDunning(ctx, testTenantID, "po-bbb")
159168
assert.NoError(t, err)
160169

161-
count, err := client.ZCard(ctx, dunningRetryZSet).Result()
170+
count, err := client.ZCard(ctx, zsetKey).Result()
162171
require.NoError(t, err)
163172
assert.Equal(t, int64(2), count)
164173
})
@@ -168,7 +177,39 @@ func TestStripeEventProcessor_ScheduleDunning(t *testing.T) {
168177

169178
mr.Close()
170179

171-
err := proc.ScheduleDunning(ctx, "po-fail")
180+
err := proc.ScheduleDunning(ctx, testTenantID, "po-fail")
172181
assert.Error(t, err)
173182
})
183+
184+
t.Run("tenant isolation between tenants", func(t *testing.T) {
185+
proc, client, _ := setupTestEventProcessor(t)
186+
187+
const tenantA = "tenant_a"
188+
const tenantB = "tenant_b"
189+
190+
err := proc.ScheduleDunning(ctx, tenantA, "po-a1")
191+
assert.NoError(t, err)
192+
err = proc.ScheduleDunning(ctx, tenantB, "po-b1")
193+
assert.NoError(t, err)
194+
195+
// Verify tenant A's key only has tenant A's entry
196+
keyA := dunningRetryZSetPrefix + tenantA
197+
membersA, err := client.ZRangeByScore(ctx, keyA, &redis.ZRangeBy{
198+
Min: "-inf",
199+
Max: "+inf",
200+
}).Result()
201+
require.NoError(t, err)
202+
assert.Len(t, membersA, 1)
203+
assert.Equal(t, "stripe:po-a1", membersA[0])
204+
205+
// Verify tenant B's key only has tenant B's entry
206+
keyB := dunningRetryZSetPrefix + tenantB
207+
membersB, err := client.ZRangeByScore(ctx, keyB, &redis.ZRangeBy{
208+
Min: "-inf",
209+
Max: "+inf",
210+
}).Result()
211+
require.NoError(t, err)
212+
assert.Len(t, membersB, 1)
213+
assert.Equal(t, "stripe:po-b1", membersB[0])
214+
})
174215
}

services/payment-order/adapters/http/stripe_webhook_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ func (h *StripeWebhookHandler) HandleStripeWebhook(w http.ResponseWriter, r *htt
186186

187187
// Schedule dunning for failed payments (fire-and-forget, does not affect response)
188188
if h.eventProcessor != nil && parsed.Status == "REJECTED" && parsed.PaymentOrderID != "" {
189-
if dunningErr := h.eventProcessor.ScheduleDunning(ctx, parsed.PaymentOrderID); dunningErr != nil {
189+
if dunningErr := h.eventProcessor.ScheduleDunning(ctx, tenantID.String(), parsed.PaymentOrderID); dunningErr != nil {
190190
h.logger.Error("failed to schedule dunning for failed payment",
191191
"payment_order_id", parsed.PaymentOrderID,
192+
"tenant_id", tenantID.String(),
192193
"event_id", parsed.EventID,
193194
"error", dunningErr)
194195
}

services/payment-order/adapters/http/stripe_webhook_handler_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ func TestStripeWebhookHandler_FailedPaymentTriggersDunning(t *testing.T) {
408408
assert.Equal(t, http.StatusOK, rr.Code)
409409

410410
// Verify dunning was scheduled in the ZSET
411-
members, err := redisClient.ZRangeByScore(ctx, dunningRetryZSet, &redis.ZRangeBy{
411+
members, err := redisClient.ZRangeByScore(ctx, dunningRetryZSetPrefix+"test-tenant", &redis.ZRangeBy{
412412
Min: "-inf",
413413
Max: "+inf",
414414
}).Result()
@@ -441,7 +441,7 @@ func TestStripeWebhookHandler_SucceededPaymentDoesNotTriggerDunning(t *testing.T
441441
assert.Equal(t, http.StatusOK, rr.Code)
442442

443443
// Verify dunning was NOT scheduled
444-
count, err := redisClient.ZCard(ctx, dunningRetryZSet).Result()
444+
count, err := redisClient.ZCard(ctx, dunningRetryZSetPrefix+"test-tenant").Result()
445445
require.NoError(t, err)
446446
assert.Equal(t, int64(0), count)
447447
}
@@ -474,7 +474,7 @@ func TestStripeWebhookHandler_RefundedDoesNotTriggerDunning(t *testing.T) {
474474
assert.Equal(t, http.StatusOK, rr.Code)
475475

476476
// Verify dunning was NOT scheduled for refunds
477-
count, err := redisClient.ZCard(ctx, dunningRetryZSet).Result()
477+
count, err := redisClient.ZCard(ctx, dunningRetryZSetPrefix+"test-tenant").Result()
478478
require.NoError(t, err)
479479
assert.Equal(t, int64(0), count)
480480
}

services/payment-order/worker/dunning_worker.go

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ import (
1818

1919
// Dunning worker errors.
2020
var (
21-
ErrNilDunningCallback = errors.New("dunning callback is required")
22-
ErrDunningKeyTooShort = errors.New("dunning retry key too short")
21+
ErrNilDunningCallback = errors.New("dunning callback is required")
22+
ErrDunningKeyTooShort = errors.New("dunning retry key too short")
23+
ErrDunningMissingTenant = errors.New("tenant ID is required for dunning retry")
2324
)
2425

25-
// dunningRetryZSet is the Redis sorted set key for dunning retry scheduling.
26-
const dunningRetryZSet = "dunning:retries"
26+
// dunningRetryZSetPrefix is the Redis sorted set key prefix for dunning retry scheduling.
27+
// The full key is "dunning:retries:{tenantID}" for tenant isolation.
28+
const dunningRetryZSetPrefix = "dunning:retries:"
2729

2830
// DunningWorkerConfig holds configuration for the dunning retry worker.
2931
type DunningWorkerConfig struct {
@@ -145,42 +147,75 @@ func (w *DunningWorker) pollLoop(ctx context.Context) error {
145147
}
146148
}
147149

148-
// ScheduleDunningRetry adds a billing run to the sorted set with a score
149-
// equal to the Unix timestamp when the retry becomes due.
150-
func (w *DunningWorker) ScheduleDunningRetry(ctx context.Context, billingRunID uuid.UUID, delay time.Duration) error {
150+
// ScheduleDunningRetry adds a billing run to the tenant-scoped sorted set with
151+
// a score equal to the Unix timestamp when the retry becomes due.
152+
func (w *DunningWorker) ScheduleDunningRetry(ctx context.Context, tenantID string, billingRunID uuid.UUID, delay time.Duration) error {
153+
if tenantID == "" {
154+
return ErrDunningMissingTenant
155+
}
151156
dueAt := NowFunc().Add(delay)
152157
member := redis.Z{
153158
Score: float64(dueAt.Unix()),
154159
Member: billingRunID.String(),
155160
}
156-
err := w.redis.ZAdd(ctx, dunningRetryZSet, member).Err()
161+
key := dunningRetryZSetPrefix + tenantID
162+
err := w.redis.ZAdd(ctx, key, member).Err()
157163
if err != nil {
158164
return fmt.Errorf("failed to schedule dunning retry: %w", err)
159165
}
160166

161167
w.logger.Info("dunning retry scheduled",
162168
"billing_run_id", billingRunID,
169+
"tenant_id", tenantID,
163170
"delay", delay,
164171
"due_at", dueAt)
165172

166173
return nil
167174
}
168175

169-
// processDueRetries queries the sorted set for all members whose score (due
170-
// timestamp) is at or before the current time, processes each one, and removes
171-
// it from the set.
176+
// processDueRetries scans for all tenant-scoped dunning ZSET keys and processes
177+
// due retries from each. Uses SCAN to discover keys matching "dunning:retries:*".
172178
func (w *DunningWorker) processDueRetries(ctx context.Context) {
179+
// Discover all tenant-scoped dunning keys
180+
keys, err := w.scanDunningKeys(ctx)
181+
if err != nil {
182+
w.logger.Error("failed to scan dunning retry keys", "error", err)
183+
return
184+
}
185+
186+
for _, key := range keys {
187+
w.processDueRetriesForKey(ctx, key)
188+
}
189+
}
190+
191+
// scanDunningKeys returns all Redis keys matching the dunning retry ZSET pattern.
192+
func (w *DunningWorker) scanDunningKeys(ctx context.Context) ([]string, error) {
193+
var allKeys []string
194+
pattern := dunningRetryZSetPrefix + "*"
195+
iter := w.redis.Scan(ctx, 0, pattern, 100).Iterator()
196+
for iter.Next(ctx) {
197+
allKeys = append(allKeys, iter.Val())
198+
}
199+
if err := iter.Err(); err != nil {
200+
return nil, fmt.Errorf("failed to scan dunning keys: %w", err)
201+
}
202+
return allKeys, nil
203+
}
204+
205+
// processDueRetriesForKey queries a single tenant's sorted set for all members
206+
// whose score (due timestamp) is at or before the current time, processes each
207+
// one, and removes it from the set.
208+
func (w *DunningWorker) processDueRetriesForKey(ctx context.Context, key string) {
173209
now := NowFunc()
174210
maxScore := strconv.FormatInt(now.Unix(), 10)
175211

176-
// Fetch billing run IDs that are due
177-
members, err := w.redis.ZRangeByScore(ctx, dunningRetryZSet, &redis.ZRangeBy{
212+
members, err := w.redis.ZRangeByScore(ctx, key, &redis.ZRangeBy{
178213
Min: "-inf",
179214
Max: maxScore,
180215
Count: 100,
181216
}).Result()
182217
if err != nil {
183-
w.logger.Error("failed to query dunning retries", "error", err)
218+
w.logger.Error("failed to query dunning retries", "key", key, "error", err)
184219
return
185220
}
186221

@@ -192,20 +227,19 @@ func (w *DunningWorker) processDueRetries(ctx context.Context) {
192227
for _, member := range members {
193228
billingRunID, parseErr := uuid.Parse(member)
194229
if parseErr != nil {
195-
w.logger.Error("invalid billing run ID in dunning set", "member", member, "error", parseErr)
196-
w.redis.ZRem(ctx, dunningRetryZSet, member)
230+
w.logger.Error("invalid billing run ID in dunning set", "key", key, "member", member, "error", parseErr)
231+
w.redis.ZRem(ctx, key, member)
197232
continue
198233
}
199234

200235
if w.processRetry(ctx, billingRunID) {
201-
// Only remove on success; transient failures retain the member for next poll
202-
w.redis.ZRem(ctx, dunningRetryZSet, member)
236+
w.redis.ZRem(ctx, key, member)
203237
processed++
204238
}
205239
}
206240

207241
if processed > 0 {
208-
w.logger.Info("processed dunning retries", "count", processed)
242+
w.logger.Info("processed dunning retries", "key", key, "count", processed)
209243
}
210244
}
211245

@@ -283,8 +317,12 @@ func (w *DunningWorker) executeRetry(ctx context.Context, billingRunID uuid.UUID
283317
return true
284318
}
285319

286-
// CancelDunningRetry removes a billing run from the retry set.
320+
// CancelDunningRetry removes a billing run from the tenant-scoped retry set.
287321
// Called when a billing run is resolved (e.g., manual payment succeeds).
288-
func (w *DunningWorker) CancelDunningRetry(ctx context.Context, billingRunID uuid.UUID) error {
289-
return w.redis.ZRem(ctx, dunningRetryZSet, billingRunID.String()).Err()
322+
func (w *DunningWorker) CancelDunningRetry(ctx context.Context, tenantID string, billingRunID uuid.UUID) error {
323+
if tenantID == "" {
324+
return ErrDunningMissingTenant
325+
}
326+
key := dunningRetryZSetPrefix + tenantID
327+
return w.redis.ZRem(ctx, key, billingRunID.String()).Err()
290328
}

0 commit comments

Comments
 (0)