Skip to content

Commit 4fc4a9c

Browse files
authored
fix: Resolve nightly test timeout in shared/pkg/saga (#693)
Two issues caused the nightly "Slow Integration Tests" job to fail: 1. Container proliferation timeout: ~60 integration tests each created their own CockroachDB testcontainer (~15s startup each), exhausting the 15-minute per-package timeout. Fixed by sharing a single container via TestMain with sync.Once lazy initialization and per-test table cleanup via DELETE. 2. FOR UPDATE SKIP LOCKED in timeout_worker.go: CockroachDB's serializable isolation silently returns empty results for SKIP LOCKED, preventing ProcessExpiredSuspensions from finding expired sagas. Fixed by removing SKIP LOCKED and splitting the CTE+UPDATE into separate SELECT and UPDATE steps (CockroachDB requires FOR UPDATE at the top level of SELECT). This is the same class of issue fixed in claiming.go by PR #689. Also guards RunSagaMigrations against re-invocation on CockroachDB, where GORM's AutoMigrate fails with SQLSTATE 42704 when checking existing unique constraints (naming mismatch between GORM and CockroachDB). Test results: saga package runs in ~22s (previously >15m). Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 99c063d commit 4fc4a9c

3 files changed

Lines changed: 153 additions & 40 deletions

File tree

shared/pkg/saga/migrations.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,17 @@ import (
2323
// }
2424
// }
2525
func RunSagaMigrations(db *gorm.DB) error {
26-
// Run AutoMigrate for the saga models
27-
// This creates the tables and standard indexes defined in struct tags
28-
if err := db.AutoMigrate(&SagaInstance{}, &SagaStepResult{}); err != nil {
29-
return fmt.Errorf("failed to auto-migrate saga models: %w", err)
26+
// Only run AutoMigrate when tables don't yet exist.
27+
// GORM's AutoMigrate is not idempotent on CockroachDB: re-running it
28+
// fails with SQLSTATE 42704 because CockroachDB names unique constraints
29+
// differently than what GORM expects when checking existing schema.
30+
// The partial indexes and composite constraint below are already
31+
// idempotent (IF NOT EXISTS / information_schema checks).
32+
migrator := db.Migrator()
33+
if !migrator.HasTable(&SagaInstance{}) || !migrator.HasTable(&SagaStepResult{}) {
34+
if err := db.AutoMigrate(&SagaInstance{}, &SagaStepResult{}); err != nil {
35+
return fmt.Errorf("failed to auto-migrate saga models: %w", err)
36+
}
3037
}
3138

3239
// Create the partial index for orphan detection
Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,114 @@
11
package saga
22

33
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"sync"
48
"testing"
9+
"time"
510

6-
"github.com/meridianhub/meridian/shared/platform/testdb"
11+
"github.com/testcontainers/testcontainers-go/modules/cockroachdb"
12+
gormpg "gorm.io/driver/postgres"
713
"gorm.io/gorm"
14+
"gorm.io/gorm/logger"
815
)
916

10-
// setupTestPostgres creates a CockroachDB testcontainer for integration testing.
11-
// Named setupTestPostgres for historical compatibility - CockroachDB is wire-compatible
12-
// with PostgreSQL and is our production database.
17+
// sharedDB holds the shared CockroachDB connection for all integration tests.
18+
// Lazily initialized on first use via sync.Once to avoid starting the container
19+
// when only unit tests run (e.g. -short mode).
20+
var (
21+
sharedDB *gorm.DB
22+
sharedOnce sync.Once
23+
sharedInitErr error
24+
sharedCleanup func()
25+
)
26+
27+
func TestMain(m *testing.M) {
28+
code := m.Run()
29+
30+
if sharedCleanup != nil {
31+
sharedCleanup()
32+
}
33+
os.Exit(code)
34+
}
35+
36+
func initSharedContainer() error {
37+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
38+
defer cancel()
39+
40+
crdbContainer, err := cockroachdb.Run(ctx,
41+
"cockroachdb/cockroach:v24.3.0",
42+
cockroachdb.WithDatabase("test_db"),
43+
cockroachdb.WithUser("root"),
44+
cockroachdb.WithInsecure(),
45+
)
46+
if err != nil {
47+
return fmt.Errorf("start container: %w", err)
48+
}
49+
50+
connConfig, err := crdbContainer.ConnectionConfig(ctx)
51+
if err != nil {
52+
_ = crdbContainer.Terminate(ctx)
53+
return fmt.Errorf("connection config: %w", err)
54+
}
55+
56+
db, err := gorm.Open(gormpg.Open(connConfig.ConnString()), &gorm.Config{
57+
Logger: logger.Default.LogMode(logger.Silent),
58+
})
59+
if err != nil {
60+
_ = crdbContainer.Terminate(ctx)
61+
return fmt.Errorf("gorm open: %w", err)
62+
}
63+
64+
// Run migrations once — subsequent RunSagaMigrations calls in tests
65+
// are idempotent no-ops (tables/indexes already exist).
66+
if err := RunSagaMigrations(db); err != nil {
67+
_ = crdbContainer.Terminate(ctx)
68+
return fmt.Errorf("migrations: %w", err)
69+
}
70+
71+
sharedDB = db
72+
sharedCleanup = func() {
73+
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 10*time.Second)
74+
defer cleanupCancel()
75+
76+
sqlDB, _ := db.DB()
77+
if sqlDB != nil {
78+
_ = sqlDB.Close()
79+
}
80+
_ = crdbContainer.Terminate(cleanupCtx)
81+
}
82+
83+
return nil
84+
}
85+
86+
// setupTestPostgres returns the shared CockroachDB connection with per-test
87+
// data cleanup. Named setupTestPostgres for historical compatibility —
88+
// CockroachDB is wire-compatible with PostgreSQL and is our production database.
89+
//
90+
// Previously each test started its own CockroachDB container (~15s startup).
91+
// With ~60 integration tests the package exceeded the 15-minute timeout.
92+
// Sharing one container reduces total overhead from ~15 minutes to ~15 seconds.
1393
func setupTestPostgres(t *testing.T) (*gorm.DB, func()) {
14-
return testdb.SetupCockroachDB(t, nil)
94+
t.Helper()
95+
96+
sharedOnce.Do(func() {
97+
sharedInitErr = initSharedContainer()
98+
})
99+
if sharedInitErr != nil {
100+
t.Fatalf("shared CockroachDB setup failed: %v", sharedInitErr)
101+
}
102+
103+
// Clean tables before each test (FK order: children first).
104+
if err := sharedDB.Exec("DELETE FROM saga_step_results").Error; err != nil {
105+
t.Fatalf("Failed to clean saga_step_results: %v", err)
106+
}
107+
if err := sharedDB.Exec("DELETE FROM saga_instances").Error; err != nil {
108+
t.Fatalf("Failed to clean saga_instances: %v", err)
109+
}
110+
111+
return sharedDB, func() {
112+
// No-op: container lifecycle managed by TestMain.
113+
}
15114
}

shared/pkg/saga/timeout_worker.go

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,44 +107,51 @@ func (w *TimeoutWorker) processExpiredSuspensions(ctx context.Context) error {
107107
var expired []expiredSaga
108108

109109
err := w.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
110-
// Find and transition expired suspensions atomically
111-
// Use CTE to capture idempotency_key before clearing suspend_data
112-
// The JSONB query extracts the timeout_at field and compares with NOW()
113-
result := tx.Raw(`
114-
WITH expired_sagas AS (
115-
SELECT id, correlation_id, suspend_data->>'idempotency_key' as idempotency_key
116-
FROM saga_instances
117-
WHERE status = ?
118-
AND (suspend_data->>'timeout_at')::timestamptz < ?
119-
FOR UPDATE SKIP LOCKED
120-
LIMIT ?
121-
)
122-
UPDATE saga_instances si
123-
SET
124-
status = ?,
125-
error_message = ?,
126-
error_category = ?,
127-
updated_at = ?,
128-
suspend_reason = NULL,
129-
suspend_data = NULL
130-
FROM expired_sagas es
131-
WHERE si.id = es.id
132-
RETURNING si.id, es.correlation_id, es.idempotency_key
110+
// Step 1: Find expired suspensions with row-level locking.
111+
// CockroachDB requires FOR UPDATE at the top level of a SELECT (not in
112+
// CTEs or subqueries). SKIP LOCKED is omitted because CockroachDB's
113+
// serializable isolation silently returns empty results when it's used.
114+
// Serializable isolation provides equivalent concurrency safety.
115+
if err := tx.Raw(`
116+
SELECT id, correlation_id, suspend_data->>'idempotency_key' as idempotency_key
117+
FROM saga_instances
118+
WHERE status = ?
119+
AND (suspend_data->>'timeout_at')::timestamptz < ?
120+
ORDER BY id
121+
FOR UPDATE
122+
LIMIT ?
133123
`,
134124
SagaStatusWaitingForEvent,
135125
now,
136126
w.config.BatchSize,
137-
SagaStatusFailed,
138-
"Suspend timeout exceeded - external event not received within deadline",
139-
string(ErrorCategoryFatal),
140-
now,
141-
).Scan(&expired)
127+
).Scan(&expired).Error; err != nil {
128+
return fmt.Errorf("failed to find expired suspensions: %w", err)
129+
}
142130

143-
if result.Error != nil {
144-
return result.Error
131+
if len(expired) == 0 {
132+
return nil
133+
}
134+
135+
// Step 2: Transition expired sagas to FAILED.
136+
// Capture idempotency_key in step 1 before clearing suspend_data here.
137+
ids := make([]uuid.UUID, len(expired))
138+
for i, s := range expired {
139+
ids[i] = s.ID
140+
}
141+
if err := tx.Model(&SagaInstance{}).
142+
Where("id IN ?", ids).
143+
Updates(map[string]interface{}{
144+
"status": SagaStatusFailed,
145+
"error_message": "Suspend timeout exceeded - external event not received within deadline",
146+
"error_category": string(ErrorCategoryFatal),
147+
"updated_at": now,
148+
"suspend_reason": nil,
149+
"suspend_data": nil,
150+
}).Error; err != nil {
151+
return fmt.Errorf("failed to update expired sagas: %w", err)
145152
}
146153

147-
// Also update the corresponding step results to FAILED
154+
// Step 3: Update the corresponding step results to FAILED.
148155
for _, saga := range expired {
149156
stepUpdate := tx.Model(&SagaStepResult{}).
150157
Where("saga_instance_id = ? AND status = ?", saga.ID, StepStatusSuspended).

0 commit comments

Comments
 (0)