Skip to content

Commit 79e87f5

Browse files
authored
fix: correct error semantics and add idempotency to financial-gateway webhook handler (#2039)
* fix: correct error semantics and add idempotency to webhook handler
  - Define ErrMissingWebhookSecret and use it instead of ErrMissingTenantContext when webhook secret is absent; the two errors describe distinct failure modes
  - Introduce ProcessedEventChecker interface and GormProcessedEventChecker implementation that queries event_outbox by causation_id
  - Add EventChecker field to WebhookHandlerConfig; when set, duplicate Stripe events are skipped with a 200 rather than re-published; check failures fall through to publish to avoid silent event drops
  - Set CausationID in PublishConfig so outbox entries carry the Stripe event ID, enabling the DB-backed idempotency check
  - Add tests: correct error body on empty secret, duplicate skip, check-error fall-through
* fix: add tenant scoping and causation_id index to idempotency check
  - GormProcessedEventChecker.IsProcessed now filters by tenant_id from context in addition to causation_id, consistent with the multi-tenant outbox pattern and preventing cross-tenant scope issues
  - Add migration 20260329000001 with a partial index on event_outbox(causation_id) WHERE causation_id IS NOT NULL to avoid sequential scans on idempotency lookups as the outbox grows
* fix: use WithGormTenantTransaction for outbox idempotency check
  Replace manual tenant_id WHERE clause with WithGormTenantTransaction to properly set the search_path to the tenant schema, consistent with the project's multi-tenant outbox pattern.
* fix: enforce atomic idempotency via unique causation_id index
  - Make causation_id index UNIQUE to prevent concurrent duplicate inserts
  - Handle duplicate key errors on the write path as already-processed
  - Default EventChecker in constructor when DB is available

---------

Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 81c7e78 commit 79e87f5

4 files changed

Lines changed: 244 additions & 2 deletions

File tree

services/financial-gateway/adapters/http/webhook_handler.go

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"io"
1414
"log/slog"
1515
"net/http"
16+
"strings"
1617

1718
"github.com/google/uuid"
1819
"google.golang.org/protobuf/proto"
@@ -21,6 +22,7 @@ import (
2122

2223
financialgatewayeventsv1 "github.com/meridianhub/meridian/api/proto/meridian/financial_gateway_events/v1"
2324
stripeadapter "github.com/meridianhub/meridian/services/financial-gateway/adapters/stripe"
25+
db "github.com/meridianhub/meridian/shared/platform/db"
2426
"github.com/meridianhub/meridian/shared/platform/events"
2527
"github.com/meridianhub/meridian/shared/platform/events/topics"
2628
"github.com/meridianhub/meridian/shared/platform/tenant"
@@ -34,8 +36,43 @@ var (
3436
ErrNilClientFactory = errors.New("stripe client factory cannot be nil")
3537
ErrNilOutboxPublisher = errors.New("outbox publisher cannot be nil")
3638
ErrMissingTenantContext = errors.New("missing tenant context for stripe webhook")
39+
ErrMissingWebhookSecret = errors.New("missing webhook secret configured for tenant")
3740
)
3841

42+
// ProcessedEventChecker reports whether a provider webhook event has already been processed.
//
// Implementations should return (true, nil) if the event was already published to the
// outbox, (false, nil) if it was not, and a non-nil error if the lookup itself failed.
type ProcessedEventChecker interface {
	// IsProcessed looks up providerEventID (the provider's own event identifier).
	// ctx is handed through to the implementation unchanged.
	IsProcessed(ctx context.Context, providerEventID string) (bool, error)
}
47+
48+
// GormProcessedEventChecker implements ProcessedEventChecker by querying the event_outbox
49+
// table for an existing entry with a matching causation_id.
50+
type GormProcessedEventChecker struct {
51+
db *gorm.DB
52+
}
53+
54+
// NewGormProcessedEventChecker creates a GormProcessedEventChecker backed by the given DB.
55+
func NewGormProcessedEventChecker(db *gorm.DB) *GormProcessedEventChecker {
56+
return &GormProcessedEventChecker{db: db}
57+
}
58+
59+
// IsProcessed returns true if an outbox entry with causation_id matching providerEventID exists
60+
// for the tenant in ctx. The query runs inside a tenant-scoped transaction (WithGormTenantTransaction)
61+
// so the search_path targets the correct tenant schema, consistent with the project's multi-tenant
62+
// outbox pattern.
63+
func (c *GormProcessedEventChecker) IsProcessed(ctx context.Context, providerEventID string) (bool, error) {
64+
var count int64
65+
err := db.WithGormTenantTransaction(ctx, c.db, func(tx *gorm.DB) error {
66+
return tx.Model(&events.EventOutbox{}).
67+
Where("causation_id = ?", providerEventID).
68+
Count(&count).Error
69+
})
70+
if err != nil {
71+
return false, err
72+
}
73+
return count > 0, nil
74+
}
75+
3976
// OutboxEventPublisher is the interface for publishing domain events to the transactional outbox.
4077
// This abstraction allows for test doubles without requiring a real *gorm.DB.
4178
type OutboxEventPublisher interface {
@@ -49,6 +86,7 @@ type WebhookHandler struct {
4986
publisher OutboxEventPublisher
5087
db *gorm.DB
5188
logger *slog.Logger
89+
eventChecker ProcessedEventChecker
5290
}
5391

5492
// WebhookHandlerConfig contains configuration for creating a WebhookHandler.
@@ -66,6 +104,10 @@ type WebhookHandlerConfig struct {
66104

67105
// Logger is the structured logger. Defaults to slog.Default() if nil.
68106
Logger *slog.Logger
107+
108+
// EventChecker optionally checks whether a provider event has already been processed.
109+
// If nil and DB is set, NewWebhookHandler defaults to NewGormProcessedEventChecker(DB); if both are nil, idempotency checking is skipped.
110+
EventChecker ProcessedEventChecker
69111
}
70112

71113
// NewWebhookHandler creates a new WebhookHandler.
@@ -81,11 +123,16 @@ func NewWebhookHandler(cfg WebhookHandlerConfig) *WebhookHandler {
81123
if logger == nil {
82124
logger = slog.Default()
83125
}
126+
eventChecker := cfg.EventChecker
127+
if eventChecker == nil && cfg.DB != nil {
128+
eventChecker = NewGormProcessedEventChecker(cfg.DB)
129+
}
84130
return &WebhookHandler{
85131
clientFactory: cfg.ClientFactory,
86132
publisher: cfg.OutboxPublisher,
87133
db: cfg.DB,
88134
logger: logger,
135+
eventChecker: eventChecker,
89136
}
90137
}
91138

@@ -206,7 +253,7 @@ func (h *WebhookHandler) validateAndParseWebhook(
206253
if client.WebhookEndpointSecret == "" {
207254
h.logger.Error("no webhook secret for tenant", "tenant_id", tenantID.String())
208255
h.writeError(w, http.StatusInternalServerError, "no webhook secret configured for tenant")
209-
return stripeadapter.ParsedWebhookEvent{}, ErrMissingTenantContext
256+
return stripeadapter.ParsedWebhookEvent{}, ErrMissingWebhookSecret
210257
}
211258

212259
adapter, err := stripeadapter.NewWebhookAdapter(client.WebhookEndpointSecret)
@@ -252,6 +299,24 @@ func (h *WebhookHandler) publishDomainEvent(
252299
topic string,
253300
tenantID tenant.TenantID,
254301
) {
302+
if h.eventChecker != nil {
303+
already, err := h.eventChecker.IsProcessed(ctx, parsed.EventID)
304+
if err != nil {
305+
h.logger.Error("failed to check event idempotency",
306+
"event_id", parsed.EventID,
307+
"error", err,
308+
)
309+
// Fall through and publish - safer than silently dropping on a check failure.
310+
} else if already {
311+
h.logger.Info("skipping duplicate stripe webhook event",
312+
"event_id", parsed.EventID,
313+
"tenant_id", tenantID.String(),
314+
)
315+
h.writeSuccess(w, "webhook already processed")
316+
return
317+
}
318+
}
319+
255320
h.logger.Info("publishing stripe webhook domain event",
256321
"event_id", parsed.EventID,
257322
"gateway_reference_id", parsed.GatewayReferenceID,
@@ -266,7 +331,16 @@ func (h *WebhookHandler) publishDomainEvent(
266331
AggregateID: parsed.PaymentOrderID,
267332
AggregateType: "PaymentOrder",
268333
PartitionKey: parsed.PaymentOrderID,
334+
CausationID: parsed.EventID,
269335
}); err != nil {
336+
if isDuplicateCausationError(err) {
337+
h.logger.Info("skipping duplicate stripe webhook event (unique constraint)",
338+
"event_id", parsed.EventID,
339+
"tenant_id", tenantID.String(),
340+
)
341+
h.writeSuccess(w, "webhook already processed")
342+
return
343+
}
270344
h.logger.Error("failed to publish domain event to outbox",
271345
"event_id", parsed.EventID,
272346
"topic", topic,
@@ -410,3 +484,15 @@ func (h *WebhookHandler) writeError(w http.ResponseWriter, statusCode int, messa
410484
h.logger.Error("failed to encode error response", "error", err)
411485
}
412486
}
487+
488+
// isDuplicateCausationError returns true if err looks like a unique constraint violation
// on the causation_id column, indicating the event was already published by a concurrent
// request.
//
// Detection is by error text, matched case-insensitively. Two things must both hold:
//   - the message signals a unique violation ("duplicate key", "unique constraint", or
//     SQLSTATE "23505"), and
//   - the message names causation_id, which appears in the violated index name
//     (idx_event_outbox_causation_id).
//
// The second condition matters because a unique violation on any other index (for
// example the primary key) is a genuine publish failure and must not be acknowledged
// as an already-processed webhook.
func isDuplicateCausationError(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())

	uniqueViolation := strings.Contains(msg, "duplicate key") ||
		strings.Contains(msg, "unique constraint") ||
		strings.Contains(msg, "23505")

	return uniqueViolation && strings.Contains(msg, "causation_id")
}

services/financial-gateway/adapters/http/webhook_handler_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,160 @@ func TestWebhookHandler_MissingPaymentOrderID_AcknowledgesOnly(t *testing.T) {
470470
assert.Len(t, stub.published, 0, "should not publish when payment_order_id is missing")
471471
}
472472

473+
// stubProcessedEventChecker implements ProcessedEventChecker for testing.
type stubProcessedEventChecker struct {
	// processed marks provider event IDs that should be reported as already processed.
	processed map[string]bool
	// err, when non-nil, is returned from every IsProcessed call.
	err error
}

// IsProcessed returns (false, s.err) when an error is configured; otherwise it reports
// whether providerEventID is marked in s.processed. IDs that are not in the map, and a
// nil map, yield (false, nil).
func (s *stubProcessedEventChecker) IsProcessed(_ context.Context, providerEventID string) (bool, error) {
	if s.err != nil {
		return false, s.err
	}
	return s.processed[providerEventID], nil
}
485+
486+
func TestWebhookHandler_EmptyWebhookSecret_ReturnsCorrectError(t *testing.T) {
487+
stub := &stubOutboxPublisher{}
488+
provider := &testTenantConfigProvider{
489+
configs: map[string]stripeadapter.TenantConfig{
490+
"no-secret-tenant": {
491+
ConnectedAccountID: "acct_test",
492+
WebhookEndpointSecret: "",
493+
},
494+
},
495+
}
496+
factory, err := stripeadapter.NewClientFactory(stripeadapter.Config{
497+
APIKey: "sk_test_key",
498+
TenantCacheSize: 10,
499+
TenantCacheTTL: time.Minute,
500+
CircuitBreakerName: "test-cb",
501+
}, provider, nil)
502+
require.NoError(t, err)
503+
504+
h := fghttp.NewWebhookHandler(fghttp.WebhookHandlerConfig{
505+
ClientFactory: factory,
506+
OutboxPublisher: stub,
507+
})
508+
509+
payload := buildStripePayload(t, "evt_1", "payment_intent.succeeded", map[string]any{})
510+
sig := signPayload(t, payload, testWebhookSecret)
511+
512+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/no-secret-tenant", bytes.NewReader(payload))
513+
req.SetPathValue("tenantID", "no-secret-tenant")
514+
req.Header.Set("Stripe-Signature", sig)
515+
516+
rr := httptest.NewRecorder()
517+
h.HandleStripeWebhook(rr, req)
518+
519+
assert.Equal(t, http.StatusInternalServerError, rr.Code)
520+
var resp map[string]interface{}
521+
require.NoError(t, json.NewDecoder(rr.Body).Decode(&resp))
522+
assert.Equal(t, "no webhook secret configured for tenant", resp["error"])
523+
}
524+
525+
func TestWebhookHandler_DuplicateEvent_SkipsPublish(t *testing.T) {
526+
stub := &stubOutboxPublisher{}
527+
checker := &stubProcessedEventChecker{
528+
processed: map[string]bool{"evt_dup_1": true},
529+
}
530+
531+
provider := &testTenantConfigProvider{
532+
configs: map[string]stripeadapter.TenantConfig{
533+
"test-tenant": {
534+
ConnectedAccountID: "acct_test",
535+
WebhookEndpointSecret: testWebhookSecret,
536+
},
537+
},
538+
}
539+
factory, err := stripeadapter.NewClientFactory(stripeadapter.Config{
540+
APIKey: "sk_test_key",
541+
TenantCacheSize: 10,
542+
TenantCacheTTL: time.Minute,
543+
CircuitBreakerName: "test-cb",
544+
}, provider, nil)
545+
require.NoError(t, err)
546+
547+
h := fghttp.NewWebhookHandler(fghttp.WebhookHandlerConfig{
548+
ClientFactory: factory,
549+
OutboxPublisher: stub,
550+
EventChecker: checker,
551+
})
552+
553+
payload := buildStripePayload(t, "evt_dup_1", "payment_intent.succeeded", map[string]any{
554+
"id": "pi_dup_123",
555+
"object": "payment_intent",
556+
"amount": 5000,
557+
"currency": "gbp",
558+
"metadata": map[string]string{"payment_order_id": "po-dup-123"},
559+
"status": "succeeded",
560+
})
561+
sig := signPayload(t, payload, testWebhookSecret)
562+
563+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
564+
req.SetPathValue("tenantID", "test-tenant")
565+
req.Header.Set("Stripe-Signature", sig)
566+
567+
rr := httptest.NewRecorder()
568+
h.HandleStripeWebhook(rr, req)
569+
570+
assert.Equal(t, http.StatusOK, rr.Code)
571+
assert.Len(t, stub.published, 0, "duplicate event should not be re-published")
572+
var resp map[string]interface{}
573+
require.NoError(t, json.NewDecoder(rr.Body).Decode(&resp))
574+
assert.Equal(t, true, resp["acknowledged"])
575+
}
576+
577+
func TestWebhookHandler_IdempotencyCheckError_FallsThroughToPublish(t *testing.T) {
578+
stub := &stubOutboxPublisher{}
579+
checker := &stubProcessedEventChecker{
580+
err: errors.New("db unavailable"),
581+
}
582+
583+
provider := &testTenantConfigProvider{
584+
configs: map[string]stripeadapter.TenantConfig{
585+
"test-tenant": {
586+
ConnectedAccountID: "acct_test",
587+
WebhookEndpointSecret: testWebhookSecret,
588+
},
589+
},
590+
}
591+
factory, err := stripeadapter.NewClientFactory(stripeadapter.Config{
592+
APIKey: "sk_test_key",
593+
TenantCacheSize: 10,
594+
TenantCacheTTL: time.Minute,
595+
CircuitBreakerName: "test-cb",
596+
}, provider, nil)
597+
require.NoError(t, err)
598+
599+
h := fghttp.NewWebhookHandler(fghttp.WebhookHandlerConfig{
600+
ClientFactory: factory,
601+
OutboxPublisher: stub,
602+
EventChecker: checker,
603+
})
604+
605+
payload := buildStripePayload(t, "evt_fallthrough_1", "payment_intent.succeeded", map[string]any{
606+
"id": "pi_fallthrough_123",
607+
"object": "payment_intent",
608+
"amount": 2000,
609+
"currency": "usd",
610+
"metadata": map[string]string{"payment_order_id": "po-fallthrough-123"},
611+
"status": "succeeded",
612+
})
613+
sig := signPayload(t, payload, testWebhookSecret)
614+
615+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
616+
req.SetPathValue("tenantID", "test-tenant")
617+
req.Header.Set("Stripe-Signature", sig)
618+
619+
rr := httptest.NewRecorder()
620+
h.HandleStripeWebhook(rr, req)
621+
622+
// On idempotency check failure, handler falls through and publishes the event.
623+
assert.Equal(t, http.StatusOK, rr.Code)
624+
assert.Len(t, stub.published, 1, "event should be published when idempotency check fails")
625+
}
626+
473627
func TestWebhookHandler_NewWebhookHandler_PanicsOnNilClientFactory(t *testing.T) {
474628
stub := &stubOutboxPublisher{}
475629
assert.PanicsWithValue(t, fghttp.ErrNilClientFactory.Error(), func() {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CREATE UNIQUE INDEX IF NOT EXISTS idx_event_outbox_causation_id ON event_outbox (causation_id) WHERE causation_id IS NOT NULL;
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
h1:jrzgSYnGrSymUsy8xdJA5VT7B2HfJSNptud30RK9KcI=
1+
h1:2Tz6trZn1stBcMbe9So9gzm1vzV58P6HoqNpcD8pJxg=
22
20260304000001_create_event_outbox.sql h1:3qowbd0HiVAqH+82GdkX5/FN4Z1XoiGAGxmK6wXGXLA=
33
20260316000001_add_event_outbox_tenant_id.sql h1:nE4czdkI7LnVIV8Z2NS/d0xrK9Wd/XnqzDu17Cu3sf4=
44
20260316000002_add_event_outbox_tenant_id_index.sql h1:dT1XLgLUkdkL5dYpU7SrWnUXob597NGELzuJZHFKW34=
5+
20260329000001_add_event_outbox_causation_id_index.sql h1:hE5bD39isTGGiGrDbrY4K0zKoquml2OqFhgUZbmaWQ4=

0 commit comments

Comments
 (0)