Skip to content

Commit 25c5c9c

Browse files
committed
fix: address CodeRabbit review comments for webhook migration
- Fix silent proto deserialization corruption: use two separate kafka.ProtoConsumer instances (one per topic) with the correct typed msgFactory, preventing PaymentFailedEvent bytes from being decoded as PaymentCapturedEvent - Fix missing tenant context: change route to POST /webhooks/stripe/{tenantID} and extract tenant via r.PathValue("tenantID") instead of relying on middleware that was never wired; inject into ctx via tenant.WithTenant - Fix nil OutboxPublisher not validated: panic in NewWebhookHandler when OutboxPublisher is nil, consistent with ClientFactory guard - Fix gRPC listener resource leak: add listenerClosed flag with deferred close so listener is not abandoned on startup failures after bind - Fix serverErrors channel buffer too small: increase from 2 to 3 to accommodate gRPC + HTTP + payment event consumer goroutines - Use static sentinel errors (ErrUnexpectedCapturedMessageType, ErrUnexpectedFailedMessageType) for type assertion failures in consumers to satisfy err113 linter - Update webhook_handler_test.go to use r.SetPathValue("tenantID") instead of tenant.WithTenant context injection, matching new handler behaviour; remove unused tenant import - Update payment-order/cmd/main.go to use new Start(capturedTopic, failedTopic string) signature for PaymentEventConsumer
1 parent 3c7a5c2 commit 25c5c9c

5 files changed

Lines changed: 154 additions & 80 deletions

File tree

services/financial-gateway/adapters/http/webhook_handler.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ type WebhookHandlerConfig struct {
6969
}
7070

