Skip to content

Commit e81fe2f

Browse files
authored
feat(saga): Implement INSERT-only platform saga sync to prevent data loss (#695)
1 parent 9958d92 commit e81fe2f

7 files changed

Lines changed: 407 additions & 204 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
-- Fix platform_saga_definition UNIQUE constraint for version retention
2+
--
3+
-- CRITICAL BUG FIX: The previous UNIQUE(name) constraint allowed only one
4+
-- version per saga in the platform_saga_definition table. When PlatformSync
5+
-- encountered a newer embedded version, it executed UPDATE ... SET script, version, ...
6+
-- overwriting the previous version's script content in-place. Running saga
7+
-- instances pin PlatformSagaVersionID at execution time, and after UPDATE
8+
-- destroyed the old script, replay operations would fail.
9+
--
10+
-- This migration:
11+
-- 1. Drops the old UNIQUE(name) index
12+
-- 2. Adds a compound UNIQUE INDEX on (name, version) allowing multiple
13+
-- versions of the same saga to coexist
14+
--
15+
-- Combined with the INSERT-only sync logic in PlatformSync, this ensures
16+
-- that old versions are never overwritten and pinned replays remain
17+
-- deterministic.
18+
--
19+
-- IMPORTANT: This migration runs per-tenant but modifies objects in the shared
20+
-- public schema. All DDL statements must be idempotent to avoid errors when
21+
-- multiple tenant schemas apply the same migration.
22+
23+
-- Drop the old UNIQUE(name) constraint to allow multiple versions per saga.
24+
-- CockroachDB requires DROP INDEX CASCADE for unique constraints.
25+
-- PostgreSQL requires ALTER TABLE ... DROP CONSTRAINT instead (applied by the PostgreSQL test helper, not by this file).
26+
-- This migration targets CockroachDB (production). The PostgreSQL test helper
27+
-- applies the equivalent ALTER TABLE DROP CONSTRAINT before running this file.
28+
DROP INDEX IF EXISTS "public"."uq_platform_saga_definition_name" CASCADE;
29+
30+
-- Add new compound unique index allowing multiple versions per saga
31+
-- Using CREATE UNIQUE INDEX IF NOT EXISTS for idempotency
32+
CREATE UNIQUE INDEX IF NOT EXISTS "uq_platform_saga_definition_name_version"
33+
ON "public"."platform_saga_definition" ("name", "version");
34+
35+
-- Update table comment to reflect new multi-version design
36+
COMMENT ON TABLE "public"."platform_saga_definition" IS
37+
'Platform-level saga definitions synced from embedded .star files. Multiple versions per saga are retained for deterministic replay of running instances.';

services/reference-data/saga/platform_sync.go

Lines changed: 41 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"regexp"
1010
"strings"
1111

12-
"github.com/Masterminds/semver/v3"
1312
"github.com/google/uuid"
1413
"github.com/jackc/pgx/v5"
14+
"github.com/jackc/pgx/v5/pgconn"
1515
"github.com/jackc/pgx/v5/pgxpool"
1616
)
1717

