diff --git a/pkg/repository/durable_events_test.go b/pkg/repository/durable_events_test.go
new file mode 100644
index 0000000000..3065abb84b
--- /dev/null
+++ b/pkg/repository/durable_events_test.go
@@ -0,0 +1,1039 @@
+//go:build !e2e && !load && !rampup && !integration
+
+package repository
+
+import (
+	"context"
+	"crypto/sha256"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
+)
+
+func createDurableEventsRepository(pool *pgxpool.Pool) DurableEventsRepository {
+	logger := zerolog.Nop()
+
+	payloadStore := NewPayloadStoreRepository(pool, &logger, sqlcv1.New(), PayloadStoreRepositoryOpts{})
+
+	shared := &sharedRepository{
+		pool:         pool,
+		l:            &logger,
+		queries:      sqlcv1.New(),
+		payloadStore: payloadStore,
+	}
+
+	return newDurableEventsRepository(shared)
+}
+
+func timestamptz(t time.Time) pgtype.Timestamptz {
+	return pgtype.Timestamptz{Time: t, Valid: true}
+}
+
+var durableTaskIdCounter int64
+
+func newDurableTaskId() (int64, pgtype.Timestamptz) {
+	id := atomic.AddInt64(&durableTaskIdCounter, 1)
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+	return id, insertedAt
+}
+
+func createDurableEventLogPartitions(t *testing.T, pool *pgxpool.Pool) {
+	ctx := context.Background()
+	today := time.Now().UTC().Format("20060102")
+
+	tables := []string{
+		"v1_durable_event_log_file",
+		"v1_durable_event_log_entry",
+		"v1_durable_event_log_callback",
+	}
+
+	for _, table := range tables {
+		partitionName := table + "_" + today
+		_, err := pool.Exec(ctx, `
+			DO $$
+			BEGIN
+				IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = '`+partitionName+`') THEN
+					EXECUTE format('CREATE TABLE %I PARTITION OF `+table+` FOR VALUES FROM (%L) TO (%L)',
+						'`+partitionName+`',
+						CURRENT_DATE,
+						CURRENT_DATE + INTERVAL '1 day');
+				END IF;
+			END $$;
+		`)
+		require.NoError(t, err)
+	}
+}
+
+func TestCreateEventLogFiles(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	latestInsertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogFileOpts{
+		{
+			DurableTaskId:                 durableTaskId,
+			DurableTaskInsertedAt:         durableTaskInsertedAt,
+			LatestInsertedAt:              latestInsertedAt,
+			LatestNodeId:                  0,
+			LatestBranchId:                0,
+			LatestBranchFirstParentNodeId: 0,
+		},
+	}
+
+	files, err := repo.CreateEventLogFiles(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, files, 1)
+
+	assert.Equal(t, durableTaskId, files[0].DurableTaskID)
+	assert.Equal(t, int64(0), files[0].LatestNodeID)
+	assert.Equal(t, int64(0), files[0].LatestBranchID)
+}
+
+func TestGetEventLogFileForTask(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	latestInsertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogFileOpts{
+		{
+			DurableTaskId:                 durableTaskId,
+			DurableTaskInsertedAt:         durableTaskInsertedAt,
+			LatestInsertedAt:              latestInsertedAt,
+			LatestNodeId:                  5,
+			LatestBranchId:                1,
+			LatestBranchFirstParentNodeId: 3,
+		},
+	}
+
+	_, err := repo.CreateEventLogFiles(ctx, opts)
+	require.NoError(t, err)
+
+	file, err := repo.GetEventLogFileForTask(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+
+	assert.Equal(t, durableTaskId, file.DurableTaskID)
+	assert.Equal(t, int64(5), file.LatestNodeID)
+	assert.Equal(t, int64(1), file.LatestBranchID)
+	assert.Equal(t, int64(3), file.LatestBranchFirstParentNodeID)
+}
+
+func TestCreateEventLogEntries(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	tenantId := uuid.New()
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+	data := []byte(`{"key": "value"}`)
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              tenantId,
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+			Data:                  data,
+		},
+	}
+
+	entries, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, entries, 1)
+
+	assert.Equal(t, durableTaskId, entries[0].DurableTaskID)
+	assert.Equal(t, int64(1), entries[0].NodeID)
+	assert.Equal(t, int64(0), entries[0].BranchID)
+
+	expectedHash := sha256.Sum256(data)
+	assert.Equal(t, expectedHash[:], entries[0].DataHash)
+	assert.Equal(t, "sha256", entries[0].DataHashAlg.String)
+}
+
+func TestCreateEventLogEntriesWithoutData(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+			Data:                  nil,
+		},
+	}
+
+	entries, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, entries, 1)
+
+	assert.Nil(t, entries[0].DataHash)
+}
+
+func TestGetEventLogEntry(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_STARTED",
+			NodeId:                42,
+			ParentNodeId:          10,
+			BranchId:              2,
+			Data:                  nil,
+		},
+	}
+
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	entry, err := repo.GetEventLogEntry(ctx, durableTaskId, durableTaskInsertedAt, 42)
+	require.NoError(t, err)
+
+	assert.Equal(t, durableTaskId, entry.DurableTaskID)
+	assert.Equal(t, int64(42), entry.NodeID)
+	assert.Equal(t, int64(10), entry.ParentNodeID.Int64)
+	assert.Equal(t, int64(2), entry.BranchID)
+}
+
+func TestListEventLogEntries(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_STARTED",
+			NodeId:                2,
+			ParentNodeId:          1,
+			BranchId:              0,
+		},
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "MEMO_STARTED",
+			NodeId:                3,
+			ParentNodeId:          2,
+			BranchId:              0,
+		},
+	}
+
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	entries, err := repo.ListEventLogEntries(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+	require.Len(t, entries, 3)
+
+	nodeIds := make([]int64, len(entries))
+	for i, e := range entries {
+		nodeIds[i] = e.NodeID
+	}
+	assert.ElementsMatch(t, []int64{1, 2, 3}, nodeIds)
+}
+
+func TestCreateEventLogCallbacks(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_COMPLETED",
+			Key:                   "run:abc123",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+	}
+
+	callbacks, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, callbacks, 1)
+
+	assert.Equal(t, durableTaskId, callbacks[0].DurableTaskID)
+	assert.Equal(t, "run:abc123", callbacks[0].Key)
+	assert.Equal(t, int64(1), callbacks[0].NodeID)
+	assert.False(t, callbacks[0].IsSatisfied)
+}
+
+func TestGetEventLogCallback(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "wait:sleep:5s",
+			NodeId:                5,
+			IsSatisfied:           true,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	callback, err := repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "wait:sleep:5s")
+	require.NoError(t, err)
+
+	assert.Equal(t, durableTaskId, callback.DurableTaskID)
+	assert.Equal(t, "wait:sleep:5s", callback.Key)
+	assert.Equal(t, int64(5), callback.NodeID)
+	assert.True(t, callback.IsSatisfied)
+}
+
+func TestListEventLogCallbacks(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_COMPLETED",
+			Key:                   "run:task1",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "wait:event:user_input",
+			NodeId:                2,
+			IsSatisfied:           false,
+		},
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "MEMO_COMPLETED",
+			Key:                   "memo:cache_key",
+			NodeId:                3,
+			IsSatisfied:           true,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	callbacks, err := repo.ListEventLogCallbacks(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+	require.Len(t, callbacks, 3)
+
+	keys := make([]string, len(callbacks))
+	for i, c := range callbacks {
+		keys[i] = c.Key
+	}
+	assert.ElementsMatch(t, []string{"run:task1", "wait:event:user_input", "memo:cache_key"}, keys)
+}
+
+func TestUpdateEventLogCallbackSatisfied(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "wait:child_workflow",
+			NodeId:                10,
+			IsSatisfied:           false,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	callback, err := repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "wait:child_workflow")
+	require.NoError(t, err)
+	assert.False(t, callback.IsSatisfied)
+
+	updated, err := repo.UpdateEventLogCallbackSatisfied(ctx, durableTaskId, durableTaskInsertedAt, "wait:child_workflow", true)
+	require.NoError(t, err)
+	assert.True(t, updated.IsSatisfied)
+
+	callback, err = repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "wait:child_workflow")
+	require.NoError(t, err)
+	assert.True(t, callback.IsSatisfied)
+}
+
+// Callback satisfaction can toggle back to false per schema documentation:
+// "is_satisfied _may_ change multiple times through the lifecycle of a callback"
+func TestUpdateEventLogCallbackSatisfiedToggle(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "wait:toggleable",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	_, err = repo.UpdateEventLogCallbackSatisfied(ctx, durableTaskId, durableTaskInsertedAt, "wait:toggleable", true)
+	require.NoError(t, err)
+
+	callback, err := repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "wait:toggleable")
+	require.NoError(t, err)
+	assert.True(t, callback.IsSatisfied)
+
+	_, err = repo.UpdateEventLogCallbackSatisfied(ctx, durableTaskId, durableTaskInsertedAt, "wait:toggleable", false)
+	require.NoError(t, err)
+
+	callback, err = repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "wait:toggleable")
+	require.NoError(t, err)
+	assert.False(t, callback.IsSatisfied)
+}
+
+func TestCreateMultipleEventLogFiles(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	task1Id, task1InsertedAt := newDurableTaskId()
+	task2Id, task2InsertedAt := newDurableTaskId()
+
+	latestInsertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogFileOpts{
+		{
+			DurableTaskId:                 task1Id,
+			DurableTaskInsertedAt:         task1InsertedAt,
+			LatestInsertedAt:              latestInsertedAt,
+			LatestNodeId:                  10,
+			LatestBranchId:                1,
+			LatestBranchFirstParentNodeId: 5,
+		},
+		{
+			DurableTaskId:                 task2Id,
+			DurableTaskInsertedAt:         task2InsertedAt,
+			LatestInsertedAt:              latestInsertedAt,
+			LatestNodeId:                  20,
+			LatestBranchId:                2,
+			LatestBranchFirstParentNodeId: 15,
+		},
+	}
+
+	files, err := repo.CreateEventLogFiles(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, files, 2)
+
+	file1, err := repo.GetEventLogFileForTask(ctx, task1Id, task1InsertedAt)
+	require.NoError(t, err)
+	assert.Equal(t, int64(10), file1.LatestNodeID)
+
+	file2, err := repo.GetEventLogFileForTask(ctx, task2Id, task2InsertedAt)
+	require.NoError(t, err)
+	assert.Equal(t, int64(20), file2.LatestNodeID)
+}
+
+// Tree structure test: entries on different branches share the same durable task
+// but have different branch_ids, allowing for replay from specific points.
+func TestEventLogEntriesBranching(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_STARTED",
+			NodeId:                2,
+			ParentNodeId:          1,
+			BranchId:              0,
+		},
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                3,
+			ParentNodeId:          1,
+			BranchId:              1,
+		},
+	}
+
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	entries, err := repo.ListEventLogEntries(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+	require.Len(t, entries, 3)
+
+	branch0Count := 0
+	branch1Count := 0
+	for _, e := range entries {
+		if e.BranchID == 0 {
+			branch0Count++
+		} else if e.BranchID == 1 {
+			branch1Count++
+		}
+	}
+
+	assert.Equal(t, 2, branch0Count)
+	assert.Equal(t, 1, branch1Count)
+}
+
+func TestEventLogEntriesIsolation(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	task1Id, task1InsertedAt := newDurableTaskId()
+	task2Id, task2InsertedAt := newDurableTaskId()
+
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         task1Id,
+			DurableTaskInsertedAt: task1InsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         task1Id,
+			DurableTaskInsertedAt: task1InsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_STARTED",
+			NodeId:                2,
+			ParentNodeId:          1,
+			BranchId:              0,
+		},
+	}
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	opts2 := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         task2Id,
+			DurableTaskInsertedAt: task2InsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "MEMO_STARTED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+	}
+	_, err = repo.CreateEventLogEntries(ctx, opts2)
+	require.NoError(t, err)
+
+	entries1, err := repo.ListEventLogEntries(ctx, task1Id, task1InsertedAt)
+	require.NoError(t, err)
+	assert.Len(t, entries1, 2)
+
+	entries2, err := repo.ListEventLogEntries(ctx, task2Id, task2InsertedAt)
+	require.NoError(t, err)
+	assert.Len(t, entries2, 1)
+}
+
+func TestEventLogCallbacksIsolation(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	task1Id, task1InsertedAt := newDurableTaskId()
+	task2Id, task2InsertedAt := newDurableTaskId()
+
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	_, err := repo.CreateEventLogCallbacks(ctx, []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         task1Id,
+			DurableTaskInsertedAt: task1InsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_COMPLETED",
+			Key:                   "shared_key_name",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+	})
+	require.NoError(t, err)
+
+	_, err = repo.CreateEventLogCallbacks(ctx, []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         task2Id,
+			DurableTaskInsertedAt: task2InsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_COMPLETED",
+			Key:                   "shared_key_name",
+			NodeId:                1,
+			IsSatisfied:           true,
+		},
+	})
+	require.NoError(t, err)
+
+	cb1, err := repo.GetEventLogCallback(ctx, task1Id, task1InsertedAt, "shared_key_name")
+	require.NoError(t, err)
+	assert.False(t, cb1.IsSatisfied)
+
+	cb2, err := repo.GetEventLogCallback(ctx, task2Id, task2InsertedAt, "shared_key_name")
+	require.NoError(t, err)
+	assert.True(t, cb2.IsSatisfied)
+}
+
+func TestEventLogEntryDuplicateNodeId(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+	}
+
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	// Attempt to insert duplicate node_id - should fail with primary key violation
+	duplicateOpts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_STARTED",
+			NodeId:                1, // same node_id
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+	}
+
+	_, err = repo.CreateEventLogEntries(ctx, duplicateOpts)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "duplicate key")
+}
+
+func TestEventLogEntryRootNode(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	// Root node has parent_node_id = 0
+	opts := []CreateEventLogEntryOpts{
+		{
+			TenantId:              uuid.New(),
+			ExternalId:            uuid.New(),
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_TRIGGERED",
+			NodeId:                1,
+			ParentNodeId:          0,
+			BranchId:              0,
+		},
+	}
+
+	entries, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+	require.Len(t, entries, 1)
+
+	entry, err := repo.GetEventLogEntry(ctx, durableTaskId, durableTaskInsertedAt, 1)
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), entry.ParentNodeID.Int64)
+}
+
+func TestCallbackKeyUniqueness(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "RUN_COMPLETED",
+			Key:                   "unique_key",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	// Attempt to insert duplicate key - should fail with primary key violation
+	duplicateOpts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "unique_key", // same key
+			NodeId:                2,
+			IsSatisfied:           true,
+		},
+	}
+
+	_, err = repo.CreateEventLogCallbacks(ctx, duplicateOpts)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "duplicate key")
+}
+
+func TestConcurrentCallbackUpdates(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	opts := []CreateEventLogCallbackOpts{
+		{
+			DurableTaskId:         durableTaskId,
+			DurableTaskInsertedAt: durableTaskInsertedAt,
+			InsertedAt:            insertedAt,
+			Kind:                  "WAIT_FOR_COMPLETED",
+			Key:                   "concurrent_key",
+			NodeId:                1,
+			IsSatisfied:           false,
+		},
+	}
+
+	_, err := repo.CreateEventLogCallbacks(ctx, opts)
+	require.NoError(t, err)
+
+	const numGoroutines = 10
+	var wg sync.WaitGroup
+	errChan := make(chan error, numGoroutines)
+
+	for i := 0; i < numGoroutines; i++ {
+		wg.Add(1)
+		go func(val bool) {
+			defer wg.Done()
+			_, err := repo.UpdateEventLogCallbackSatisfied(ctx, durableTaskId, durableTaskInsertedAt, "concurrent_key", val)
+			if err != nil {
+				errChan <- err
+			}
+		}(i%2 == 0)
+	}
+
+	wg.Wait()
+	close(errChan)
+
+	for err := range errChan {
+		require.NoError(t, err)
+	}
+
+	// Verify callback still exists and has a valid state
+	callback, err := repo.GetEventLogCallback(ctx, durableTaskId, durableTaskInsertedAt, "concurrent_key")
+	require.NoError(t, err)
+	assert.NotNil(t, callback)
+}
+
+func TestConcurrentEntryInserts(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	const numGoroutines = 10
+	var wg sync.WaitGroup
+	errChan := make(chan error, numGoroutines)
+
+	for i := 0; i < numGoroutines; i++ {
+		wg.Add(1)
+		go func(nodeId int64) {
+			defer wg.Done()
+			opts := []CreateEventLogEntryOpts{
+				{
+					TenantId:              uuid.New(),
+					ExternalId:            uuid.New(),
+					DurableTaskId:         durableTaskId,
+					DurableTaskInsertedAt: durableTaskInsertedAt,
+					InsertedAt:            insertedAt,
+					Kind:                  "RUN_TRIGGERED",
+					NodeId:                nodeId,
+					ParentNodeId:          0,
+					BranchId:              0,
+				},
+			}
+			_, err := repo.CreateEventLogEntries(ctx, opts)
+			if err != nil {
+				errChan <- err
+			}
+		}(int64(i + 1))
+	}
+
+	wg.Wait()
+	close(errChan)
+
+	for err := range errChan {
+		require.NoError(t, err)
+	}
+
+	entries, err := repo.ListEventLogEntries(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+	assert.Len(t, entries, numGoroutines)
+}
+
+func TestEventLogBranchDiscovery(t *testing.T) {
+	pool, cleanup := setupPostgresWithMigration(t)
+	defer cleanup()
+	createDurableEventLogPartitions(t, pool)
+
+	repo := createDurableEventsRepository(pool)
+	ctx := context.Background()
+
+	durableTaskId, durableTaskInsertedAt := newDurableTaskId()
+	insertedAt := timestamptz(time.Now().UTC().Truncate(time.Microsecond))
+
+	// Create a tree:
+	// 1 (root, branch 0)
+	// ├── 2 (branch 0)
+	// │   └── 4 (branch 0)
+	// └── 3 (branch 1, first node of new branch from node 1)
+	//     └── 5 (branch 1)
+	opts := []CreateEventLogEntryOpts{
+		{TenantId: uuid.New(), ExternalId: uuid.New(), DurableTaskId: durableTaskId, DurableTaskInsertedAt: durableTaskInsertedAt, InsertedAt: insertedAt, Kind: "RUN_TRIGGERED", NodeId: 1, ParentNodeId: 0, BranchId: 0},
+		{TenantId: uuid.New(), ExternalId: uuid.New(), DurableTaskId: durableTaskId, DurableTaskInsertedAt: durableTaskInsertedAt, InsertedAt: insertedAt, Kind: "WAIT_FOR_STARTED", NodeId: 2, ParentNodeId: 1, BranchId: 0},
+		{TenantId: uuid.New(), ExternalId: uuid.New(), DurableTaskId: durableTaskId, DurableTaskInsertedAt: durableTaskInsertedAt, InsertedAt: insertedAt, Kind: "RUN_TRIGGERED", NodeId: 3, ParentNodeId: 1, BranchId: 1},
+		{TenantId: uuid.New(), ExternalId: uuid.New(), DurableTaskId: durableTaskId, DurableTaskInsertedAt: durableTaskInsertedAt, InsertedAt: insertedAt, Kind: "MEMO_STARTED", NodeId: 4, ParentNodeId: 2, BranchId: 0},
+		{TenantId: uuid.New(), ExternalId: uuid.New(), DurableTaskId: durableTaskId, DurableTaskInsertedAt: durableTaskInsertedAt, InsertedAt: insertedAt, Kind: "WAIT_FOR_STARTED", NodeId: 5, ParentNodeId: 3, BranchId: 1},
+	}
+
+	_, err := repo.CreateEventLogEntries(ctx, opts)
+	require.NoError(t, err)
+
+	entries, err := repo.ListEventLogEntries(ctx, durableTaskId, durableTaskInsertedAt)
+	require.NoError(t, err)
+	require.Len(t, entries, 5)
+
+	// Build a map for easy lookup
+	nodeMap := make(map[int64]*sqlcv1.V1DurableEventLogEntry)
+	for _, e := range entries {
+		nodeMap[e.NodeID] = e
+	}
+
+	// Traverse branch 1: start from node 5, go to 3, then to 1 (root)
+	node5 := nodeMap[5]
+	assert.Equal(t, int64(1), node5.BranchID)
+	assert.Equal(t, int64(3), node5.ParentNodeID.Int64)
+
+	node3 := nodeMap[3]
+	assert.Equal(t, int64(1), node3.BranchID)
+	assert.Equal(t, int64(1), node3.ParentNodeID.Int64)
+
+	node1 := nodeMap[1]
+	assert.Equal(t, int64(0), node1.BranchID)
+	assert.Equal(t, int64(0), node1.ParentNodeID.Int64)
+
+	// Traverse branch 0: node 4 -> 2 -> 1
+	node4 := nodeMap[4]
+	assert.Equal(t, int64(0), node4.BranchID)
+	assert.Equal(t, int64(2), node4.ParentNodeID.Int64)
+
+	node2 := nodeMap[2]
+	assert.Equal(t, int64(0), node2.BranchID)
+	assert.Equal(t, int64(1), node2.ParentNodeID.Int64)
+}