Skip to content

Commit 76d6c0c

Browse files
authored
feat: migrate Payment Order, Market Info, Reconciliation to OutboxPublisher (#1312)
* feat: add canonical financial-accounting topic and dual-publish Add FinancialAccountingBookingLogControlledV1 constant following the standard <service>.<event-name>.<version> naming convention. The legacy FinancialAccountingBookingLogControlled topic is deprecated but retained for dual-publishing during migration. Both topics are written to the outbox within the same transaction for backwards compatibility. * feat: migrate market-information to OutboxPublisher Replace KafkaObservationPublisher with OutboxEventPublisher that writes events to the transactional outbox table via GORM. Add outbox worker and Kafka producer wiring to cmd/main.go. The existing pgxpool connection is retained for domain operations while GORM handles the outbox pattern. Deprecated dual-publishing to the old topic name is removed since the outbox publisher uses the canonical topic. * feat: migrate reconciliation service to OutboxPublisher Replace custom KafkaPublisher (JSON-based, dual-publishing) with the shared OutboxPublisher pattern. Events are now written to the outbox table within a GORM transaction and published to Kafka asynchronously by the outbox worker. - Create OutboxEventPublisher adapter with interface-based event routing to avoid import cycles between messaging and service packages - Add getter methods to DisputeCreatedEvent, DisputeResolvedEvent, and PositionLockRequestedEvent for interface compliance - Wire outbox worker and Kafka producer in cmd/main.go - Remove deprecated dual-publishing to legacy topic names * feat: migrate payment-order service to OutboxPublisher Replace direct Kafka producer with the shared OutboxPublisher pattern. Events are now written to the outbox table within a GORM transaction and published to Kafka asynchronously by the outbox worker. - Create OutboxPublisher adapter implementing service.KafkaPublisher interface with topic-to-event-type mapping for all 7 payment order lifecycle events (Initiated, Reserved, Executing, Completed, Failed, Cancelled, Reversed) - Wire outbox worker and Kafka producer in cmd/main.go - Remove legacy createKafkaProducer function - Add graceful shutdown for outbox worker and Kafka producer * fix: address review feedback on outbox cleanup and unused code - Add defer for kafkaProducer.Close() to prevent leaks on early return in all three services (market-info, payment-order, reconciliation) - Remove duplicate outbox worker/producer cleanup from orchestrator and explicit shutdown sections (defer handles all paths) - Remove unused period timestamp parsing in reconciliation outbox publisher (proto message does not carry period fields) - Improve warning message when outbox worker is disabled * fix: correct outbox DSN config and defer ordering - Set gormDBConfig.DSN = dbURL in market-information so outbox uses the same database as domain persistence instead of the hardcoded fallback - Move defer bootstrap.CloseDatabase earlier in payment-order so DB closes after outbox worker stops (LIFO ordering) - Remove stale createKafkaProducer doc comment * fix: update topic tests for canonical V1 constant and deprecated exclusion - Update TestAll_ContainsAllConstants to expect the new canonical FinancialAccountingBookingLogControlledV1 constant instead of the deprecated legacy constant - Update TestTopicsYAML_ConsistentWithGoConstants to skip deprecated topics when checking YAML-to-Go consistency, matching All()'s documented behavior of excluding deprecated topics - Add Deprecated field to topicEntry YAML struct for filtering --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent e7d4446 commit 76d6c0c

12 files changed

Lines changed: 528 additions & 45 deletions

File tree

services/financial-accounting/service/grpc_control_endpoints.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,20 +195,36 @@ func (s *FinancialAccountingService) ControlFinancialBookingLog(
195195
Version: 1,
196196
}
197197

198-
eventTopic := topics.FinancialAccountingBookingLogControlled
198+
// Publish to canonical v1 topic
199199
if err := s.outboxPublisher.PublishControlEvent(
200200
ctx,
201201
tx,
202202
controlEvent,
203203
"financial_accounting.booking_log_controlled.v1",
204204
bookingLogID.String(),
205205
"FinancialBookingLog",
206-
eventTopic,
206+
topics.FinancialAccountingBookingLogControlledV1,
207207
correlationID,
208208
); err != nil {
209209
return fmt.Errorf("failed to write event to outbox: %w", err)
210210
}
211211

212+
// Dual-publish to legacy topic for backwards compatibility during migration.
213+
//nolint:staticcheck // SA1019: intentional use of deprecated topic for dual-publish
214+
legacyTopic := topics.FinancialAccountingBookingLogControlled
215+
if err := s.outboxPublisher.PublishControlEvent(
216+
ctx,
217+
tx,
218+
controlEvent,
219+
"financial_accounting.booking_log_controlled.v1",
220+
bookingLogID.String(),
221+
"FinancialBookingLog",
222+
legacyTopic,
223+
correlationID,
224+
); err != nil {
225+
return fmt.Errorf("failed to write legacy event to outbox: %w", err)
226+
}
227+
212228
return nil
213229
})
214230
if err != nil {

services/market-information/cmd/main.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/meridianhub/meridian/shared/platform/bootstrap"
2424
"github.com/meridianhub/meridian/shared/platform/defaults"
2525
"github.com/meridianhub/meridian/shared/platform/env"
26+
"github.com/meridianhub/meridian/shared/platform/events"
27+
"github.com/meridianhub/meridian/shared/platform/kafka"
2628
"github.com/meridianhub/meridian/shared/platform/ports"
2729
"github.com/prometheus/client_golang/prometheus/promhttp"
2830
"google.golang.org/grpc/health/grpc_health_v1"
@@ -92,6 +94,52 @@ func run(logger *slog.Logger) error {
9294
}
9395
logger.Info("database connection established", "url", dbURL)
9496

97+
// Initialize GORM database connection for outbox pattern.
98+
// The existing pgxpool connection is retained for domain persistence operations.
99+
gormDBConfig := bootstrap.DefaultDatabaseConfig()
100+
gormDBConfig.DSN = dbURL
101+
gormDBConfig.Logger = logger
102+
gormDB, err := bootstrap.NewDatabase(ctx, gormDBConfig)
103+
if err != nil {
104+
return fmt.Errorf("failed to initialize GORM database for outbox: %w", err)
105+
}
106+
defer bootstrap.CloseDatabase(gormDB, logger)
107+
108+
// Initialize outbox publisher and worker for transactional event publishing
109+
outboxRepo := events.NewPostgresOutboxRepository(gormDB)
110+
outboxPublisher := events.NewOutboxPublisher("market-information")
111+
112+
var outboxWorker *events.Worker
113+
var kafkaProducer *kafka.ProtoProducer
114+
bootstrapServers := env.GetEnvOrDefault("KAFKA_BOOTSTRAP_SERVERS", "")
115+
if bootstrapServers != "" {
116+
producer, kafkaErr := kafka.NewProtoProducer(kafka.ProducerConfig{
117+
BootstrapServers: bootstrapServers,
118+
ClientID: "market-information-outbox-worker",
119+
Acks: "all",
120+
Retries: 3,
121+
Compression: "snappy",
122+
})
123+
if kafkaErr != nil {
124+
logger.Warn("failed to create Kafka producer for outbox worker",
125+
"error", kafkaErr)
126+
} else {
127+
kafkaProducer = producer
128+
defer kafkaProducer.Close()
129+
workerConfig := events.DefaultWorkerConfig("market-information")
130+
outboxWorker = events.NewWorker(outboxRepo, kafkaProducer, workerConfig, logger)
131+
outboxWorker.Start(ctx)
132+
defer outboxWorker.Stop()
133+
logger.Info("outbox worker started",
134+
"bootstrap_servers", bootstrapServers)
135+
}
136+
} else {
137+
logger.Warn("outbox worker disabled - KAFKA_BOOTSTRAP_SERVERS not set")
138+
}
139+
140+
// Create outbox-based event publisher (replaces KafkaObservationPublisher)
141+
outboxEventPublisher := service.NewOutboxEventPublisher(gormDB, outboxPublisher)
142+
95143
// Create repositories for persistence
96144
masterTenantID := env.GetEnvOrDefault("MASTER_TENANT_ID", "meridian_master")
97145
repos := persistence.NewRepositories(dbPool, masterTenantID)
@@ -102,6 +150,7 @@ func run(logger *slog.Logger) error {
102150
repos.DataSet,
103151
repos.Observation,
104152
repos.Source,
153+
service.WithEventPublisher(outboxEventPublisher),
105154
service.WithLogger(logger.With("component", "market_information_server")),
106155
)
107156
if err != nil {
@@ -224,6 +273,8 @@ func run(logger *slog.Logger) error {
224273
// Wait for shutdown signal and orchestrate graceful shutdown
225274
orchestrator := bootstrap.NewShutdownOrchestrator(grpcServer, logger)
226275

276+
// Outbox worker and Kafka producer are cleaned up via defer.
277+
227278
// Initialize ECB adapter worker (if enabled)
228279
// Note: The ECB worker calls RecordObservation on the Server to ingest FX rates.
229280
if cfg.ECB.Enabled {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Package service provides application services for the Market Information service.
2+
package service
3+
4+
import (
5+
"context"
6+
"fmt"
7+
8+
marketinformationv1 "github.com/meridianhub/meridian/api/proto/meridian/market_information/v1"
9+
"github.com/meridianhub/meridian/services/market-information/domain"
10+
"github.com/meridianhub/meridian/shared/platform/events"
11+
"github.com/meridianhub/meridian/shared/platform/events/topics"
12+
"gorm.io/gorm"
13+
)
14+
15+
// OutboxEventPublisher publishes observation domain events through the transactional outbox pattern.
16+
// It implements both EventPublisher and ObservationEventPublisher interfaces.
17+
//
18+
// Since market-information uses pgxpool for domain persistence (not GORM), outbox entries are
19+
// written in a separate GORM transaction. This provides at-least-once delivery via the outbox
20+
// worker, replacing the previous fire-and-forget Kafka publishing.
21+
type OutboxEventPublisher struct {
22+
db *gorm.DB
23+
publisher *events.OutboxPublisher
24+
}
25+
26+
// NewOutboxEventPublisher creates a new outbox-based event publisher.
27+
func NewOutboxEventPublisher(db *gorm.DB, publisher *events.OutboxPublisher) *OutboxEventPublisher {
28+
return &OutboxEventPublisher{
29+
db: db,
30+
publisher: publisher,
31+
}
32+
}
33+
34+
// PublishObservationRecorded publishes an ObservationRecorded event through the outbox.
35+
func (p *OutboxEventPublisher) PublishObservationRecorded(
36+
ctx context.Context,
37+
observation domain.MarketPriceObservation,
38+
) error {
39+
event := mapObservationToProtoEvent(observation)
40+
partitionKey := observation.DataSetCode()
41+
42+
return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
43+
return p.publisher.Publish(ctx, tx, event, events.PublishConfig{
44+
EventType: "market_information.observation_recorded.v1",
45+
Topic: topics.MarketInformationObservationRecordedV1,
46+
AggregateType: "MarketPriceObservation",
47+
AggregateID: observation.ID().String(),
48+
PartitionKey: partitionKey,
49+
})
50+
})
51+
}
52+
53+
// Publish implements the EventPublisher interface by type-switching on the event.
54+
func (p *OutboxEventPublisher) Publish(ctx context.Context, event any) error {
55+
switch e := event.(type) {
56+
case *marketinformationv1.ObservationRecorded:
57+
partitionKey := e.DatasetCode
58+
return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
59+
return p.publisher.Publish(ctx, tx, e, events.PublishConfig{
60+
EventType: "market_information.observation_recorded.v1",
61+
Topic: topics.MarketInformationObservationRecordedV1,
62+
AggregateType: "MarketPriceObservation",
63+
AggregateID: e.ObservationId,
64+
PartitionKey: partitionKey,
65+
})
66+
})
67+
default:
68+
return fmt.Errorf("%w: %T", ErrUnsupportedEventType, event)
69+
}
70+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Package messaging provides event publishing for the payment order service.
2+
package messaging
3+
4+
import (
5+
"context"
6+
"errors"
7+
"fmt"
8+
9+
"github.com/meridianhub/meridian/shared/platform/events"
10+
"github.com/meridianhub/meridian/shared/platform/events/topics"
11+
"google.golang.org/protobuf/proto"
12+
"gorm.io/gorm"
13+
)
14+
15+
// errUnsupportedTopic is returned when a topic has no mapping in the outbox publisher.
16+
var errUnsupportedTopic = errors.New("unsupported topic for outbox publishing")
17+
18+
// topicToEventType maps Kafka topic constants to outbox event type strings.
19+
var topicToEventType = map[string]string{
20+
topics.PaymentOrderInitiatedV1: "payment_order.initiated.v1",
21+
topics.PaymentOrderReservedV1: "payment_order.reserved.v1",
22+
topics.PaymentOrderExecutingV1: "payment_order.executing.v1",
23+
topics.PaymentOrderCompletedV1: "payment_order.completed.v1",
24+
topics.PaymentOrderFailedV1: "payment_order.failed.v1",
25+
topics.PaymentOrderCancelledV1: "payment_order.cancelled.v1",
26+
topics.PaymentOrderReversedV1: "payment_order.reversed.v1",
27+
}
28+
29+
// OutboxPublisher implements the service.KafkaPublisher interface by writing proto events
30+
// to the transactional outbox table instead of publishing directly to Kafka.
31+
// The outbox worker handles reliable delivery to Kafka asynchronously.
32+
type OutboxPublisher struct {
33+
db *gorm.DB
34+
publisher *events.OutboxPublisher
35+
}
36+
37+
// NewOutboxPublisher creates a new outbox-based event publisher for payment orders.
38+
func NewOutboxPublisher(db *gorm.DB, publisher *events.OutboxPublisher) *OutboxPublisher {
39+
return &OutboxPublisher{
40+
db: db,
41+
publisher: publisher,
42+
}
43+
}
44+
45+
// Publish implements service.KafkaPublisher by writing the proto event to the outbox table.
46+
// The key parameter is used as both the aggregate ID and partition key (payment order ID).
47+
func (p *OutboxPublisher) Publish(ctx context.Context, topic string, key string, msg proto.Message) error {
48+
eventType, ok := topicToEventType[topic]
49+
if !ok {
50+
return fmt.Errorf("%w: %s", errUnsupportedTopic, topic)
51+
}
52+
53+
return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
54+
return p.publisher.Publish(ctx, tx, msg, events.PublishConfig{
55+
EventType: eventType,
56+
Topic: topic,
57+
AggregateType: "PaymentOrder",
58+
AggregateID: key,
59+
PartitionKey: key,
60+
})
61+
})
62+
}

