Skip to content

Commit 63a4c95

Browse files
committed
tests: cover partial idempotency and core fallback
1 parent a88f848 commit 63a4c95

File tree

1 file changed

+103
-5
lines changed

1 file changed

+103
-5
lines changed

pkg/repository/idempotency_trigger_test.go

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,54 @@ func insertRunStatus(t *testing.T, pool *pgxpool.Pool, tenantId uuid.UUID, wf mi
190190
require.NoError(t, err)
191191
}
192192

193+
func insertTerminalTaskEventsForRun(t *testing.T, pool *pgxpool.Pool, tenantId uuid.UUID, externalId uuid.UUID) {
194+
t.Helper()
195+
ctx := context.Background()
196+
197+
rows, err := pool.Query(ctx, `
198+
WITH lookup AS (
199+
SELECT external_id, dag_id, task_id, inserted_at
200+
FROM v1_lookup_table
201+
WHERE tenant_id = $1 AND external_id = $2
202+
), dag_tasks AS (
203+
SELECT t.id, t.inserted_at, t.retry_count
204+
FROM lookup l
205+
JOIN v1_dag_to_task dt ON dt.dag_id = l.dag_id AND dt.dag_inserted_at = l.inserted_at
206+
JOIN v1_task t ON t.id = dt.task_id AND t.inserted_at = dt.task_inserted_at
207+
WHERE l.dag_id IS NOT NULL
208+
), task_only AS (
209+
SELECT t.id, t.inserted_at, t.retry_count
210+
FROM lookup l
211+
JOIN v1_task t ON t.id = l.task_id AND t.inserted_at = l.inserted_at
212+
WHERE l.task_id IS NOT NULL
213+
)
214+
SELECT id, inserted_at, retry_count FROM dag_tasks
215+
UNION ALL
216+
SELECT id, inserted_at, retry_count FROM task_only
217+
`, tenantId, externalId)
218+
require.NoError(t, err)
219+
defer rows.Close()
220+
221+
taskCount := 0
222+
for rows.Next() {
223+
var (
224+
taskId int64
225+
taskInserted time.Time
226+
retryCount int32
227+
)
228+
require.NoError(t, rows.Scan(&taskId, &taskInserted, &retryCount))
229+
taskCount++
230+
231+
_, err = pool.Exec(ctx, `
232+
INSERT INTO v1_task_event (tenant_id, task_id, task_inserted_at, retry_count, event_type)
233+
VALUES ($1, $2, $3, $4, 'COMPLETED')
234+
`, tenantId, taskId, taskInserted, retryCount)
235+
require.NoError(t, err)
236+
}
237+
require.NoError(t, rows.Err())
238+
require.Greater(t, taskCount, 0)
239+
}
240+
193241
func getLastDeniedAt(t *testing.T, pool *pgxpool.Pool, tenantId uuid.UUID, key IdempotencyKey) pgtype.Timestamptz {
194242
t.Helper()
195243
ctx := context.Background()
@@ -462,8 +510,8 @@ func TestTriggerWorkflowIdempotency_ReclaimDoesNotUpdateLastDeniedAt(t *testing.
462510
assert.False(t, lastDeniedAt.Valid)
463511
}
464512

465-
func TestTriggerWorkflowIdempotency_MixedKeysDeniesBatch(t *testing.T) {
466-
// Mixed batch: one reclaimable, one running. The batch should be denied and no partial commit occurs.
513+
func TestTriggerWorkflowIdempotency_MixedKeysAllowsPartial(t *testing.T) {
514+
// Mixed batch: one reclaimable, one running. We should allow the reclaimable run and skip the duplicate.
467515
pool, cleanup := setupPostgresWithMigration(t)
468516
defer cleanup()
469517

@@ -515,12 +563,19 @@ func TestTriggerWorkflowIdempotency_MixedKeysDeniesBatch(t *testing.T) {
515563
}
516564

517565
_, _, err = repo.Triggers().TriggerFromWorkflowNames(ctx, tenantId, []*WorkflowNameTriggerOpts{terminalSecond, runningSecond})
518-
require.Error(t, err)
519-
assert.ErrorIs(t, err, ErrIdempotencyKeyAlreadyClaimed)
566+
require.NoError(t, err)
520567

521568
claimedTerminal := getClaimedByExternalId(t, pool, tenantId, keyTerminal)
522569
require.NotNil(t, claimedTerminal)
523-
assert.Equal(t, terminalFirst.ExternalId, *claimedTerminal)
570+
assert.Equal(t, terminalSecond.ExternalId, *claimedTerminal)
571+
572+
claimedRunning := getClaimedByExternalId(t, pool, tenantId, keyRunning)
573+
require.NotNil(t, claimedRunning)
574+
assert.Equal(t, runningFirst.ExternalId, *claimedRunning)
575+
576+
lastDeniedAt := getLastDeniedAt(t, pool, tenantId, keyRunning)
577+
assert.True(t, lastDeniedAt.Valid)
578+
assert.WithinDuration(t, time.Now(), lastDeniedAt.Time, 2*time.Minute)
524579
}
525580

526581
func TestTriggerWorkflowIdempotency_ConcurrentDuplicateKey(t *testing.T) {
@@ -708,6 +763,49 @@ func TestTriggerWorkflowIdempotency_RecheckMissingOlapUpdatesLastDeniedAt(t *tes
708763
assert.WithinDuration(t, time.Now(), lastDeniedAt.Time, 2*time.Minute)
709764
}
710765

766+
func TestTriggerWorkflowIdempotency_RecheckUsesCoreWhenOlapMissing(t *testing.T) {
767+
// If OLAP is missing but core task events are terminal, we should reclaim and allow a new run.
768+
pool, cleanup := setupPostgresWithMigration(t)
769+
defer cleanup()
770+
771+
repo, cleanupRepo := setupRepositoryWithTTL(t, pool, 30*time.Minute, 1*time.Minute)
772+
defer cleanupRepo()
773+
774+
tenantId := setupTenant(t, repo, pool)
775+
_ = createMinimalWorkflow(t, pool, tenantId, "test-workflow")
776+
777+
ctx := context.Background()
778+
key := IdempotencyKey("idem-key-core-fallback")
779+
780+
first := &WorkflowNameTriggerOpts{
781+
TriggerTaskData: &TriggerTaskData{
782+
WorkflowName: "test-workflow",
783+
},
784+
ExternalId: uuid.New(),
785+
IdempotencyKey: &key,
786+
}
787+
788+
_, _, err := repo.Triggers().TriggerFromWorkflowNames(ctx, tenantId, []*WorkflowNameTriggerOpts{first})
789+
require.NoError(t, err)
790+
791+
insertTerminalTaskEventsForRun(t, pool, tenantId, first.ExternalId)
792+
793+
second := &WorkflowNameTriggerOpts{
794+
TriggerTaskData: &TriggerTaskData{
795+
WorkflowName: "test-workflow",
796+
},
797+
ExternalId: uuid.New(),
798+
IdempotencyKey: &key,
799+
}
800+
801+
_, _, err = repo.Triggers().TriggerFromWorkflowNames(ctx, tenantId, []*WorkflowNameTriggerOpts{second})
802+
require.NoError(t, err)
803+
804+
claimed := getClaimedByExternalId(t, pool, tenantId, key)
805+
require.NotNil(t, claimed)
806+
assert.Equal(t, second.ExternalId, *claimed)
807+
}
808+
711809
func TestTriggerWorkflowIdempotency_RecheckUpdatesLastDeniedAtWhenNonTerminal(t *testing.T) {
712810
// If the run is non-terminal, we deny and update last_denied_at for throttling.
713811
pool, cleanup := setupPostgresWithMigration(t)

0 commit comments

Comments
 (0)