Skip to content

Commit 89ec596

Browse files
authored
Merge pull request #1666 from meridianhub/fix-develop-20260314-crdb-serialization
fix: Resolve CockroachDB serialization issues causing CI failures
2 parents 52345f9 + 63c0f4b commit 89ec596

2 files changed

Lines changed: 84 additions & 15 deletions

File tree

services/operational-gateway/adapters/persistence/instruction_repository.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package persistence
22

33
import (
44
"context"
5+
"database/sql"
56
"errors"
67
"strings"
78
"time"
@@ -155,6 +156,10 @@ func (r *InstructionRepository) FetchDispatchable(ctx context.Context, params po
155156

156157
var entities []InstructionEntity
157158

159+
// Use READ COMMITTED isolation: CockroachDB's SERIALIZABLE default causes
160+
// unpredictable behavior with FOR UPDATE SKIP LOCKED, where recently-committed
161+
// rows may be skipped. READ COMMITTED matches PostgreSQL semantics and is the
162+
// recommended isolation level for queue-like claim patterns.
158163
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
159164
// Step 1: Lock candidate IDs. RETRYING rows are only eligible when next_retry_at has passed.
160165
lockSQL := `
@@ -195,7 +200,7 @@ func (r *InstructionRepository) FetchDispatchable(ctx context.Context, params po
195200
return tx.Where("id IN ?", ids).
196201
Order("priority DESC, scheduled_at ASC NULLS FIRST").
197202
Find(&entities).Error
198-
})
203+
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
199204
if err != nil {
200205
return nil, err
201206
}

services/reference-data/saga/override_api.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"errors"
2121
"fmt"
2222
"log/slog"
23+
"time"
2324

2425
"github.com/google/uuid"
2526
"github.com/jackc/pgx/v5"
27+
"github.com/jackc/pgx/v5/pgconn"
2628
"github.com/jackc/pgx/v5/pgxpool"
2729
"github.com/lib/pq"
2830
"github.com/meridianhub/meridian/shared/platform/tenant"
@@ -222,14 +224,50 @@ func (s *OverrideService) MigrateToPlatformRef(
222224
logger := s.logger.With("tenant_id", tenantID.String(), "dry_run", dryRun)
223225
logger.Info("starting platform reference migration")
224226

225-
// Get all platform sagas
227+
// Get all platform sagas (read-only, outside the retryable transaction).
226228
platformSagas, err := s.loadPlatformSagas(ctx)
227229
if err != nil {
228230
return nil, fmt.Errorf("load platform sagas: %w", err)
229231
}
230232

231233
schemaName := pq.QuoteIdentifier(tenantID.SchemaName())
232234

235+
var results []MigrationResult
236+
err = withCRDBRetry(ctx, func() error {
237+
var txErr error
238+
results, txErr = s.runMigrationTx(ctx, schemaName, platformSagas, dryRun)
239+
return txErr
240+
})
241+
if err != nil {
242+
return nil, err
243+
}
244+
245+
// Log per-saga outcomes after the transaction has committed successfully,
246+
// so retried transactions don't produce duplicate log entries.
247+
for _, r := range results {
248+
if r.Action == MigrationActionMigrated {
249+
logger.Info("migrated saga to platform reference",
250+
"saga_name", r.SagaName,
251+
"saga_id", r.SagaID,
252+
"platform_ref", r.PlatformRefID,
253+
"similarity", r.SimilarityRatio)
254+
}
255+
}
256+
257+
logger.Info("platform reference migration completed",
258+
"total", len(results),
259+
"dry_run", dryRun)
260+
261+
return results, nil
262+
}
263+
264+
// runMigrationTx executes the migration within a single transaction.
265+
func (s *OverrideService) runMigrationTx(
266+
ctx context.Context,
267+
schemaName string,
268+
platformSagas map[string]PlatformSagaDefinition,
269+
dryRun bool,
270+
) ([]MigrationResult, error) {
233271
tx, err := s.pool.Begin(ctx)
234272
if err != nil {
235273
return nil, fmt.Errorf("begin transaction: %w", err)
@@ -250,26 +288,57 @@ func (s *OverrideService) MigrateToPlatformRef(
250288

251289
results := make([]MigrationResult, 0, len(candidates))
252290
for _, ts := range candidates {
253-
result, migrateErr := s.evaluateCandidate(ctx, tx, ts, platformSagas, dryRun, logger)
291+
result, migrateErr := s.evaluateCandidate(ctx, tx, ts, platformSagas, dryRun)
254292
if migrateErr != nil {
255293
return nil, migrateErr
256294
}
257295
results = append(results, result)
258296
}
259297

260298
if !dryRun {
261-
if err := tx.Commit(ctx); err != nil {
299+
if err = tx.Commit(ctx); err != nil {
262300
return nil, fmt.Errorf("commit migration: %w", err)
263301
}
264302
}
265303

266-
logger.Info("platform reference migration completed",
267-
"total", len(results),
268-
"dry_run", dryRun)
269-
270304
return results, nil
271305
}
272306

307+
// withCRDBRetry retries the provided function on CockroachDB serialization errors
308+
// (SQLSTATE 40001). These errors are expected under SERIALIZABLE isolation and are
309+
// safe to retry by re-executing the entire transaction.
310+
func withCRDBRetry(ctx context.Context, fn func() error) error {
311+
const maxRetries = 5
312+
backoff := 50 * time.Millisecond
313+
314+
for attempt := 0; ; attempt++ {
315+
err := fn()
316+
if err == nil {
317+
return nil
318+
}
319+
if attempt >= maxRetries || !isCRDBRetryError(err) {
320+
return err
321+
}
322+
323+
select {
324+
case <-ctx.Done():
325+
return ctx.Err()
326+
case <-time.After(backoff):
327+
}
328+
backoff *= 2
329+
if backoff > 2*time.Second {
330+
backoff = 2 * time.Second
331+
}
332+
}
333+
}
334+
335+
// isCRDBRetryError returns true if the error is a CockroachDB serialization conflict
336+
// (SQLSTATE 40001) that can be resolved by retrying the transaction.
337+
func isCRDBRetryError(err error) bool {
338+
var pgErr *pgconn.PgError
339+
return errors.As(err, &pgErr) && pgErr.Code == "40001"
340+
}
341+
273342
// tenantSaga represents a tenant's saga definition candidate for migration.
274343
type tenantSaga struct {
275344
id uuid.UUID
@@ -312,13 +381,14 @@ func (s *OverrideService) loadTenantSagaCandidates(ctx context.Context, tx pgx.T
312381
}
313382

314383
// evaluateCandidate determines the migration action for a single tenant saga.
384+
// Does not log outcomes — the caller logs after the transaction commits to avoid
385+
// duplicate entries on CockroachDB transaction retries.
315386
func (s *OverrideService) evaluateCandidate(
316387
ctx context.Context,
317388
tx pgx.Tx,
318389
ts tenantSaga,
319390
platformSagas map[string]PlatformSagaDefinition,
320391
dryRun bool,
321-
logger *slog.Logger,
322392
) (MigrationResult, error) {
323393
if ts.platformRef != nil {
324394
return MigrationResult{
@@ -370,12 +440,6 @@ func (s *OverrideService) evaluateCandidate(
370440
return MigrationResult{}, fmt.Errorf("migrate saga %s: %w", ts.name, err)
371441
}
372442

373-
logger.Info("migrated saga to platform reference",
374-
"saga_name", ts.name,
375-
"saga_id", ts.id,
376-
"platform_ref", platformSaga.ID,
377-
"similarity", simResult.Ratio)
378-
379443
return MigrationResult{
380444
SagaName: ts.name,
381445
SagaID: ts.id,

0 commit comments

Comments
 (0)