Skip to content

Commit b302eb7

Browse files
authored
feat: add idempotency protection to update operations (#379)
1 parent 6b2e20a commit b302eb7

5 files changed

Lines changed: 711 additions & 19 deletions

File tree

api/proto/meridian/financial_accounting/v1/financial_accounting.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ message UpdateFinancialBookingLogRequest {
220220
// If empty, the existing rules are preserved (optional update).
221221
// Validation: Min length is enforced only if the field is provided (service layer).
222222
string chart_of_accounts_rules = 3;
223+
224+
// idempotency_key ensures exactly-once processing.
225+
meridian.common.v1.IdempotencyKey idempotency_key = 4 [(buf.validate.field).required = true];
223226
}
224227

225228
// UpdateFinancialBookingLogResponse returns the updated booking log.
@@ -316,6 +319,9 @@ message UpdateLedgerPostingRequest {
316319
// posting_result describes the outcome of the posting operation.
317320
// If empty, the existing result is preserved (optional update).
318321
string posting_result = 3 [(buf.validate.field).string = {max_len: 1000}];
322+
323+
// idempotency_key ensures exactly-once processing.
324+
meridian.common.v1.IdempotencyKey idempotency_key = 4 [(buf.validate.field).required = true];
319325
}
320326

321327
// UpdateLedgerPostingResponse returns the updated posting.

services/financial-accounting/service/financial_accounting_service.go

Lines changed: 193 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -543,23 +543,77 @@ func (s *FinancialAccountingService) ListLedgerPostings(
543543
// UpdateLedgerPosting updates an existing ledger posting's status and result.
544544
//
545545
// Workflow:
546-
// 1. Parse and validate request fields
547-
// 2. Retrieve existing posting by ID
548-
// 3. Validate state transition rules (e.g., cannot change POSTED status)
549-
// 4. Apply update using domain methods (Post/Fail)
550-
// 5. Persist updated posting
551-
// 6. Publish domain event (LedgerPostingUpdatedEvent)
552-
// 7. Return updated posting
546+
// 1. Check idempotency using request's IdempotencyKey
547+
// 2. Parse and validate request fields
548+
// 3. Retrieve existing posting by ID
549+
// 4. Validate state transition rules (e.g., cannot change POSTED status)
550+
// 5. Apply update using domain methods (Post/Fail)
551+
// 6. Persist updated posting
552+
// 7. Publish domain event (LedgerPostingUpdatedEvent)
553+
// 8. Return updated posting
554+
//
555+
// Idempotency Note:
556+
// Unlike CaptureLedgerPosting where idempotency is optional (create operations
557+
// naturally fail on duplicate IDs), update operations REQUIRE idempotency keys
558+
// because state-machine transitions must be exactly-once. A duplicate update
559+
// could incorrectly transition an entity through multiple states.
553560
//
554561
// Error mapping:
555562
// - Invalid request fields -> codes.InvalidArgument
563+
// - Duplicate idempotency key -> codes.AlreadyExists
556564
// - Posting not found -> codes.NotFound
557565
// - Invalid state transition -> codes.FailedPrecondition
558566
// - Internal errors -> codes.Internal
559567
func (s *FinancialAccountingService) UpdateLedgerPosting(
560568
ctx context.Context,
561569
req *financialaccountingv1.UpdateLedgerPostingRequest,
562570
) (*financialaccountingv1.UpdateLedgerPostingResponse, error) {
571+
// Validate idempotency key is provided
572+
if req.IdempotencyKey == nil || req.IdempotencyKey.Key == "" {
573+
return nil, status.Error(codes.InvalidArgument, "idempotency_key is required")
574+
}
575+
576+
idempotencyKey := idempotency.Key{
577+
Namespace: "financial-accounting",
578+
Operation: "update-posting",
579+
EntityID: req.GetId(),
580+
RequestID: req.IdempotencyKey.Key,
581+
}
582+
583+
// Check idempotency - defensive nil check (constructor requires non-nil service)
584+
// TODO(ledger-integrity#15): If an error occurs after MarkPending but before StoreResult,
585+
// the pending marker is left orphaned until TTL expiry. Consider adding cleanup on error.
586+
if s.idempotency != nil {
587+
result, err := s.idempotency.Check(ctx, idempotencyKey)
588+
if err != nil && !errors.Is(err, idempotency.ErrResultNotFound) {
589+
if errors.Is(err, idempotency.ErrOperationAlreadyProcessed) {
590+
if result != nil && result.Status == idempotency.StatusCompleted && len(result.Data) > 0 {
591+
// Deserialize cached response from protobuf
592+
var cachedResponse financialaccountingv1.UpdateLedgerPostingResponse
593+
if unmarshalErr := proto.Unmarshal(result.Data, &cachedResponse); unmarshalErr != nil {
594+
slog.Error("failed to deserialize cached idempotency response",
595+
"error", unmarshalErr,
596+
"idempotency_key", req.IdempotencyKey.Key,
597+
"operation", "update-posting")
598+
return nil, status.Error(codes.AlreadyExists, "request with this idempotency key already processed")
599+
}
600+
slog.Info("returning cached idempotent response",
601+
"idempotency_key", req.IdempotencyKey.Key,
602+
"operation", "update-posting",
603+
"posting_id", req.GetId())
604+
return &cachedResponse, nil
605+
}
606+
return nil, status.Error(codes.AlreadyExists, "request with this idempotency key already processed")
607+
}
608+
return nil, status.Errorf(codes.Internal, "failed to check idempotency: %v", err)
609+
}
610+
611+
// Mark as pending to prevent concurrent processing
612+
if err := s.idempotency.MarkPending(ctx, idempotencyKey, defaultIdempotencyTTL); err != nil {
613+
return nil, status.Errorf(codes.Internal, "failed to mark operation as pending: %v", err)
614+
}
615+
}
616+
563617
// Parse posting ID
564618
postingID, err := parseUUID(req.GetId())
565619
if err != nil {
@@ -636,9 +690,43 @@ func (s *FinancialAccountingService) UpdateLedgerPosting(
636690
// TODO(75-async-audit#5): Implement LedgerPostingUpdatedEvent and publish it
637691

638692
// Convert to proto response
639-
return &financialaccountingv1.UpdateLedgerPostingResponse{
693+
response := &financialaccountingv1.UpdateLedgerPostingResponse{
640694
LedgerPosting: toProtoLedgerPosting(posting),
641-
}, nil
695+
}
696+
697+
// Store idempotency result (only if service configured)
698+
if s.idempotency != nil {
699+
ttl := defaultIdempotencyTTL
700+
if req.IdempotencyKey.TtlSeconds > 0 {
701+
ttl = time.Duration(req.IdempotencyKey.TtlSeconds) * time.Second
702+
}
703+
704+
// Serialize response using protobuf for idempotent storage
705+
responseData, marshalErr := proto.Marshal(response)
706+
if marshalErr != nil {
707+
slog.Error("failed to serialize response for idempotency cache",
708+
"error", marshalErr,
709+
"idempotency_key", req.IdempotencyKey.Key,
710+
"operation", "update-posting")
711+
} else {
712+
result := idempotency.Result{
713+
Key: idempotencyKey,
714+
Status: idempotency.StatusCompleted,
715+
Data: responseData,
716+
CompletedAt: time.Now(),
717+
TTL: ttl,
718+
}
719+
720+
if storeErr := s.idempotency.StoreResult(ctx, result); storeErr != nil {
721+
slog.Error("failed to store idempotency result",
722+
"error", storeErr,
723+
"idempotency_key", req.IdempotencyKey.Key,
724+
"operation", "update-posting")
725+
}
726+
}
727+
}
728+
729+
return response, nil
642730
}
643731

644732
// isValidCurrencyCode validates that a currency code matches ISO 4217 format.
@@ -810,22 +898,76 @@ func (s *FinancialAccountingService) RetrieveFinancialBookingLog(
810898
// UpdateFinancialBookingLog updates an existing booking log's status and rules.
811899
//
812900
// Workflow:
813-
// 1. Parse and validate request fields
814-
// 2. Retrieve existing booking log by ID
815-
// 3. Validate state transition rules
816-
// 4. Apply updates using domain methods
817-
// 5. Persist updated booking log
818-
// 6. Return updated booking log
901+
// 1. Check idempotency using request's IdempotencyKey
902+
// 2. Parse and validate request fields
903+
// 3. Retrieve existing booking log by ID
904+
// 4. Validate state transition rules
905+
// 5. Apply updates using domain methods
906+
// 6. Persist updated booking log
907+
// 7. Return updated booking log
908+
//
909+
// Idempotency Note:
910+
// Unlike InitiateFinancialBookingLog where idempotency is optional (create operations
911+
// naturally fail on duplicate IDs), update operations REQUIRE idempotency keys
912+
// because state-machine transitions must be exactly-once. A duplicate update
913+
// could incorrectly transition an entity through multiple states.
819914
//
820915
// Error mapping:
821916
// - Invalid request fields -> codes.InvalidArgument
917+
// - Duplicate idempotency key -> codes.AlreadyExists
822918
// - Booking log not found -> codes.NotFound
823919
// - Invalid state transition -> codes.FailedPrecondition
824920
// - Internal errors -> codes.Internal
825921
func (s *FinancialAccountingService) UpdateFinancialBookingLog(
826922
ctx context.Context,
827923
req *financialaccountingv1.UpdateFinancialBookingLogRequest,
828924
) (*financialaccountingv1.UpdateFinancialBookingLogResponse, error) {
925+
// Validate idempotency key is provided
926+
if req.IdempotencyKey == nil || req.IdempotencyKey.Key == "" {
927+
return nil, status.Error(codes.InvalidArgument, "idempotency_key is required")
928+
}
929+
930+
idempotencyKey := idempotency.Key{
931+
Namespace: "financial-accounting",
932+
Operation: "update-booking-log",
933+
EntityID: req.GetId(),
934+
RequestID: req.IdempotencyKey.Key,
935+
}
936+
937+
// Check idempotency - defensive nil check (constructor requires non-nil service)
938+
// TODO(ledger-integrity#15): If an error occurs after MarkPending but before StoreResult,
939+
// the pending marker is left orphaned until TTL expiry. Consider adding cleanup on error.
940+
if s.idempotency != nil {
941+
result, err := s.idempotency.Check(ctx, idempotencyKey)
942+
if err != nil && !errors.Is(err, idempotency.ErrResultNotFound) {
943+
if errors.Is(err, idempotency.ErrOperationAlreadyProcessed) {
944+
if result != nil && result.Status == idempotency.StatusCompleted && len(result.Data) > 0 {
945+
// Deserialize cached response from protobuf
946+
var cachedResponse financialaccountingv1.UpdateFinancialBookingLogResponse
947+
if unmarshalErr := proto.Unmarshal(result.Data, &cachedResponse); unmarshalErr != nil {
948+
slog.Error("failed to deserialize cached idempotency response",
949+
"error", unmarshalErr,
950+
"idempotency_key", req.IdempotencyKey.Key,
951+
"operation", "update-booking-log")
952+
return nil, status.Error(codes.AlreadyExists, "request with this idempotency key already processed")
953+
}
954+
slog.Info("returning cached idempotent response",
955+
"idempotency_key", req.IdempotencyKey.Key,
956+
"operation", "update-booking-log",
957+
"booking_log_id", req.GetId())
958+
return &cachedResponse, nil
959+
}
960+
return nil, status.Error(codes.AlreadyExists, "request with this idempotency key already processed")
961+
}
962+
return nil, status.Errorf(codes.Internal, "failed to check idempotency: %v", err)
963+
}
964+
965+
// Mark as pending to prevent concurrent processing
966+
if err := s.idempotency.MarkPending(ctx, idempotencyKey, defaultIdempotencyTTL); err != nil {
967+
return nil, status.Errorf(codes.Internal, "failed to mark operation as pending: %v", err)
968+
}
969+
}
970+
829971
// Parse booking log ID
830972
bookingLogID, err := parseUUID(req.GetId())
831973
if err != nil {
@@ -935,9 +1077,43 @@ func (s *FinancialAccountingService) UpdateFinancialBookingLog(
9351077
// TODO(75-async-audit#5): Publish FinancialBookingLogUpdatedEvent for inter-service coordination
9361078

9371079
// Convert to proto response
938-
return &financialaccountingv1.UpdateFinancialBookingLogResponse{
1080+
response := &financialaccountingv1.UpdateFinancialBookingLogResponse{
9391081
FinancialBookingLog: toProtoFinancialBookingLog(&updated),
940-
}, nil
1082+
}
1083+
1084+
// Store idempotency result (only if service configured)
1085+
if s.idempotency != nil {
1086+
ttl := defaultIdempotencyTTL
1087+
if req.IdempotencyKey.TtlSeconds > 0 {
1088+
ttl = time.Duration(req.IdempotencyKey.TtlSeconds) * time.Second
1089+
}
1090+
1091+
// Serialize response using protobuf for idempotent storage
1092+
responseData, marshalErr := proto.Marshal(response)
1093+
if marshalErr != nil {
1094+
slog.Error("failed to serialize response for idempotency cache",
1095+
"error", marshalErr,
1096+
"idempotency_key", req.IdempotencyKey.Key,
1097+
"operation", "update-booking-log")
1098+
} else {
1099+
result := idempotency.Result{
1100+
Key: idempotencyKey,
1101+
Status: idempotency.StatusCompleted,
1102+
Data: responseData,
1103+
CompletedAt: time.Now(),
1104+
TTL: ttl,
1105+
}
1106+
1107+
if storeErr := s.idempotency.StoreResult(ctx, result); storeErr != nil {
1108+
slog.Error("failed to store idempotency result",
1109+
"error", storeErr,
1110+
"idempotency_key", req.IdempotencyKey.Key,
1111+
"operation", "update-booking-log")
1112+
}
1113+
}
1114+
}
1115+
1116+
return response, nil
9411117
}
9421118

9431119
// isValidBookingLogTransition validates that a status transition is allowed.

0 commit comments

Comments
 (0)