Skip to content

Commit 3f9b34a

Browse files
authored
Merge pull request #995 from meridianhub/meridian-unified--12--idempotency-consistency-refactor
refactor: standardize idempotency wiring across all services (meridian-unified.12)
2 parents 13c9a3b + 26a6e1f commit 3f9b34a

8 files changed

Lines changed: 324 additions & 105 deletions

File tree

services/current-account/cmd/main.go

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33

44
import (
55
"context"
6+
"errors"
67
"fmt"
78
"log/slog"
89
"net"
@@ -32,7 +33,6 @@ import (
3233
"github.com/meridianhub/meridian/shared/platform/kafka"
3334
"github.com/meridianhub/meridian/shared/platform/observability"
3435
"github.com/meridianhub/meridian/shared/platform/ports"
35-
"github.com/redis/go-redis/v9"
3636
"github.com/sony/gobreaker/v2"
3737
"google.golang.org/grpc/health/grpc_health_v1"
3838
"google.golang.org/grpc/reflection"
@@ -46,6 +46,11 @@ var (
4646
BuildDate = "unknown"
4747
)
4848

49+
// Static errors for production environment requirements
50+
var (
51+
ErrRedisRequiredInProduction = errors.New("redis required for idempotency in production environment")
52+
)
53+
4954
func main() {
5055
// Initialize structured logging with configurable log level
5156
// Note: bootstrap.NewLogger hardcodes INFO level, so we create logger manually for LOG_LEVEL support
@@ -105,27 +110,34 @@ func run(logger *slog.Logger) error {
105110
withdrawalRepo := persistence.NewWithdrawalRepository(db)
106111
outboxRepo := events.NewPostgresOutboxRepository(db)
107112

108-
// Create Redis client lazily (optional - graceful degradation during startup).
109-
// LazyClient resolves Redis in a background goroutine with exponential backoff.
110-
// The idempotency service degrades gracefully (no-op) until Redis connects.
113+
// Create Redis client and idempotency service.
114+
// In production: fail fast if Redis is unavailable (idempotency is critical).
115+
// In non-production: use NoopService for graceful degradation with metrics.
116+
var idempotencyService idempotency.Service
111117
redisConfig := bootstrap.DefaultRedisConfig()
112118
redisConfig.Logger = logger
113-
lazyRedis := bootstrap.NewLazyClient(ctx, "redis",
114-
func(ctx context.Context) (*redis.Client, func(), error) {
115-
client, err := bootstrap.NewRedisClient(ctx, redisConfig)
116-
if err != nil {
117-
return nil, nil, err
119+
redisClient, redisErr := bootstrap.NewRedisClient(ctx, redisConfig)
120+
if redisErr != nil {
121+
if env.IsProduction() {
122+
logger.Error("CRITICAL: Redis unavailable in production - failing fast", "error", redisErr)
123+
return bootstrap.Permanent(fmt.Errorf("%w: %w", ErrRedisRequiredInProduction, redisErr))
124+
}
125+
logger.Warn("Redis not available at startup, using noop idempotency service - DEVELOPMENT ONLY",
126+
"error", redisErr,
127+
"environment", os.Getenv("ENVIRONMENT"))
128+
idempotencyService = idempotency.NewNoopService(logger)
129+
caobservability.SetNoopIdempotencyActive(true)
130+
caobservability.RecordServiceDegradation(caobservability.ComponentIdempotency, caobservability.DegradationReasonStartupFallback)
131+
} else {
132+
idempotencyService = idempotency.NewRedisService(redisClient)
133+
caobservability.SetNoopIdempotencyActive(false)
134+
logger.Info("idempotency service initialized with Redis")
135+
defer func() {
136+
if err := redisClient.Close(); err != nil {
137+
logger.Error("failed to close Redis client", "error", err)
118138
}
119-
return client, func() {
120-
if err := client.Close(); err != nil {
121-
logger.Error("failed to close Redis client", "error", err)
122-
}
123-
logger.Info("Redis client closed")
124-
}, nil
125-
},
126-
bootstrap.WithLazyLogger(logger),
127-
)
128-
idempotencyService := idempotency.NewLazyService(lazyRedis)
139+
}()
140+
}
129141

130142
// Get Kubernetes namespace from environment (defaults to "default")
131143
namespace := env.GetEnvOrDefault("K8S_NAMESPACE", "default")
@@ -309,17 +321,7 @@ func run(logger *slog.Logger) error {
309321
})
310322
}
311323

312-
// Close lazily-resolved Redis client if it was resolved
313-
orchestrator.AddCleanup(func() error {
314-
if client, err := lazyRedis.Get(); err == nil {
315-
if closeErr := client.Close(); closeErr != nil {
316-
logger.Error("failed to close Redis client", "error", closeErr)
317-
return closeErr
318-
}
319-
logger.Info("Redis client closed")
320-
}
321-
return nil
322-
})
324+
// No additional Redis cleanup needed here — Redis client is closed via defer above when available
323325
orchestrator.AddCleanup(func() error {
324326
bootstrap.CloseDatabase(db, logger)
325327
return nil

services/current-account/observability/metrics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,22 @@ var (
147147
[]string{"clearing_type"},
148148
)
149149

150+
// NoOp fallback metrics - indicates degraded service functionality
151+
noopIdempotencyActive = promauto.NewGauge(
152+
prometheus.GaugeOpts{
153+
Name: "current_account_noop_idempotency_active",
154+
Help: "1 if NoOp idempotency service is active (production risk), 0 otherwise",
155+
},
156+
)
157+
158+
serviceDegradationEvents = promauto.NewCounterVec(
159+
prometheus.CounterOpts{
160+
Name: "current_account_service_degradation_events_total",
161+
Help: "Total number of service degradation events by component",
162+
},
163+
[]string{"component", "reason"},
164+
)
165+
150166
// Webhook delivery metrics - tracks delivery attempts and failures for regulatory notifications
151167
webhookDeliveryAttempts = promauto.NewCounterVec(
152168
prometheus.CounterOpts{
@@ -319,6 +335,38 @@ func RecordWebhookDeliveryRetry(eventType string) {
319335
webhookDeliveryRetries.WithLabelValues(eventType).Inc()
320336
}
321337

338+
// Service component constants for degradation metrics.
339+
const (
340+
ComponentIdempotency = "idempotency"
341+
)
342+
343+
// Degradation reason constants.
344+
const (
345+
DegradationReasonStartupFallback = "startup_fallback"
346+
)
347+
348+
// SetNoopIdempotencyActive sets the gauge indicating whether NoOp idempotency is active.
349+
// This metric MUST trigger a critical alert in production environments.
350+
//
351+
// ALERTING: This metric MUST have a Prometheus alert configured:
352+
//
353+
// alert: NoopIdempotencyActiveInProduction
354+
// expr: current_account_noop_idempotency_active == 1 AND environment == "production"
355+
// severity: critical
356+
// runbook: docs/runbooks/noop-fallback-active.md
357+
func SetNoopIdempotencyActive(active bool) {
358+
if active {
359+
noopIdempotencyActive.Set(1)
360+
} else {
361+
noopIdempotencyActive.Set(0)
362+
}
363+
}
364+
365+
// RecordServiceDegradation records a service degradation event.
366+
func RecordServiceDegradation(component, reason string) {
367+
serviceDegradationEvents.WithLabelValues(component, reason).Inc()
368+
}
369+
322370
// RecordWebhookDeliveryExhausted records when webhook delivery retries are exhausted.
323371
// This indicates a regulatory compliance risk - freeze/close notifications not delivered.
324372
//

services/financial-accounting/cmd/main.go

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,9 @@ func run(logger *slog.Logger) error {
251251

252252
// Create Redis client and idempotency service.
253253
// In production: fail fast if Redis is unavailable (idempotency is critical).
254-
// In non-production: use LazyClient for graceful degradation during startup.
254+
// In non-production: use NoopService for graceful degradation with metrics.
255255
var idempotencySvc idempotency.Service
256256
var redisSvc *idempotency.RedisService // Keep reference for cleanup worker
257-
var lazyRedis *bootstrap.LazyClient[*redis.Client]
258257
var usingNoopIdempotency bool
259258
redisClient, err := createRedisClient(logger)
260259
if err != nil {
@@ -263,27 +262,11 @@ func run(logger *slog.Logger) error {
263262
"error", err)
264263
return bootstrap.Permanent(fmt.Errorf("%w: %w", ErrRedisRequiredInProduction, err))
265264
}
266-
// Non-production: use lazy Redis resolution instead of noop
267-
logger.Warn("Redis not available at startup, using lazy resolution - DEVELOPMENT ONLY",
265+
logger.Warn("Redis not available at startup, using noop idempotency service - DEVELOPMENT ONLY",
268266
"error", err,
269267
"environment", os.Getenv("ENVIRONMENT"))
270-
lazyRedis = bootstrap.NewLazyClient(ctx, "redis",
271-
func(_ context.Context) (*redis.Client, func(), error) {
272-
//nolint:contextcheck // createRedisClient manages its own timeout context
273-
client, redisErr := createRedisClient(logger)
274-
if redisErr != nil {
275-
return nil, nil, redisErr
276-
}
277-
return client, func() {
278-
if closeErr := client.Close(); closeErr != nil {
279-
logger.Error("failed to close Redis client", "error", closeErr)
280-
}
281-
}, nil
282-
},
283-
bootstrap.WithLazyLogger(logger),
284-
)
285-
idempotencySvc = idempotency.NewLazyService(lazyRedis)
286-
usingNoopIdempotency = true // Tracks that we're in degraded mode initially
268+
idempotencySvc = idempotency.NewNoopService(logger)
269+
usingNoopIdempotency = true
287270
serviceobs.SetNoopIdempotencyActive(true)
288271
serviceobs.RecordServiceDegradation(serviceobs.ComponentIdempotency, serviceobs.DegradationReasonStartupFallback)
289272
} else {
@@ -536,17 +519,6 @@ func run(logger *slog.Logger) error {
536519
logger.Info("idempotency cleanup worker stopped")
537520
}
538521

539-
// Close lazily-resolved Redis client if it was resolved
540-
if lazyRedis != nil {
541-
if client, getErr := lazyRedis.Get(); getErr == nil {
542-
if closeErr := client.Close(); closeErr != nil {
543-
logger.Error("failed to close lazy Redis client", "error", closeErr)
544-
} else {
545-
logger.Info("lazy Redis client closed")
546-
}
547-
}
548-
}
549-
550522
// Stop outbox worker first (stop processing before closing Kafka producer)
551523
if outboxWorker != nil {
552524
logger.Info("stopping outbox worker...")

services/payment-order/cmd/main.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/meridianhub/meridian/services/payment-order/adapters/persistence"
2525
"github.com/meridianhub/meridian/services/payment-order/config"
2626
"github.com/meridianhub/meridian/services/payment-order/domain"
27+
poobservability "github.com/meridianhub/meridian/services/payment-order/observability"
2728
"github.com/meridianhub/meridian/services/payment-order/service"
2829
"github.com/meridianhub/meridian/services/payment-order/worker"
2930
sharedclients "github.com/meridianhub/meridian/shared/pkg/clients"
@@ -66,6 +67,9 @@ var (
6667
// ErrMissingHMACSecret is returned when the WEBHOOK_HMAC_SECRET environment variable is not set.
6768
var ErrMissingHMACSecret = errors.New("WEBHOOK_HMAC_SECRET environment variable is required")
6869

70+
// ErrRedisRequiredInProduction is returned when Redis is unavailable in production environments.
71+
var ErrRedisRequiredInProduction = errors.New("redis required for idempotency in production environment")
72+
6973
func main() {
7074
// Initialize structured logging with configurable log level
7175
// Note: bootstrap.NewLogger hardcodes INFO level, so we create logger manually for LOG_LEVEL support
@@ -178,17 +182,32 @@ func run(logger *slog.Logger) error {
178182
}
179183
defer kafkaProducer.Close()
180184

181-
// Create Redis client and idempotency service
182-
redisClient, err := createRedisClient(logger)
183-
if err != nil {
184-
return fmt.Errorf("failed to create Redis client: %w", err)
185-
}
186-
defer func() {
187-
if err := redisClient.Close(); err != nil {
188-
logger.Error("failed to close Redis client", "error", err)
185+
// Create Redis client and idempotency service.
186+
// In production: fail fast if Redis is unavailable (idempotency is critical).
187+
// In non-production: use NoopService for graceful degradation with metrics.
188+
var idempotencyService idempotency.Service
189+
redisClient, redisErr := createRedisClient(logger)
190+
if redisErr != nil {
191+
if env.IsProduction() {
192+
logger.Error("CRITICAL: Redis unavailable in production - failing fast", "error", redisErr)
193+
return bootstrap.Permanent(fmt.Errorf("%w: %w", ErrRedisRequiredInProduction, redisErr))
189194
}
190-
}()
191-
idempotencyService := idempotency.NewRedisService(redisClient)
195+
logger.Warn("Redis not available at startup, using noop idempotency service - DEVELOPMENT ONLY",
196+
"error", redisErr,
197+
"environment", os.Getenv("ENVIRONMENT"))
198+
idempotencyService = idempotency.NewNoopService(logger)
199+
poobservability.SetNoopIdempotencyActive(true)
200+
poobservability.RecordServiceDegradation(poobservability.ComponentIdempotency, poobservability.DegradationReasonStartupFallback)
201+
} else {
202+
idempotencyService = idempotency.NewRedisService(redisClient)
203+
poobservability.SetNoopIdempotencyActive(false)
204+
logger.Info("idempotency service initialized with Redis")
205+
defer func() {
206+
if err := redisClient.Close(); err != nil {
207+
logger.Error("failed to close Redis client", "error", err)
208+
}
209+
}()
210+
}
192211

193212
// Create Starlark handler registry with service client handlers.
194213
// This enables saga scripts to call real services (not mocks).
@@ -297,7 +316,7 @@ func run(logger *slog.Logger) error {
297316
var billingCronScheduler *scheduler.CronScheduler
298317
var dunningWorker *worker.DunningWorker
299318

300-
if svcConfig.BillingEnabled {
319+
if svcConfig.BillingEnabled && redisClient != nil {
301320
billingRepo := persistence.NewBillingRepository(db)
302321
billingMetrics := worker.NewBillingMetrics()
303322

@@ -387,6 +406,8 @@ func run(logger *slog.Logger) error {
387406
"shadow_mode", svcConfig.BillingShadowMode,
388407
"dunning_poll_interval", svcConfig.DunningPollInterval,
389408
"tenant_id", tenantID)
409+
} else if svcConfig.BillingEnabled && redisClient == nil {
410+
logger.Warn("billing workers disabled — Redis unavailable (DEVELOPMENT ONLY)")
390411
} else {
391412
logger.Info("billing workers disabled (BILLING_ENABLED=false)")
392413
}
@@ -456,13 +477,19 @@ func run(logger *slog.Logger) error {
456477
return fmt.Errorf("failed to create webhook handler: %w", err)
457478
}
458479

459-
// Create Stripe event processor for webhook idempotency and dunning
460-
eventProcessor, err := webhookhttp.NewStripeEventProcessor(webhookhttp.StripeEventProcessorConfig{
461-
RedisClient: redisClient,
462-
Logger: logger,
463-
})
464-
if err != nil {
465-
return fmt.Errorf("failed to create stripe event processor: %w", err)
480+
// Create Stripe event processor for webhook idempotency and dunning.
481+
// Requires Redis; if Redis is unavailable in non-production, skip Stripe webhook processing.
482+
var eventProcessor *webhookhttp.StripeEventProcessor
483+
if redisClient != nil {
484+
eventProcessor, err = webhookhttp.NewStripeEventProcessor(webhookhttp.StripeEventProcessorConfig{
485+
RedisClient: redisClient,
486+
Logger: logger,
487+
})
488+
if err != nil {
489+
return fmt.Errorf("failed to create stripe event processor: %w", err)
490+
}
491+
} else {
492+
logger.Warn("Stripe event processor disabled — Redis unavailable (DEVELOPMENT ONLY)")
466493
}
467494

468495
// Create Stripe webhook handler (optional - only if STRIPE_API_KEY is configured)

services/payment-order/observability/metrics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,56 @@ var (
280280
Help: "Total number of payment orders where lien execution retries were exhausted",
281281
},
282282
)
283+
284+
// NoOp fallback metrics - indicates degraded service functionality
285+
noopIdempotencyActive = promauto.NewGauge(
286+
prometheus.GaugeOpts{
287+
Name: "payment_order_noop_idempotency_active",
288+
Help: "1 if NoOp idempotency service is active (production risk), 0 otherwise",
289+
},
290+
)
291+
292+
serviceDegradationEvents = promauto.NewCounterVec(
293+
prometheus.CounterOpts{
294+
Name: "payment_order_service_degradation_events_total",
295+
Help: "Total number of service degradation events by component",
296+
},
297+
[]string{"component", "reason"},
298+
)
299+
)
300+
301+
// Service component constants for degradation metrics.
302+
const (
303+
ComponentIdempotency = "idempotency"
304+
)
305+
306+
// Degradation reason constants.
307+
const (
308+
DegradationReasonStartupFallback = "startup_fallback"
283309
)
284310

311+
// SetNoopIdempotencyActive sets the gauge indicating whether NoOp idempotency is active.
312+
// This metric MUST trigger a critical alert in production environments.
313+
//
314+
// ALERTING: This metric MUST have a Prometheus alert configured:
315+
//
316+
// alert: NoopIdempotencyActiveInProduction
317+
// expr: payment_order_noop_idempotency_active == 1 AND environment == "production"
318+
// severity: critical
319+
// runbook: docs/runbooks/noop-fallback-active.md
320+
func SetNoopIdempotencyActive(active bool) {
321+
if active {
322+
noopIdempotencyActive.Set(1)
323+
} else {
324+
noopIdempotencyActive.Set(0)
325+
}
326+
}
327+
328+
// RecordServiceDegradation records a service degradation event.
329+
func RecordServiceDegradation(component, reason string) {
330+
serviceDegradationEvents.WithLabelValues(component, reason).Inc()
331+
}
332+
285333
// RecordPaymentOrder records a payment order by status.
286334
func RecordPaymentOrder(status string) {
287335
paymentOrdersTotal.WithLabelValues(status).Inc()

0 commit comments

Comments
 (0)