services/payment-order/cmd/main.go

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/meridianhub/meridian/services/payment-order/adapters/gateway"
2222
stripegateway "github.com/meridianhub/meridian/services/payment-order/adapters/gateway/stripe"
2323
webhookhttp "github.com/meridianhub/meridian/services/payment-order/adapters/http"
24+
pomessaging "github.com/meridianhub/meridian/services/payment-order/adapters/messaging"
2425
"github.com/meridianhub/meridian/services/payment-order/adapters/persistence"
2526
"github.com/meridianhub/meridian/services/payment-order/config"
2627
"github.com/meridianhub/meridian/services/payment-order/domain"
@@ -32,6 +33,7 @@ import (
3233
"github.com/meridianhub/meridian/shared/platform/bootstrap"
3334
"github.com/meridianhub/meridian/shared/platform/defaults"
3435
"github.com/meridianhub/meridian/shared/platform/env"
36+
"github.com/meridianhub/meridian/shared/platform/events"
3537
"github.com/meridianhub/meridian/shared/platform/kafka"
3638
"github.com/meridianhub/meridian/shared/platform/observability"
3739
"github.com/meridianhub/meridian/shared/platform/ports"
@@ -124,6 +126,7 @@ func run(logger *slog.Logger) error {
124126
if err != nil {
125127
return fmt.Errorf("failed to initialize database: %w", err)
126128
}
129+
defer bootstrap.CloseDatabase(db, logger)
127130

128131
logger.Info("database connection established")
129132

@@ -175,12 +178,46 @@ func run(logger *slog.Logger) error {
175178
return fmt.Errorf("failed to load gateway account config: %w", err)
176179
}
177180

178-
// Create Kafka producer
179-
kafkaProducer, err := createKafkaProducer(logger)
180-
if err != nil {
181-
return fmt.Errorf("failed to create Kafka producer: %w", err)
181+
// Initialize outbox publisher and worker for transactional event publishing.
182+
// Events are written to the outbox table within the same DB transaction as
183+
// domain operations, then published to Kafka asynchronously by the outbox worker.
184+
outboxRepo := events.NewPostgresOutboxRepository(db)
185+
outboxPublisher := events.NewOutboxPublisher("payment-order")
186+
187+
var outboxWorker *events.Worker
188+
var kafkaProducer *kafka.ProtoProducer
189+
bootstrapServers := env.GetEnvOrDefault("KAFKA_BOOTSTRAP_SERVERS", "")
190+
if bootstrapServers == "" {
191+
// Fall back to legacy KAFKA_BROKERS env var
192+
bootstrapServers = env.GetEnvOrDefault("KAFKA_BROKERS", "")
193+
}
194+
if bootstrapServers != "" {
195+
producer, kafkaErr := kafka.NewProtoProducer(kafka.ProducerConfig{
196+
BootstrapServers: bootstrapServers,
197+
ClientID: "payment-order-outbox-worker",
198+
Acks: "all",
199+
Retries: 3,
200+
Compression: "snappy",
201+
})
202+
if kafkaErr != nil {
203+
logger.Warn("failed to create Kafka producer for outbox worker",
204+
"error", kafkaErr)
205+
} else {
206+
kafkaProducer = producer
207+
defer kafkaProducer.Close()
208+
workerConfig := events.DefaultWorkerConfig("payment-order")
209+
outboxWorker = events.NewWorker(outboxRepo, kafkaProducer, workerConfig, logger)
210+
outboxWorker.Start(ctx)
211+
defer outboxWorker.Stop()
212+
logger.Info("outbox worker started",
213+
"bootstrap_servers", bootstrapServers)
214+
}
215+
} else {
216+
logger.Warn("outbox worker disabled - KAFKA_BOOTSTRAP_SERVERS not set (events will accumulate in outbox)")
182217
}
183-
defer kafkaProducer.Close()
218+
219+
// Create outbox-based event publisher (replaces direct Kafka producer)
220+
eventPublisher := pomessaging.NewOutboxPublisher(db, outboxPublisher)
184221

185222
// Create Redis client and idempotency service.
186223
// In production: fail fast if Redis is unavailable (idempotency is critical).
@@ -427,7 +464,7 @@ func run(logger *slog.Logger) error {
427464
ReferenceDataClient: referenceDataClient, // May be nil if reference-data unavailable
428465
PaymentGateway: paymentGateway,
429466
GatewayAccountConfig: gatewayAccountConfig,
430-
KafkaPublisher: kafkaProducer,
467+
KafkaPublisher: eventPublisher,
431468
IdempotencyService: idempotencyService,
432469
Logger: logger,
433470
Tracer: tracer,
@@ -592,9 +629,6 @@ func run(logger *slog.Logger) error {
592629
shutdownCtx, cancel := context.WithTimeout(context.Background(), defaults.DefaultGracefulShutdown)
593630
defer cancel()
594631

595-
// Close database connection during shutdown
596-
defer bootstrap.CloseDatabase(db, logger)
597-
598632
// Stop billing workers and wait for goroutines to exit before database close.
599633
// Cancel the worker context first to unblock Start() select loops, then
600634
// call Stop() to signal internal shutdown channels and drain in-flight work.
@@ -877,16 +911,6 @@ func createPaymentGateway(svcConfig config.ServiceConfig, logger *slog.Logger) (
877911
return gateway.NewResilientPaymentGateway(baseGateway, resilientConfig), nil
878912
}
879913

880-
// createKafkaProducer creates the Kafka producer.
881-
func createKafkaProducer(logger *slog.Logger) (*kafka.ProtoProducer, error) {
882-
brokers := env.GetEnvOrDefault("KAFKA_BROKERS", "kafka:9092")
883-
logger.Info("connecting to Kafka", "brokers", brokers)
884-
return kafka.NewProtoProducer(kafka.ProducerConfig{
885-
BootstrapServers: brokers,
886-
ClientID: "payment-order-service",
887-
})
888-
}
889-
890914
// createRedisClient creates and validates a Redis client connection.
891915
// Environment variables:
892916
// - REDIS_URL: Redis connection URL (default: redis://localhost:6379)

0 commit comments

Comments
 (0)