Skip to content

Commit fa860fd

Browse files
authored
feat: wire outbox worker in financial-accounting service (#422)
* feat: wire outbox worker in financial-accounting service Integrate the transactional outbox pattern worker into the financial-accounting service for reliable event publishing with at-least-once delivery guarantees. Changes: - Add ControlAction type and ControlLog domain method for SUSPEND/RESUME/TERMINATE lifecycle operations following BIAN CoCR patterns - Add WithTransaction and DB methods to LedgerRepository for atomic outbox writes - Add protobuf definitions for ControlFinancialBookingLog RPC and events - Wire OutboxRepository and Worker in main.go with Kafka producer - Implement ControlFinancialBookingLog gRPC method with idempotency support - Add comprehensive tests for control operations and state machine logic The outbox worker polls for pending events and publishes to Kafka, with configurable batch size and poll interval. Events are written atomically with domain state changes in a single transaction. * fix: address CodeRabbit review feedback Addresses three critical issues identified by CodeRabbit: 1. Kafka producer flush: Call Flush() with 5s timeout before Close() to ensure all pending outbox events are delivered before shutdown. 2. Race condition fix: Eliminate double-fetch by performing all operations within a single transaction with pessimistic locking (SELECT FOR UPDATE). Previously fetched entity outside transaction for domain logic, then again inside transaction, creating window for concurrent modifications. Now fetch-lock-apply-save happens atomically. 3. Layer separation: Service layer reconstructs domain model from locked entity, applies domain logic, then updates entity - maintaining proper separation while avoiding race conditions. The refactored ControlBookingLog method now: - Acquires pessimistic lock on entity within transaction - Reconstructs domain model from locked entity - Applies domain control logic - Updates locked entity with domain results - Writes event to outbox atomically - Maps all domain errors to gRPC codes after transaction --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 6379be9 commit fa860fd

12 files changed

Lines changed: 1201 additions & 61 deletions

api/proto/meridian/events/v1/financial_accounting_events.proto

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,65 @@ message LedgerPostingRejectedEvent {
452452
int64 version = 8 [(buf.validate.field).int64 = {gte: 1}];
453453
}
454454

455+
// FinancialBookingLogControlledEvent represents a control action applied to
456+
// a financial booking log (SUSPEND, RESUME, TERMINATE). Published when
457+
// administrative control operations are performed.
458+
message FinancialBookingLogControlledEvent {
459+
// booking_log_id uniquely identifies the financial booking log
460+
string booking_log_id = 1 [(buf.validate.field).string = {
461+
min_len: 1
462+
max_len: 255
463+
}];
464+
465+
// control_action is the action that was performed (SUSPEND, RESUME, TERMINATE)
466+
string control_action = 2 [(buf.validate.field).string = {
467+
min_len: 1
468+
max_len: 50
469+
}];
470+
471+
// previous_status is the status before the control action
472+
meridian.common.v1.TransactionStatus previous_status = 3 [(buf.validate.field).enum = {
473+
defined_only: true
474+
not_in: [0]
475+
}];
476+
477+
// new_status is the status after the control action
478+
meridian.common.v1.TransactionStatus new_status = 4 [(buf.validate.field).enum = {
479+
defined_only: true
480+
not_in: [0]
481+
}];
482+
483+
// reason explains why the control action was performed
484+
string reason = 5 [(buf.validate.field).string = {
485+
min_len: 1
486+
max_len: 500
487+
}];
488+
489+
// controlled_by identifies who performed the control action
490+
string controlled_by = 6 [(buf.validate.field).string = {
491+
min_len: 1
492+
max_len: 100
493+
}];
494+
495+
// correlation_id links related events across services
496+
string correlation_id = 7 [(buf.validate.field).string = {
497+
min_len: 1
498+
max_len: 255
499+
}];
500+
501+
// causation_id identifies the event or command that caused this event
502+
string causation_id = 8 [(buf.validate.field).string = {
503+
min_len: 1
504+
max_len: 255
505+
}];
506+
507+
// timestamp when the event was created
508+
google.protobuf.Timestamp timestamp = 9 [(buf.validate.field).required = true];
509+
510+
// version is the aggregate version for optimistic locking
511+
int64 version = 10 [(buf.validate.field).int64 = {gte: 1}];
512+
}
513+
455514
// BalanceValidationFailedEvent represents a booking log that failed
456515
// double-entry balance validation (debits != credits). Published when
457516
// attempting to post an unbalanced booking log.

api/proto/meridian/financial_accounting/v1/financial_accounting.proto

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ message FinancialBookingLog {
104104
repeated LedgerPosting postings = 10;
105105
}
106106

107+
// ControlAction defines the lifecycle control actions for financial booking logs.
108+
// These are BIAN control operations for administrative management of booking logs.
109+
enum ControlAction {
110+
// CONTROL_ACTION_UNSPECIFIED means the action is unknown.
111+
CONTROL_ACTION_UNSPECIFIED = 0;
112+
// CONTROL_ACTION_SUSPEND temporarily suspends the booking log (transitions to FAILED/suspended).
113+
CONTROL_ACTION_SUSPEND = 1;
114+
// CONTROL_ACTION_RESUME reactivates a suspended booking log (returns to PENDING).
115+
CONTROL_ACTION_RESUME = 2;
116+
// CONTROL_ACTION_TERMINATE permanently ends the booking log lifecycle (transitions to CANCELLED).
117+
CONTROL_ACTION_TERMINATE = 3;
118+
}
119+
107120
// LedgerPosting represents a single posting operation in double-entry bookkeeping.
108121
message LedgerPosting {
109122
// id is the unique identifier for this posting.
@@ -429,6 +442,44 @@ message ListLedgerPostingsResponse {
429442
meridian.common.v1.PaginationResponse pagination = 2;
430443
}
431444

445+
// ControlFinancialBookingLogRequest controls the lifecycle of a booking log (BIAN CoCR).
446+
//
447+
// This operation supports administrative control actions:
448+
// - SUSPEND: Temporarily suspend processing of the booking log
449+
// - RESUME: Resume processing of a previously suspended booking log
450+
// - TERMINATE: Permanently terminate the booking log lifecycle
451+
//
452+
// The operation uses the transactional outbox pattern to ensure atomic
453+
// persistence of both the state change and the associated event.
454+
message ControlFinancialBookingLogRequest {
455+
// id is the booking log to control.
456+
string id = 1 [(buf.validate.field).string = {
457+
min_len: 1
458+
max_len: 255
459+
}];
460+
461+
// control_action is the lifecycle action to perform.
462+
ControlAction control_action = 2 [(buf.validate.field).enum = {
463+
defined_only: true
464+
not_in: [0] // Prevent UNSPECIFIED
465+
}];
466+
467+
// reason is an explanation for the control action (required for audit trail).
468+
string reason = 3 [(buf.validate.field).string = {
469+
min_len: 1
470+
max_len: 500
471+
}];
472+
473+
// idempotency_key ensures exactly-once processing.
474+
meridian.common.v1.IdempotencyKey idempotency_key = 4 [(buf.validate.field).required = true];
475+
}
476+
477+
// ControlFinancialBookingLogResponse returns the controlled booking log.
478+
message ControlFinancialBookingLogResponse {
479+
// financial_booking_log is the booking log after the control action.
480+
FinancialBookingLog financial_booking_log = 1 [(buf.validate.field).required = true];
481+
}
482+
432483
// FinancialAccountingService provides the BIAN Financial Accounting service interface.
433484
service FinancialAccountingService {
434485
// InitiateFinancialBookingLog creates a new financial booking log.
@@ -506,4 +557,14 @@ service FinancialAccountingService {
506557
get: "/v1/postings"
507558
};
508559
}
560+
561+
// ControlFinancialBookingLog controls the lifecycle of a booking log (BIAN CoCR).
562+
// Supports SUSPEND, RESUME, and TERMINATE control actions.
563+
// Uses transactional outbox pattern for reliable event publishing.
564+
rpc ControlFinancialBookingLog(ControlFinancialBookingLogRequest) returns (ControlFinancialBookingLogResponse) {
565+
option (google.api.http) = {
566+
post: "/v1/booking-logs/{id}/control"
567+
body: "*"
568+
};
569+
}
509570
}

services/financial-accounting/adapters/persistence/repository.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,3 +666,29 @@ func (r *LedgerRepository) ListPostings(ctx context.Context, params ListPostings
666666
}
667667
return result, nil
668668
}
669+
670+
// WithTransaction executes a function within a database transaction with tenant scoping.
671+
// This is used for implementing the transactional outbox pattern where both the entity
672+
// update and the outbox event write must succeed or fail together atomically.
673+
//
674+
// The provided function receives a tenant-scoped *gorm.DB transaction that can be used
675+
// for all database operations within the transaction.
676+
//
677+
// Example:
678+
//
679+
// err := repo.WithTransaction(ctx, func(tx *gorm.DB) error {
680+
// if err := tx.Save(&entity).Error; err != nil {
681+
// return err
682+
// }
683+
// return outboxRepo.Insert(ctx, tx, event)
684+
// })
685+
func (r *LedgerRepository) WithTransaction(ctx context.Context, fn func(tx *gorm.DB) error) error {
686+
return r.withTenantTransaction(ctx, fn)
687+
}
688+
689+
// DB returns the underlying GORM database instance.
690+
// This is primarily used for passing the DB to other components that need
691+
// database access, such as the outbox repository for the transactional outbox pattern.
692+
func (r *LedgerRepository) DB() *gorm.DB {
693+
return r.db
694+
}