7171
// NewWebhookHandler creates a new WebhookHandler.
72-
// Panics if ClientFactory is nil to fail fast during initialization.
72+
// Panics if ClientFactory or OutboxPublisher is nil to fail fast during initialization.
7373
func NewWebhookHandler(cfg WebhookHandlerConfig) *WebhookHandler {
7474
if cfg.ClientFactory == nil {
7575
panic(ErrNilClientFactory.Error())
7676
}
77+
if cfg.OutboxPublisher == nil {
78+
panic(ErrNilOutboxPublisher.Error())
79+
}
7780
logger := cfg.Logger
7881
if logger == nil {
7982
logger = slog.Default()
@@ -95,9 +98,13 @@ type webhookResponse struct {
9598

9699
// HandleStripeWebhook processes an incoming Stripe webhook.
97100
//
101+
// The route must be registered as POST /webhooks/stripe/{tenantID} so that
102+
// r.PathValue("tenantID") returns the tenant identifier. Stripe is configured
103+
// to call the tenant-specific URL (e.g. /webhooks/stripe/acme-corp).
104+
//
98105
// Flow:
99106
// 1. Validate HTTP method and read body
100-
// 2. Extract tenant context
107+
// 2. Extract tenant ID from URL path and inject into context
101108
// 3. Resolve per-tenant Stripe client (contains webhook secret)
102109
// 4. Validate Stripe-Signature using the tenant-specific secret
103110
// 5. Map the Stripe event to a domain event (PaymentCapturedEvent or PaymentFailedEvent)
@@ -124,12 +131,16 @@ func (h *WebhookHandler) HandleStripeWebhook(w http.ResponseWriter, r *http.Requ
124131
return
125132
}
126133

127-
tenantID, ok := tenant.FromContext(ctx)
128-
if !ok {
129-
h.logger.Warn("missing tenant context for stripe webhook")
134+
// Extract tenant ID from the URL path segment {tenantID}.
135+
// The route must be registered as: POST /webhooks/stripe/{tenantID}
136+
rawTenantID := r.PathValue("tenantID")
137+
if rawTenantID == "" {
138+
h.logger.Warn("missing tenant ID in stripe webhook URL path")
130139
h.writeError(w, http.StatusBadRequest, ErrMissingTenantContext.Error())
131140
return
132141
}
142+
tenantID := tenant.TenantID(rawTenantID)
143+
ctx = tenant.WithTenant(ctx, tenantID)
133144

134145
client, err := h.clientFactory.NewClient(ctx)
135146
if err != nil {

services/financial-gateway/adapters/http/webhook_handler_test.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
fghttp "github.com/meridianhub/meridian/services/financial-gateway/adapters/http"
2121
stripeadapter "github.com/meridianhub/meridian/services/financial-gateway/adapters/stripe"
2222
"github.com/meridianhub/meridian/shared/platform/events"
23-
"github.com/meridianhub/meridian/shared/platform/tenant"
2423
)
2524

2625
const testWebhookSecret = "whsec_test_webhook_secret_for_financial_gateway"
@@ -132,7 +131,8 @@ func TestWebhookHandler_MissingTenantContext(t *testing.T) {
132131
payload := buildStripePayload(t, "evt_1", "payment_intent.succeeded", map[string]any{})
133132
sig := signPayload(t, payload, testWebhookSecret)
134133

135-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
134+
// No {tenantID} path value set — simulates request without tenant in URL.
135+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/", bytes.NewReader(payload))
136136
req.Header.Set("Stripe-Signature", sig)
137137

138138
rr := httptest.NewRecorder()
@@ -145,9 +145,8 @@ func TestWebhookHandler_MissingSignature(t *testing.T) {
145145
h := setupHandler(t, nil)
146146

147147
payload := buildStripePayload(t, "evt_2", "payment_intent.succeeded", map[string]any{})
148-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
149-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
150-
req = req.WithContext(ctx)
148+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
149+
req.SetPathValue("tenantID", "test-tenant")
151150

152151
rr := httptest.NewRecorder()
153152
h.HandleStripeWebhook(rr, req)
@@ -161,9 +160,8 @@ func TestWebhookHandler_InvalidSignature(t *testing.T) {
161160
payload := buildStripePayload(t, "evt_3", "payment_intent.succeeded", map[string]any{})
162161
sig := signPayload(t, payload, "whsec_wrong_secret")
163162

164-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
165-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
166-
req = req.WithContext(ctx)
163+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
164+
req.SetPathValue("tenantID", "test-tenant")
167165
req.Header.Set("Stripe-Signature", sig)
168166

169167
rr := httptest.NewRecorder()
@@ -180,9 +178,8 @@ func TestWebhookHandler_UnsupportedEvent_Returns200(t *testing.T) {
180178
})
181179
sig := signPayload(t, payload, testWebhookSecret)
182180

183-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
184-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
185-
req = req.WithContext(ctx)
181+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
182+
req.SetPathValue("tenantID", "test-tenant")
186183
req.Header.Set("Stripe-Signature", sig)
187184

188185
rr := httptest.NewRecorder()
@@ -208,9 +205,8 @@ func TestWebhookHandler_PaymentCaptured_PublishesToOutbox(t *testing.T) {
208205
})
209206
sig := signPayload(t, payload, testWebhookSecret)
210207

211-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
212-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
213-
req = req.WithContext(ctx)
208+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
209+
req.SetPathValue("tenantID", "test-tenant")
214210
req.Header.Set("Stripe-Signature", sig)
215211

216212
rr := httptest.NewRecorder()
@@ -247,9 +243,8 @@ func TestWebhookHandler_PaymentFailed_PublishesToOutbox(t *testing.T) {
247243
})
248244
sig := signPayload(t, payload, testWebhookSecret)
249245

250-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
251-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
252-
req = req.WithContext(ctx)
246+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
247+
req.SetPathValue("tenantID", "test-tenant")
253248
req.Header.Set("Stripe-Signature", sig)
254249

255250
rr := httptest.NewRecorder()
@@ -281,9 +276,8 @@ func TestWebhookHandler_OutboxPublishFails_Returns500(t *testing.T) {
281276
})
282277
sig := signPayload(t, payload, testWebhookSecret)
283278

284-
ctx := tenant.WithTenant(context.Background(), "test-tenant")
285-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
286-
req = req.WithContext(ctx)
279+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/test-tenant", bytes.NewReader(payload))
280+
req.SetPathValue("tenantID", "test-tenant")
287281
req.Header.Set("Stripe-Signature", sig)
288282

289283
rr := httptest.NewRecorder()
@@ -313,9 +307,8 @@ func TestWebhookHandler_TenantNotFound_Returns500(t *testing.T) {
313307
payload := buildStripePayload(t, "evt_notfound_1", "payment_intent.succeeded", map[string]any{})
314308
sig := signPayload(t, payload, testWebhookSecret)
315309

316-
ctx := tenant.WithTenant(context.Background(), "unknown-tenant")
317-
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe", bytes.NewReader(payload))
318-
req = req.WithContext(ctx)
310+
req := httptest.NewRequest(http.MethodPost, "/webhooks/stripe/unknown-tenant", bytes.NewReader(payload))
311+
req.SetPathValue("tenantID", "unknown-tenant")
319312
req.Header.Set("Stripe-Signature", sig)
320313

321314
rr := httptest.NewRecorder()

services/financial-gateway/cmd/main.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ func run(logger *slog.Logger) error {
144144
if err != nil {
145145
return fmt.Errorf("failed to listen on %s: %w", grpcAddress, err)
146146
}
147+
// Close the listener if any subsequent setup fails (before grpcServer.Serve takes ownership).
148+
listenerClosed := false
149+
defer func() {
150+
if !listenerClosed {
151+
_ = listener.Close()
152+
}
153+
}()
147154

148155
// --- Stripe webhook receiver setup ---
149156

@@ -220,7 +227,10 @@ func run(logger *slog.Logger) error {
220227

221228
// Create HTTP mux and register the Stripe webhook endpoint.
222229
mux := http.NewServeMux()
223-
mux.HandleFunc("/webhooks/stripe", webhookHandler.HandleStripeWebhook)
230+
// Tenant ID is embedded in the URL path so Stripe can be configured to call
231+
// per-tenant endpoints (e.g. /webhooks/stripe/acme-corp). The handler
232+
// extracts the tenant from r.PathValue("tenantID") and injects it into ctx.
233+
mux.HandleFunc("POST /webhooks/stripe/{tenantID}", webhookHandler.HandleStripeWebhook)
224234

225235
httpAddress := fmt.Sprintf(":%s", cfg.HTTPPort)
226236
httpServer := &http.Server{
@@ -232,6 +242,9 @@ func run(logger *slog.Logger) error {
232242
serverErrors := make(chan error, 2)
233243

234244
// Start gRPC server in background.
245+
// grpcServer.Serve takes ownership of the listener; mark it so the deferred
246+
// close does not double-close on normal shutdown.
247+
listenerClosed = true
235248
go func() {
236249
logger.Info("starting gRPC server", "address", grpcAddress)
237250
if err := grpcServer.Serve(listener); err != nil {

services/payment-order/adapters/messaging/payment_event_consumer.go

Lines changed: 104 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ import (
1818
// created without Kafka (use NewPaymentEventConsumerWithKafka for production).
1919
var ErrConsumerNotConfigured = errors.New("consumer not configured with Kafka — use NewPaymentEventConsumerWithKafka")
2020

21+
// ErrUnexpectedCapturedMessageType is returned when the payment-captured consumer receives a message that is not *PaymentCapturedEvent.
22+
var ErrUnexpectedCapturedMessageType = errors.New("unexpected message type for payment-captured topic")
23+
24+
// ErrUnexpectedFailedMessageType is returned when the payment-failed consumer receives a message that is not *PaymentFailedEvent.
25+
var ErrUnexpectedFailedMessageType = errors.New("unexpected message type for payment-failed topic")
26+
2127
// PaymentOrderUpdater is the interface for updating payment order status.
2228
// Implemented by the payment-order gRPC service.
2329
type PaymentOrderUpdater interface {
@@ -30,10 +36,16 @@ type PaymentOrderUpdater interface {
3036
// This consumer subscribes to:
3137
// - financial-gateway.payment-captured.v1 → marks payment order as SETTLED
3238
// - financial-gateway.payment-failed.v1 → marks payment order as REJECTED
39+
//
40+
// Two separate ProtoConsumers are used so that each topic uses the correct
41+
// proto message factory for deserialization. A single consumer with a shared
42+
// msgFactory would cause PaymentFailedEvent bytes to be deserialized as
43+
// PaymentCapturedEvent, silently corrupting the event.
3344
type PaymentEventConsumer struct {
34-
updater *kafka.ProtoConsumer
35-
svc PaymentOrderUpdater
36-
logger *slog.Logger
45+
capturedConsumer *kafka.ProtoConsumer
46+
failedConsumer *kafka.ProtoConsumer
47+
svc PaymentOrderUpdater
48+
logger *slog.Logger
3749
}
3850

3951
// NewPaymentEventConsumer creates a consumer that handles domain events from financial-gateway.
@@ -45,8 +57,8 @@ func NewPaymentEventConsumer(svc PaymentOrderUpdater) *PaymentEventConsumer {
4557
}
4658

4759
// NewPaymentEventConsumerWithKafka creates a PaymentEventConsumer wired to real Kafka topics.
48-
// The returned consumer handles both payment-captured and payment-failed events by routing
49-
// on the event type within a single consumer group.
60+
// Two separate consumers are created — one per topic — so each uses the correct proto
61+
// message factory for deserialization.
5062
//
5163
// Call Start() to begin consuming and Stop()/Close() for graceful shutdown.
5264
func NewPaymentEventConsumerWithKafka(
@@ -63,66 +75,113 @@ func NewPaymentEventConsumerWithKafka(
6375
logger: logger,
6476
}
6577

66-
// The message factory returns PaymentCapturedEvent by default;
67-
// we dispatch based on the proto type in the handler.
68-
// Since the two event types share the same consumer group and topics,
69-
// we use a wrapper that attempts to unmarshal as each type.
70-
msgFactory := func() proto.Message {
71-
// Return a placeholder; actual type detection happens in the handler
72-
// by attempting unmarshal of each event type.
73-
return &financialgatewayeventsv1.PaymentCapturedEvent{}
74-
}
75-
76-
handler := func(ctx context.Context, key []byte, msg proto.Message) error {
77-
return c.dispatch(ctx, key, msg)
78+
// Consumer for payment-captured events uses PaymentCapturedEvent as the proto type.
79+
capturedConfig := kafkaConfig
80+
capturedConfig.GroupID = kafkaConfig.GroupID + "-captured"
81+
capturedConfig.ClientID = kafkaConfig.ClientID + "-captured"
82+
capturedConsumer, err := kafka.NewProtoConsumer(
83+
capturedConfig,
84+
func() proto.Message { return &financialgatewayeventsv1.PaymentCapturedEvent{} },
85+
func(ctx context.Context, key []byte, msg proto.Message) error {
86+
evt, ok := msg.(*financialgatewayeventsv1.PaymentCapturedEvent)
87+
if !ok {
88+
return fmt.Errorf("%w: %T", ErrUnexpectedCapturedMessageType, msg)
89+
}
90+
return c.HandlePaymentCapturedEvent(ctx, key, evt)
91+
},
92+
)
93+
if err != nil {
94+
return nil, fmt.Errorf("failed to create payment-captured kafka consumer: %w", err)
7895
}
7996

80-
consumer, err := kafka.NewProtoConsumer(kafkaConfig, msgFactory, handler)
97+
// Consumer for payment-failed events uses PaymentFailedEvent as the proto type.
98+
failedConfig := kafkaConfig
99+
failedConfig.GroupID = kafkaConfig.GroupID + "-failed"
100+
failedConfig.ClientID = kafkaConfig.ClientID + "-failed"
101+
failedConsumer, err := kafka.NewProtoConsumer(
102+
failedConfig,
103+
func() proto.Message { return &financialgatewayeventsv1.PaymentFailedEvent{} },
104+
func(ctx context.Context, key []byte, msg proto.Message) error {
105+
evt, ok := msg.(*financialgatewayeventsv1.PaymentFailedEvent)
106+
if !ok {
107+
return fmt.Errorf("%w: %T", ErrUnexpectedFailedMessageType, msg)
108+
}
109+
return c.HandlePaymentFailedEvent(ctx, key, evt)
110+
},
111+
)
81112
if err != nil {
82-
return nil, fmt.Errorf("failed to create payment event kafka consumer: %w", err)
113+
_ = capturedConsumer.Close()
114+
return nil, fmt.Errorf("failed to create payment-failed kafka consumer: %w", err)
83115
}
84116

85-
c.updater = consumer
117+
c.capturedConsumer = capturedConsumer
118+
c.failedConsumer = failedConsumer
86119
return c, nil
87120
}
88121

89122
// Start subscribes to the financial-gateway payment topics and begins consuming.
90-
// Blocks until Stop() is called or an unrecoverable error occurs.
91-
func (c *PaymentEventConsumer) Start(topicList []string) error {
92-
if c.updater == nil {
123+
// Starts both consumers in goroutines and blocks until both have stopped.
124+
// Returns the first error encountered, or nil if both stop cleanly.
125+
func (c *PaymentEventConsumer) Start(capturedTopic, failedTopic string) error {
126+
if c.capturedConsumer == nil || c.failedConsumer == nil {
93127
return ErrConsumerNotConfigured
94128
}
95-
c.logger.Info("starting payment event consumer", "topics", topicList)
96-
return c.updater.Subscribe(topicList)
129+
c.logger.Info("starting payment event consumers",
130+
"captured_topic", capturedTopic,
131+
"failed_topic", failedTopic,
132+
)
133+
134+
errs := make(chan error, 2)
135+
136+
go func() {
137+
if err := c.capturedConsumer.Subscribe([]string{capturedTopic}); err != nil {
138+
errs <- fmt.Errorf("payment-captured consumer: %w", err)
139+
} else {
140+
errs <- nil
141+
}
142+
}()
143+
144+
go func() {
145+
if err := c.failedConsumer.Subscribe([]string{failedTopic}); err != nil {
146+
errs <- fmt.Errorf("payment-failed consumer: %w", err)
147+
} else {
148+
errs <- nil
149+
}
150+
}()
151+
152+
// Wait for both consumers to stop.
153+
var firstErr error
154+
for range 2 {
155+
if err := <-errs; err != nil && firstErr == nil {
156+
firstErr = err
157+
}
158+
}
159+
return firstErr
97160
}
98161

99-
// Stop gracefully stops the Kafka consumer.
162+
// Stop gracefully stops both Kafka consumers.
100163
func (c *PaymentEventConsumer) Stop() {
101-
if c.updater != nil {
102-
c.updater.Stop()
164+
if c.capturedConsumer != nil {
165+
c.capturedConsumer.Stop()
166+
}
167+
if c.failedConsumer != nil {
168+
c.failedConsumer.Stop()
103169
}
104170
}
105171

106-
// Close closes the consumer and releases resources.
172+
// Close closes both consumers and releases resources.
107173
func (c *PaymentEventConsumer) Close() error {
108-
if c.updater != nil {
109-
return c.updater.Close()
174+
var capturedErr, failedErr error
175+
if c.capturedConsumer != nil {
176+
capturedErr = c.capturedConsumer.Close()
110177
}
111-
return nil
112-
}
113-
114-
// dispatch routes an incoming Kafka message to the correct handler based on its proto type.
115-
// Since both captured and failed events flow through the same consumer, we attempt
116-
// to identify which event type was received.
117-
func (c *PaymentEventConsumer) dispatch(ctx context.Context, key []byte, msg proto.Message) error {
118-
switch evt := msg.(type) {
119-
case *financialgatewayeventsv1.PaymentCapturedEvent:
120-
return c.HandlePaymentCapturedEvent(ctx, key, evt)
121-
default:
122-
c.logger.Warn("received unexpected message type in payment event consumer",
123-
"type", fmt.Sprintf("%T", msg))
124-
return nil
178+
if c.failedConsumer != nil {
179+
failedErr = c.failedConsumer.Close()
180+
}
181+
if capturedErr != nil {
182+
return capturedErr
125183
}
184+
return failedErr
126185
}
127186

128187
// HandlePaymentCapturedEvent processes a PaymentCapturedEvent by marking the

0 commit comments

Comments
 (0)