Skip to content

Commit c81fc4a

Browse files
committed
fix: address remaining CodeRabbit review comments
- Validate payment_order_id before publishing to outbox: events missing this metadata field (non-Meridian Stripe payments) are now acknowledged with a warning log instead of returning 500 and causing infinite retries - Add HTTP server timeouts (ReadHeaderTimeout/ReadTimeout/WriteTimeout/ IdleTimeout) to financial-gateway webhook server to prevent slowloris connection exhaustion attacks - Guard against nil PaymentOrderUpdater in both PaymentEventConsumer constructors; NewPaymentEventConsumer panics, NewPaymentEventConsumerWithKafka returns ErrNilPaymentOrderUpdater - Move NewPaymentEventConsumerWithKafka construction before server start goroutines so initialization failures return cleanly without leaving active listeners/goroutines behind
1 parent 25c5c9c commit c81fc4a

4 files changed

Lines changed: 57 additions & 24 deletions

File tree

services/financial-gateway/adapters/http/webhook_handler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,19 @@ func (h *WebhookHandler) HandleStripeWebhook(w http.ResponseWriter, r *http.Requ
201201
return
202202
}
203203

204+
// Require payment_order_id to be present in Stripe metadata.
205+
// Events without it cannot be routed to a payment order — publishing
206+
// would fail with ErrEmptyAggregateID and trigger infinite Stripe retries.
207+
if parsed.PaymentOrderID == "" {
208+
h.logger.Warn("stripe webhook missing payment_order_id in metadata — event acknowledged without processing",
209+
"event_id", parsed.EventID,
210+
"gateway_reference_id", parsed.GatewayReferenceID,
211+
"tenant_id", tenantID.String(),
212+
)
213+
h.writeSuccess(w, "event acknowledged — no payment_order_id in metadata")
214+
return
215+
}
216+
204217
h.logger.Info("publishing stripe webhook domain event",
205218
"event_id", parsed.EventID,
206219
"gateway_reference_id", parsed.GatewayReferenceID,

services/financial-gateway/cmd/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"net/http"
1414
"os"
1515
"strings"
16+
"time"
1617

1718
controlplanev1 "github.com/meridianhub/meridian/api/proto/meridian/control_plane/v1"
1819
financialgatewayv1 "github.com/meridianhub/meridian/api/proto/meridian/financial_gateway/v1"
@@ -234,8 +235,12 @@ func run(logger *slog.Logger) error {
234235

235236
httpAddress := fmt.Sprintf(":%s", cfg.HTTPPort)
236237
httpServer := &http.Server{
237-
Addr: httpAddress,
238-
Handler: mux,
238+
Addr: httpAddress,
239+
Handler: mux,
240+
ReadHeaderTimeout: 5 * time.Second,
241+
ReadTimeout: 15 * time.Second,
242+
WriteTimeout: 15 * time.Second,
243+
IdleTimeout: 60 * time.Second,
239244
}
240245

241246
// Channel to collect server errors.

services/payment-order/adapters/messaging/payment_event_consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import (
1818
// created without Kafka (use NewPaymentEventConsumerWithKafka for production).
1919
var ErrConsumerNotConfigured = errors.New("consumer not configured with Kafka — use NewPaymentEventConsumerWithKafka")
2020

21+
// ErrNilPaymentOrderUpdater is returned when a nil PaymentOrderUpdater is passed to a constructor.
22+
var ErrNilPaymentOrderUpdater = errors.New("payment order updater is required")
23+
2124
// ErrUnexpectedCapturedMessageType is returned when the payment-captured consumer receives a message that is not *PaymentCapturedEvent.
2225
var ErrUnexpectedCapturedMessageType = errors.New("unexpected message type for payment-captured topic")
2326

@@ -50,6 +53,9 @@ type PaymentEventConsumer struct {
5053

5154
// NewPaymentEventConsumer creates a consumer that handles domain events from financial-gateway.
5255
func NewPaymentEventConsumer(svc PaymentOrderUpdater) *PaymentEventConsumer {
56+
if svc == nil {
57+
panic(ErrNilPaymentOrderUpdater.Error())
58+
}
5359
return &PaymentEventConsumer{
5460
svc: svc,
5561
logger: slog.Default(),
@@ -66,6 +72,9 @@ func NewPaymentEventConsumerWithKafka(
6672
svc PaymentOrderUpdater,
6773
logger *slog.Logger,
6874
) (*PaymentEventConsumer, error) {
75+
if svc == nil {
76+
return nil, ErrNilPaymentOrderUpdater
77+
}
6978
if logger == nil {
7079
logger = slog.Default()
7180
}

services/payment-order/cmd/main.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -536,26 +536,10 @@ func run(logger *slog.Logger) error {
536536
return fmt.Errorf("failed to listen on %s: %w", grpcAddress, err)
537537
}
538538

539-
// Channel to collect server errors (gRPC + HTTP + payment event consumer).
540-
serverErrors := make(chan error, 3)
541-
542-
// Start gRPC server in background
543-
go func() {
544-
logger.Info("starting gRPC server", "address", grpcAddress)
545-
if err := grpcServer.Serve(grpcListener); err != nil {
546-
serverErrors <- fmt.Errorf("gRPC server error: %w", err)
547-
}
548-
}()
549-
550-
// Start HTTP server in background
551-
go func() {
552-
logger.Info("starting HTTP server", "port", httpPort)
553-
if err := httpServer.Start(); err != nil {
554-
serverErrors <- fmt.Errorf("HTTP server error: %w", err)
555-
}
556-
}()
557-
558-
// Start financial-gateway payment event consumer (if Kafka is configured).
539+
// Construct the financial-gateway payment event consumer before starting servers
540+
// so that any initialization error causes an early return without leaving
541+
// running goroutines/listeners behind.
542+
//
559543
// Subscribes to financial-gateway.payment-captured.v1 and
560544
// financial-gateway.payment-failed.v1 topics and calls UpdatePaymentOrder
561545
// to transition payment orders to SETTLED or REJECTED.
@@ -580,7 +564,31 @@ func run(logger *slog.Logger) error {
580564
logger.Error("failed to close payment event consumer", "error", err)
581565
}
582566
}()
567+
} else {
568+
logger.Warn("payment event consumer disabled - KAFKA_BOOTSTRAP_SERVERS not set")
569+
}
583570

571+
// Channel to collect server errors (gRPC + HTTP + payment event consumer).
572+
serverErrors := make(chan error, 3)
573+
574+
// Start gRPC server in background
575+
go func() {
576+
logger.Info("starting gRPC server", "address", grpcAddress)
577+
if err := grpcServer.Serve(grpcListener); err != nil {
578+
serverErrors <- fmt.Errorf("gRPC server error: %w", err)
579+
}
580+
}()
581+
582+
// Start HTTP server in background
583+
go func() {
584+
logger.Info("starting HTTP server", "port", httpPort)
585+
if err := httpServer.Start(); err != nil {
586+
serverErrors <- fmt.Errorf("HTTP server error: %w", err)
587+
}
588+
}()
589+
590+
// Start payment event consumer after servers are up.
591+
if paymentEventConsumer != nil {
584592
go func() {
585593
if err := paymentEventConsumer.Start(
586594
"financial-gateway.payment-captured.v1",
@@ -590,8 +598,6 @@ func run(logger *slog.Logger) error {
590598
serverErrors <- fmt.Errorf("payment event consumer error: %w", err)
591599
}
592600
}()
593-
} else {
594-
logger.Warn("payment event consumer disabled - KAFKA_BOOTSTRAP_SERVERS not set")
595601
}
596602

597603
// Start billing workers in background (if enabled)

0 commit comments

Comments
 (0)