Skip to content

Commit dbf0395

Browse files
authored
fix: surface reconciliation errors for assertion persistence and finalizer ordering (#2034)
* fix: surface reconciliation errors for assertion persistence and finalizer ordering

  - handlePKFailure now captures assertionRepo.Update errors and returns a combined error wrapping both the PK failure and the persistence failure, instead of silently discarding the update error with `_ =`.
  - markFinalized reorders operations so snapshots are marked FINAL before the run is persisted as FINALIZED, preventing a state where the run is FINALIZED but its snapshots are not yet FINAL.

* fix: resolve business run_id to surrogate PK in MarkRunSnapshotsFinal

  settlement_snapshot.run_id is a FK to settlement_run.id (the surrogate PK), not settlement_run.run_id (the business identifier). The previous implementation passed the UUID directly to the SQL query without resolving the FK-correct surrogate ID, resulting in a silent 0-row update when called with a business UUID.

  MarkRunSnapshotsFinal now resolves settlement_run.run_id -> settlement_run.id internally before updating, consistent with the integration test repo's approach.

  Updated the persistence integration test to create snapshots with the surrogate ID (FK correct) and call MarkRunSnapshotsFinal with the business UUID.

---------

Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 430e828 commit dbf0395

6 files changed

Lines changed: 82 additions & 24 deletions

File tree

services/reconciliation/adapters/persistence/settlement_snapshot_repository.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package persistence
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
"github.com/google/uuid"
@@ -138,14 +139,20 @@ func (r *SettlementSnapshotRepository) DeleteByRunID(ctx context.Context, runID
138139
}
139140

140141
// MarkRunSnapshotsFinal updates all snapshots for a run to include settlement_type=FINAL in their attributes.
142+
// runID is the business identifier (settlement_run.run_id); this method resolves it to the surrogate PK
143+
// (settlement_run.id) before updating, since settlement_snapshot.run_id is a FK to settlement_run.id.
141144
func (r *SettlementSnapshotRepository) MarkRunSnapshotsFinal(ctx context.Context, runID uuid.UUID) error {
142145
return r.withTenantTransaction(ctx, func(tx *gorm.DB) error {
146+
var runEntity SettlementRunEntity
147+
if err := tx.Select("id").Where("run_id = ?", runID).First(&runEntity).Error; err != nil {
148+
return fmt.Errorf("resolving surrogate ID for run %s: %w", runID, err)
149+
}
143150
// Use CASE to handle NULL/JSON-null attributes vs existing JSONB objects.
144151
// CockroachDB's || operator requires both operands to be JSONB objects,
145152
// so we must replace NULL/json-null with an empty object before merging.
146153
return tx.Exec(
147154
`UPDATE "settlement_snapshot" SET "attributes" = CASE WHEN "attributes" IS NULL OR "attributes"::text = 'null' THEN '{"settlement_type":"FINAL"}'::jsonb ELSE "attributes" || '{"settlement_type":"FINAL"}'::jsonb END WHERE "run_id" = ?`,
148-
runID,
155+
runEntity.ID,
149156
).Error
150157
})
151158
}

services/reconciliation/adapters/persistence/settlement_snapshot_repository_test.go

Lines changed: 29 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,23 @@ import (
1616
"gorm.io/gorm"
1717
)
1818

19-
// seedSettlementRun creates a settlement_run record and returns the surrogate ID.
19+
// seedRunIDs holds the two distinct identifiers for a seeded settlement_run row.
20+
type seedRunIDs struct {
21+
// SurrogateID is the settlement_run.id (DB surrogate PK); used as the FK target in settlement_snapshot.run_id.
22+
SurrogateID uuid.UUID
23+
// BusinessID is the settlement_run.run_id (business identifier); passed to MarkRunSnapshotsFinal and similar methods.
24+
BusinessID uuid.UUID
25+
}
26+
27+
// seedSettlementRun creates a settlement_run record and returns the surrogate ID (settlement_run.id).
28+
// For tests that also need the business run_id, use seedSettlementRunFull.
2029
func seedSettlementRun(t *testing.T, ctx context.Context, db *gorm.DB) uuid.UUID {
30+
t.Helper()
31+
return seedSettlementRunFull(t, ctx, db).SurrogateID
32+
}
33+
34+
// seedSettlementRunFull creates a settlement_run record and returns both the surrogate ID and the business run_id.
35+
func seedSettlementRunFull(t *testing.T, ctx context.Context, db *gorm.DB) seedRunIDs {
2136
t.Helper()
2237
tid := tenant.TenantID("test-tenant-01")
2338
quoted := fmt.Sprintf("%q", tid.SchemaName())
@@ -30,7 +45,7 @@ func seedSettlementRun(t *testing.T, ctx context.Context, db *gorm.DB) uuid.UUID
3045
surrogateID, runID,
3146
).Error
3247
require.NoError(t, err)
33-
return surrogateID
48+
return seedRunIDs{SurrogateID: surrogateID, BusinessID: runID}
3449
}
3550