@@ -48,12 +48,16 @@ func NewPlatformSync(pool *pgxpool.Pool) *PlatformSync {
4848
// SyncPlatformDefaults synchronizes all embedded saga definitions to the platform table.
4949
// This method is idempotent - running it multiple times with the same versions has no effect.
5050
//
51-
// The sync logic:
51+
// The sync logic uses INSERT-only semantics to preserve version history:
5252
// 1. Loads all embedded .star files from the defaults directory
5353
// 2. Extracts version from "# Version: X.Y.Z" comment in each script
54-
// 3. For each saga, compares embedded version with database version
55-
// 4. Updates if embedded version is newer (semver comparison)
56-
// 5. Inserts if saga doesn't exist in database
54+
// 3. For each saga, checks if the exact (name, version) pair already exists
55+
// 4. Inserts a new row if the (name, version) pair is not found
56+
// 5. Skips if the exact (name, version) already exists (idempotent)
57+
//
58+
// Old versions are never overwritten or deleted. This guarantees that running
59+
// saga instances which pinned a PlatformSagaVersionID at execution time can
60+
// always replay using the exact script they started with.
5761
func (s *PlatformSync) SyncPlatformDefaults(ctx context.Context) error {
5862
s.logger.Info("starting platform saga sync")
5963

@@ -109,8 +113,9 @@ func (s *PlatformSync) loadEmbeddedSagas() ([]PlatformSagaDefinition, error) {
109113
"default_version", version)
110114
}
111115

112-
// Generate deterministic UUID based on name
113-
id := uuid.NewSHA1(uuid.NameSpaceDNS, []byte("platform.saga."+meta.Name))
116+
// Generate deterministic UUID based on name and version
117+
// Each (name, version) pair gets a unique, reproducible ID
118+
id := uuid.NewSHA1(uuid.NameSpaceDNS, []byte("platform.saga."+meta.Name+"."+version))
114119

115120
sagas = append(sagas, PlatformSagaDefinition{
116121
ID: id,
@@ -140,64 +145,52 @@ func (s *PlatformSync) readEmbeddedScript(filename string) (string, error) {
140145
return script, nil
141146
}
142147

143-
// syncSaga syncs a single saga to the database.
144-
// Returns true if the saga was inserted/updated, false if skipped.
148+
// syncSaga syncs a single saga version to the database using INSERT-only semantics.
149+
// Returns true if the saga was inserted, false if skipped (already exists).
150+
//
151+
// Old versions are never modified or deleted. Each unique (name, version) pair
152+
// gets its own row, ensuring pinned saga instances can always replay correctly.
145153
func (s *PlatformSync) syncSaga(ctx context.Context, saga PlatformSagaDefinition) (bool, error) {
146-
// Check if saga exists and get current version
147-
var existingVersion string
154+
// Check if exact (name, version) already exists
155+
var existingID uuid.UUID
148156
err := s.pool.QueryRow(ctx, `
149-
SELECT version FROM public.platform_saga_definition WHERE name = $1
150-
`, saga.Name).Scan(&existingVersion)
157+
SELECT id FROM public.platform_saga_definition
158+
WHERE name = $1 AND version = $2
159+
`, saga.Name, saga.Version).Scan(&existingID)
151160

152161
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
153162
return false, fmt.Errorf("query existing version: %w", err)
154163
}
155164

156-
if errors.Is(err, pgx.ErrNoRows) {
157-
// Insert new saga
158-
_, err = s.pool.Exec(ctx, `
159-
INSERT INTO public.platform_saga_definition
160-
(id, name, version, script, display_name, description)
161-
VALUES ($1, $2, $3, $4, $5, $6)
162-
`, saga.ID, saga.Name, saga.Version, saga.Script, saga.DisplayName, saga.Description)
163-
if err != nil {
164-
return false, fmt.Errorf("insert saga: %w", err)
165-
}
166-
167-
s.logger.Info("inserted platform saga",
165+
if err == nil {
166+
// Exact (name, version) already exists - skip (idempotent)
167+
s.logger.Debug("skipping saga (version already exists)",
168168
"name", saga.Name,
169169
"version", saga.Version)
170-
return true, nil
171-
}
172-
173-
// Compare versions using semver
174-
shouldUpdate, err := shouldUpdateVersion(existingVersion, saga.Version)
175-
if err != nil {
176-
return false, fmt.Errorf("compare versions: %w", err)
177-
}
178-
179-
if !shouldUpdate {
180-
s.logger.Debug("skipping saga (version not newer)",
181-
"name", saga.Name,
182-
"existing_version", existingVersion,
183-
"embedded_version", saga.Version)
184170
return false, nil
185171
}
186172

187-
// Update saga with newer version
173+
// INSERT new row for this version
188174
_, err = s.pool.Exec(ctx, `
189-
UPDATE public.platform_saga_definition
190-
SET version = $1, script = $2, display_name = $3, description = $4, updated_at = NOW()
191-
WHERE name = $5
192-
`, saga.Version, saga.Script, saga.DisplayName, saga.Description, saga.Name)
175+
INSERT INTO public.platform_saga_definition
176+
(id, name, version, script, display_name, description)
177+
VALUES ($1, $2, $3, $4, $5, $6)
178+
`, saga.ID, saga.Name, saga.Version, saga.Script, saga.DisplayName, saga.Description)
193179
if err != nil {
194-
return false, fmt.Errorf("update saga: %w", err)
180+
// Handle race condition: another process may have inserted the same (name, version); SQLSTATE 23505 is unique_violation
181+
var pgErr *pgconn.PgError
182+
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
183+
s.logger.Debug("saga version inserted by concurrent process",
184+
"name", saga.Name,
185+
"version", saga.Version)
186+
return false, nil
187+
}
188+
return false, fmt.Errorf("insert saga: %w", err)
195189
}
196190

197-
s.logger.Info("updated platform saga",
191+
s.logger.Info("inserted platform saga",
198192
"name", saga.Name,
199-
"old_version", existingVersion,
200-
"new_version", saga.Version)
193+
"version", saga.Version)
201194
return true, nil
202195
}
203196

@@ -211,22 +204,6 @@ func extractVersionFromScript(script string) string {
211204
return matches[1]
212205
}
213206

214-
// shouldUpdateVersion returns true if newVersion is greater than existingVersion.
215-
// Uses semver comparison for proper version ordering.
216-
func shouldUpdateVersion(existingVersion, newVersion string) (bool, error) {
217-
existingVer, err := semver.NewVersion(existingVersion)
218-
if err != nil {
219-
return false, fmt.Errorf("parse existing version %q: %w", existingVersion, err)
220-
}
221-
222-
newVer, err := semver.NewVersion(newVersion)
223-
if err != nil {
224-
return false, fmt.Errorf("parse new version %q: %w", newVersion, err)
225-
}
226-
227-
return newVer.GreaterThan(existingVer), nil
228-
}
229-
230207
// humanizeName converts snake_case to Title Case.
231208
// Example: "current_account_withdrawal" -> "Current Account Withdrawal"
232209
func humanizeName(name string) string {

0 commit comments

Comments
 (0)