services/financial-accounting/cmd/main.go

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"syscall"
1515
"time"
1616

17+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
1718
financialaccountingv1 "github.com/meridianhub/meridian/api/proto/meridian/financial_accounting/v1"
1819
"github.com/meridianhub/meridian/services/financial-accounting/adapters/persistence"
1920
serviceobs "github.com/meridianhub/meridian/services/financial-accounting/observability"
@@ -22,6 +23,7 @@ import (
2223
"github.com/meridianhub/meridian/shared/pkg/interceptors"
2324
"github.com/meridianhub/meridian/shared/platform/audit"
2425
"github.com/meridianhub/meridian/shared/platform/auth"
26+
"github.com/meridianhub/meridian/shared/platform/events"
2527
"github.com/meridianhub/meridian/shared/platform/observability"
2628
"github.com/prometheus/client_golang/prometheus/promhttp"
2729
"github.com/redis/go-redis/v9"
@@ -39,6 +41,11 @@ var (
3941
BuildDate = "unknown"
4042
)
4143

44+
// Package-level variables for lifecycle management
45+
var (
46+
outboxWorker *events.Worker
47+
)
48+
4249
// Static errors for configuration validation
4350
var (
4451
ErrBankCashAccountIDRequired = errors.New("BANK_CASH_ACCOUNT_ID environment variable is required")
@@ -126,6 +133,47 @@ func run(logger *slog.Logger) error {
126133
}()
127134
}
128135

136+
// Initialize outbox repository and worker for transactional event publishing.
137+
// The outbox pattern ensures at-least-once delivery of events by storing them
138+
// in the database first and then publishing asynchronously via Kafka.
139+
outboxRepo := events.NewPostgresOutboxRepository(db)
140+
outboxPublisher := events.NewOutboxPublisher("financial-accounting")
141+
142+
// Initialize Kafka producer for outbox worker (optional - depends on KAFKA_BOOTSTRAP_SERVERS)
143+
var kafkaProducer *kafka.Producer
144+
bootstrapServers := getEnvOrDefault("KAFKA_BOOTSTRAP_SERVERS", "")
145+
if bootstrapServers != "" {
146+
producer, err := kafka.NewProducer(&kafka.ConfigMap{
147+
"bootstrap.servers": bootstrapServers,
148+
"client.id": "financial-accounting-outbox-worker",
149+
"acks": "all",
150+
"retries": 3,
151+
"compression.type": "snappy",
152+
"linger.ms": 10,
153+
"batch.size": 16384,
154+
})
155+
if err != nil {
156+
logger.Warn("failed to create Kafka producer for outbox worker",
157+
"error", err)
158+
} else {
159+
kafkaProducer = producer
160+
logger.Info("Kafka producer initialized for outbox worker",
161+
"bootstrap_servers", bootstrapServers)
162+
}
163+
} else {
164+
logger.Info("outbox worker disabled: KAFKA_BOOTSTRAP_SERVERS not set")
165+
}
166+
167+
// Start outbox worker if Kafka producer is available
168+
if kafkaProducer != nil {
169+
workerConfig := events.DefaultWorkerConfig("financial-accounting")
170+
outboxWorker = events.NewWorker(outboxRepo, kafkaProducer, workerConfig, logger)
171+
outboxWorker.Start(ctx)
172+
logger.Info("outbox worker started",
173+
"batch_size", workerConfig.BatchSize,
174+
"poll_interval", workerConfig.PollInterval)
175+
}
176+
129177
// Validate bank cash account ID is configured
130178
bankCashAccountID := getEnvOrDefault("BANK_CASH_ACCOUNT_ID", "")
131179
if bankCashAccountID == "" {
@@ -171,7 +219,13 @@ func run(logger *slog.Logger) error {
171219
logger.Info("event publisher initialized (noop mode)")
172220

173221
// Create Financial Accounting service
174-
financialAccountingSvc, err := service.NewFinancialAccountingService(ledgerRepo, eventPublisher, idempotencySvc)
222+
financialAccountingSvc, err := service.NewFinancialAccountingService(
223+
ledgerRepo,
224+
eventPublisher,
225+
idempotencySvc,
226+
outboxPublisher,
227+
outboxRepo,
228+
)
175229
if err != nil {
176230
return fmt.Errorf("failed to create financial accounting service: %w", err)
177231
}
@@ -314,7 +368,24 @@ func run(logger *slog.Logger) error {
314368
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
315369
defer cancel()
316370

317-
// Shutdown HTTP server first (faster, allows metrics scraping during gRPC drain)
371+
// Stop outbox worker first (stop processing before closing Kafka producer)
372+
if outboxWorker != nil {
373+
logger.Info("stopping outbox worker...")
374+
outboxWorker.Stop()
375+
logger.Info("outbox worker stopped")
376+
}
377+
378+
// Close Kafka producer after outbox worker stops
379+
if kafkaProducer != nil {
380+
logger.Info("flushing Kafka producer...")
381+
// Flush pending messages with 5 second timeout to ensure delivery
382+
kafkaProducer.Flush(5000) // 5 seconds in milliseconds
383+
logger.Info("closing Kafka producer...")
384+
kafkaProducer.Close()
385+
logger.Info("Kafka producer closed")
386+
}
387+
388+
// Shutdown HTTP server (faster, allows metrics scraping during gRPC drain)
318389
if err := httpServer.Shutdown(shutdownCtx); err != nil {
319390
logger.Error("HTTP server shutdown error", "error", err)
320391
} else {

0 commit comments

Comments
 (0)