3651
func newTestSnapshot(t *testing.T, runID uuid.UUID) *domain.SettlementSnapshot {
@@ -234,29 +249,33 @@ func TestSettlementSnapshotRepository_MarkRunSnapshotsFinal(t *testing.T) {
234249
repo := persistence.NewSettlementSnapshotRepository(db)
235250
ctx := tenantCtx()
236251

237-
runID := seedSettlementRun(t, ctx, db)
252+
// Use seedSettlementRunFull so we have both IDs:
253+
// SurrogateID → stored in settlement_snapshot.run_id (FK target)
254+
// BusinessID → passed to MarkRunSnapshotsFinal (business identifier)
255+
run := seedSettlementRunFull(t, ctx, db)
238256

239-
// Create snapshots with various attributes
240-
s1 := newTestSnapshot(t, runID)
257+
// Create snapshots with the surrogate ID so the FK constraint is satisfied.
258+
s1 := newTestSnapshot(t, run.SurrogateID)
241259
s1.Attributes = map[string]string{"bucket": "default"}
242260
require.NoError(t, repo.Create(ctx, s1))
243261

244-
s2 := newTestSnapshot(t, runID)
262+
s2 := newTestSnapshot(t, run.SurrogateID)
245263
s2.AccountID = "ACC-002"
246264
s2.Attributes = nil // Test nil attributes case
247265
require.NoError(t, repo.Create(ctx, s2))
248266

249-
s3 := newTestSnapshot(t, runID)
267+
s3 := newTestSnapshot(t, run.SurrogateID)
250268
s3.AccountID = "ACC-003"
251269
s3.Attributes = map[string]string{"region": "eu-west-1"}
252270
require.NoError(t, repo.Create(ctx, s3))
253271

254-
// Mark all snapshots as FINAL
255-
err := repo.MarkRunSnapshotsFinal(ctx, runID)
272+
// Mark all snapshots as FINAL using the business run_id (not the surrogate PK).
273+
// MarkRunSnapshotsFinal resolves the business ID to the surrogate PK internally.
274+
err := repo.MarkRunSnapshotsFinal(ctx, run.BusinessID)
256275
require.NoError(t, err)
257276

258277
// Verify all snapshots have settlement_type=FINAL
259-
found, err := repo.FindByRunID(ctx, runID)
278+
found, err := repo.FindByRunID(ctx, run.SurrogateID)
260279
require.NoError(t, err)
261280
assert.Len(t, found, 3)
262281
for _, snap := range found {

services/reconciliation/service/balance_assertor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,12 @@ func (ba *BalanceAssertor) handlePKFailure(ctx context.Context, assertion *domai
166166
if failErr := assertion.Fail(decimal.Zero, failReason); failErr != nil {
167167
ba.logger.Error("failed to mark assertion as failed", "error", failErr)
168168
}
169-
_ = ba.assertionRepo.Update(ctx, assertion)
169+
retErr := fmt.Errorf("querying position keeping: %w", pkErr)
170+
if updateErr := ba.assertionRepo.Update(ctx, assertion); updateErr != nil {
171+
retErr = fmt.Errorf("%w; persisting FAILED assertion: %w", retErr, updateErr)
172+
}
170173
observability.BalanceAssertionTotal.WithLabelValues("FAILED", scope.String()).Inc()
171-
return &AssertBalanceResult{Assertion: assertion}, fmt.Errorf("querying position keeping: %w", pkErr)
174+
return &AssertBalanceResult{Assertion: assertion}, retErr
172175
}
173176

174177
// handleBalanced records a PASSED assertion when debits equal credits.

services/reconciliation/service/balance_assertor_test.go

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -360,6 +360,33 @@ func TestExecuteBalanceAssertion_PKClientError(t *testing.T) {
360360
assert.Equal(t, domain.AssertionStatusFailed, result.Assertion.Status)
361361
}
362362

363+
func TestExecuteBalanceAssertion_PKClientError_WithRepoUpdateError(t *testing.T) {
364+
repo := newMockAssertionRepo()
365+
repo.updateErr = errors.New("db write failed")
366+
pkClient := &mockPKClient{
367+
err: errors.New("connection refused"),
368+
}
369+
370+
assertor := NewBalanceAssertor(repo, nil, pkClient, nil, nil, testLogger())
371+
372+
result, err := assertor.ExecuteBalanceAssertion(context.Background(), AssertBalanceRequest{
373+
AccountID: "ACC-001",
374+
InstrumentCode: "GBP",
375+
Expression: "total_debits == total_credits",
376+
ExpectedBalance: decimal.Zero,
377+
Scope: domain.AssertionScopePositionLedger,
378+
CallerRole: CallerRoleTenantAdmin,
379+
})
380+
381+
require.Error(t, err)
382+
assert.Contains(t, err.Error(), "querying position keeping")
383+
assert.Contains(t, err.Error(), "persisting FAILED assertion")
384+
385+
// Result is still returned with the failed assertion
386+
require.NotNil(t, result)
387+
assert.Equal(t, domain.AssertionStatusFailed, result.Assertion.Status)
388+
}
389+
363390
func TestExecuteBalanceAssertion_TrendTracking(t *testing.T) {
364391
trendRepo := newMockTrendRepo()
365392
publisher := &mockEventPublisher{}

services/reconciliation/service/finalizer.go

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -214,19 +214,20 @@ func (f *SettlementFinalizer) checkPendingOperations(ctx context.Context, run *d
214214
}
215215

216216
// markFinalized transitions the run to FINALIZED and marks snapshots as FINAL.
217+
// Snapshots are marked FINAL before the run is persisted as FINALIZED to prevent
218+
// a state where the run appears FINALIZED but its snapshots are not yet FINAL.
217219
func (f *SettlementFinalizer) markFinalized(ctx context.Context, run *domain.SettlementRun) error {
220+
if err := f.snapRepo.MarkRunSnapshotsFinal(ctx, run.RunID); err != nil {
221+
return fmt.Errorf("marking snapshots FINAL for run %s: %w", run.RunID, err)
222+
}
223+
218224
if err := run.Finalize(); err != nil {
219225
return fmt.Errorf("transitioning run %s to FINALIZED: %w", run.RunID, err)
220226
}
221227
if err := f.runRepo.Update(ctx, run); err != nil {
222228
return fmt.Errorf("persisting FINALIZED state for run %s: %w", run.RunID, err)
223229
}
224230

225-
if err := f.snapRepo.MarkRunSnapshotsFinal(ctx, run.RunID); err != nil {
226-
f.logger.WarnContext(ctx, "failed to mark snapshots as FINAL",
227-
"run_id", run.RunID, "error", err)
228-
}
229-
230231
return nil
231232
}
232233

services/reconciliation/service/finalizer_test.go

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -395,7 +395,7 @@ func TestFinalizeSettlement_NilLockClient(t *testing.T) {
395395
assert.Equal(t, domain.RunStatusFinalized, finalizedRun.Status)
396396
}
397397

398-
func TestFinalizeSettlement_SnapshotMarkFailureNonFatal(t *testing.T) {
398+
func TestFinalizeSettlement_SnapshotMarkFailureBlocksFinalization(t *testing.T) {
399399
runRepo := newMockRunRepo()
400400
snapRepo := &mockFinalSnapshotRepo{markErr: errors.New("snapshot update failed")}
401401
publisher := &mockFinalityPublisher{}
@@ -405,12 +405,13 @@ func TestFinalizeSettlement_SnapshotMarkFailureNonFatal(t *testing.T) {
405405

406406
finalizer := NewSettlementFinalizer(runRepo, snapRepo, nil, publisher, nil)
407407
err := finalizer.FinalizeSettlement(serviceCtx(), run.RunID)
408-
// Should succeed even if snapshot marking fails (non-fatal)
409-
require.NoError(t, err)
408+
// Should fail if snapshot marking fails - run must not be FINALIZED with non-FINAL snapshots
409+
require.Error(t, err)
410+
assert.Contains(t, err.Error(), "marking snapshots FINAL")
410411

411-
// Run should still be FINALIZED
412-
finalizedRun, _ := runRepo.FindByID(context.Background(), run.RunID)
413-
assert.Equal(t, domain.RunStatusFinalized, finalizedRun.Status)
412+
// Run should NOT be FINALIZED since snapshots couldn't be marked FINAL
413+
unchangedRun, _ := runRepo.FindByID(context.Background(), run.RunID)
414+
assert.NotEqual(t, domain.RunStatusFinalized, unchangedRun.Status)
414415
}
415416

416417
func TestFinalizeSettlement_PublisherFailureNonFatal(t *testing.T) {

0 commit comments

Comments
 (0)