Skip to content

Commit ab29794

Browse files
authored
feat(position-keeping): integrate event outbox worker for transactional event publishing (#420)
1 parent 72606a7 commit ab29794

4 files changed

Lines changed: 98 additions & 2 deletions

File tree

services/position-keeping/app/container.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/meridianhub/meridian/services/position-keeping/domain"
1313
"github.com/meridianhub/meridian/shared/platform/audit"
1414
"github.com/meridianhub/meridian/shared/platform/auth"
15+
"github.com/meridianhub/meridian/shared/platform/events"
1516
"github.com/meridianhub/meridian/shared/platform/kafka"
1617
"github.com/meridianhub/meridian/shared/platform/observability"
1718
"github.com/redis/go-redis/v9"
@@ -35,6 +36,9 @@ type Container struct {
3536

3637
// Repository
3738
PositionLogRepository domain.FinancialPositionLogRepository
39+
40+
// Event Outbox
41+
OutboxRepository *events.PgxOutboxRepository
3842
}
3943

4044
// NewContainer creates and initializes a new dependency injection container
@@ -67,6 +71,8 @@ func NewContainer(ctx context.Context, config *Config, logger *slog.Logger) (*Co
6771

6872
container.initializeRepositories()
6973

74+
container.initializeOutboxRepository()
75+
7076
logger.Info("dependency container initialized successfully")
7177

7278
return container, nil
@@ -310,6 +316,24 @@ func (c *Container) initializeRepositories() {
310316
c.Logger.Info("repositories initialized")
311317
}
312318

319+
// initializeOutboxRepository initializes the event outbox repository for transactional publishing.
320+
// The repository is always initialized regardless of Kafka availability because:
321+
// 1. Domain services use it transactionally to persist events alongside state changes
322+
// 2. When Kafka is disabled, events remain in the outbox until Kafka becomes available
323+
// 3. This enables graceful degradation - the system continues operating without message broker
324+
func (c *Container) initializeOutboxRepository() {
325+
// TODO(tm:bian-alignment.14): Consider exposing outbox depth as a health check metric
326+
// to alert operators when the outbox is backing up (e.g., Kafka unavailable).
327+
c.OutboxRepository = events.NewPgxOutboxRepository(c.DBPool)
328+
c.Logger.Info("event outbox repository initialized")
329+
}
330+
331+
// KafkaProducer returns the Kafka producer for use by components that need
332+
// direct Kafka access (e.g., the event outbox worker). Returns nil if Kafka is disabled.
333+
func (c *Container) KafkaProducer() *kafka.ProtoProducer {
334+
return c.kafkaProducer
335+
}
336+
313337
// Close gracefully closes all resources in the container
314338
func (c *Container) Close(ctx context.Context) error {
315339
c.Logger.Info("closing container resources...")

services/position-keeping/cmd/main.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/meridianhub/meridian/shared/pkg/idempotency"
2222
"github.com/meridianhub/meridian/shared/pkg/interceptors"
2323
"github.com/meridianhub/meridian/shared/platform/auth"
24+
"github.com/meridianhub/meridian/shared/platform/events"
2425
"github.com/prometheus/client_golang/prometheus"
2526
"github.com/prometheus/client_golang/prometheus/promhttp"
2627
"google.golang.org/grpc"
@@ -121,6 +122,29 @@ func run(logger *slog.Logger) error {
121122

122123
logger.Info("dependency container initialized")
123124

125+
// Initialize and start event outbox worker (if Kafka enabled)
126+
// TODO(tm:bian-alignment.14): Make worker config values (batch_size, poll_interval, max_retries)
127+
// configurable via environment variables for production tuning.
128+
var outboxWorker *events.Worker
129+
var workerCancel context.CancelFunc
130+
if container.KafkaProducer() != nil {
131+
workerConfig := events.DefaultWorkerConfig("position-keeping")
132+
outboxWorker = events.NewWorker(
133+
container.OutboxRepository,
134+
container.KafkaProducer(),
135+
workerConfig,
136+
logger,
137+
)
138+
139+
// Start worker in background
140+
var workerCtx context.Context
141+
workerCtx, workerCancel = context.WithCancel(context.Background())
142+
defer workerCancel() // Safety net; primary shutdown goes through explicit cancellation
143+
outboxWorker.Start(workerCtx)
144+
} else {
145+
logger.Info("event outbox worker disabled (kafka not configured)")
146+
}
147+
124148
// Create idempotency service
125149
var idempotencySvc idempotency.Service
126150
if container.RedisClient != nil {
@@ -274,6 +298,18 @@ func run(logger *slog.Logger) error {
274298
// Graceful shutdown
275299
logger.Info("shutting down servers...")
276300

301+
// Shutdown outbox worker before stopping servers
302+
// TODO(tm:bian-alignment.14): Add a shutdown timeout mechanism to prevent indefinite blocking
303+
// if the worker fails to stop gracefully (e.g., Kafka broker unreachable).
304+
if outboxWorker != nil {
305+
logger.Info("stopping event outbox worker...")
306+
if workerCancel != nil {
307+
workerCancel() // Cancel context first to signal worker to stop accepting new work
308+
}
309+
outboxWorker.Stop() // Blocks until current batch completes and Kafka flush finishes
310+
logger.Info("event outbox worker stopped")
311+
}
312+
277313
// Create shutdown context with timeout
278314
shutdownCtx, cancel := context.WithTimeout(context.Background(), config.Server.GracefulShutdownTimeout)
279315
defer cancel()

shared/platform/events/worker.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
12+
"github.com/google/uuid"
1213
)
1314

1415
// Worker errors.
@@ -49,6 +50,26 @@ type KafkaPublisher interface {
4950
Close()
5051
}
5152

53+
// WorkerRepository defines the interface for repository operations used by the Worker.
54+
// This is a subset of OutboxRepository that excludes Insert, allowing both GORM-based
55+
// (PostgresOutboxRepository) and pgx-based (PgxOutboxRepository) implementations to be used.
56+
type WorkerRepository interface {
57+
// FetchAndLockForProcessing atomically fetches pending entries and marks them as processing.
58+
FetchAndLockForProcessing(ctx context.Context, serviceName string, limit int) ([]EventOutbox, error)
59+
60+
// MarkCompleted marks an entry as successfully processed.
61+
MarkCompleted(ctx context.Context, id uuid.UUID) error
62+
63+
// MarkFailed increments retry count and updates error message.
64+
MarkFailed(ctx context.Context, id uuid.UUID, err error, maxRetries int) error
65+
66+
// GetPendingCount returns the number of pending entries for observability.
67+
GetPendingCount(ctx context.Context, serviceName string) (int64, error)
68+
69+
// ResetStuckEntries resets entries stuck in 'processing' state for too long.
70+
ResetStuckEntries(ctx context.Context, serviceName string, olderThan time.Duration) (int64, error)
71+
}
72+
5273
// WorkerConfig contains configuration for the event outbox worker.
5374
type WorkerConfig struct {
5475
// ServiceName identifies this service for filtering outbox entries.
@@ -90,7 +111,7 @@ func DefaultWorkerConfig(serviceName string) WorkerConfig {
90111
// Worker is a background processor that publishes events from the outbox to Kafka.
91112
// It implements graceful shutdown and handles retries with exponential backoff.
92113
type Worker struct {
93-
repository OutboxRepository
114+
repository WorkerRepository
94115
publisher KafkaPublisher
95116
config WorkerConfig
96117
logger *slog.Logger
@@ -109,7 +130,7 @@ type Worker struct {
109130
//
110131
// Returns a configured Worker ready to start processing.
111132
func NewWorker(
112-
repository OutboxRepository,
133+
repository WorkerRepository,
113134
publisher KafkaPublisher,
114135
config WorkerConfig,
115136
logger *slog.Logger,

shared/platform/kafka/producer.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,21 @@ func (p *ProtoProducer) Close() {
170170
p.producer.Close()
171171
}
172172

173+
// Produce sends a raw Kafka message asynchronously with delivery report.
174+
// This is a low-level method that exposes the underlying Kafka producer's Produce method,
175+
// allowing callers to receive delivery confirmations via the deliveryChan.
176+
// This method is used by the event outbox worker to publish pre-serialized event payloads.
177+
//
178+
// Parameters:
179+
// - msg: Pre-built Kafka message with topic, key, value, and optional headers
180+
// - deliveryChan: Channel to receive delivery confirmation events
181+
//
182+
// Returns an error if the message cannot be enqueued for delivery.
183+
// Note: A nil return does not guarantee delivery - check the deliveryChan for confirmation.
184+
func (p *ProtoProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
185+
return p.producer.Produce(msg, deliveryChan)
186+
}
187+
173188
// PublishWithTenant sends a protobuf message with tenant context to the specified Kafka topic.
174189
// The tenant ID is extracted from the context and injected as a Kafka header (x-tenant-id).
175190
// This ensures tenant isolation for multi-tenant event processing.

0 commit comments

Comments (0)