Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/pdp/scheduler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (e *TaskEngine) pollerTryAllWork() bool {
var taskIDs []TaskID
for _, t := range tasks {
if h.TaskTypeDetails.RetryWait == nil ||
time.Since(t.UpdateTime) > h.TaskTypeDetails.RetryWait(0) {
t.Retries == 0 || // New task - pick up immediately
time.Since(t.UpdateTime) > h.TaskTypeDetails.RetryWait(int(t.Retries)) {
taskIDs = append(taskIDs, TaskID(t.ID))
}
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/pdp/scheduler/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,45 @@ func TestTaskEngineShutdownTimeoutMultipleTasks(t *testing.T) {
assert.Equal(t, numTasks, taskCount, "All tasks should have started")
taskCountMu.Unlock()
}

// TestTaskEngineNewTaskIgnoresRetryWait verifies that brand-new tasks (Retries=0)
// are picked up immediately, ignoring the RetryWait duration. This is a regression
// test for https://github.com/storacha/piri/issues/459
func TestTaskEngineNewTaskIgnoresRetryWait(t *testing.T) {
db := setupTestDB(t)

taskExecuted := make(chan struct{})

mockTask := NewMockTask("test_task", true)
// Set a very long RetryWait - if applied to new tasks, test would timeout
mockTask.typeDetails.RetryWait = func(retries int) time.Duration {
return 5 * time.Minute
}
mockTask.doFunc = func(taskID scheduler.TaskID) (bool, error) {
close(taskExecuted)
return true, nil
}

engine, err := scheduler.NewEngine(db, []scheduler.TaskInterface{mockTask})
require.NoError(t, err)
err = engine.Start(t.Context())
require.NoError(t, err)
t.Cleanup(func() {
if err := engine.Stop(context.Background()); err != nil {
t.Logf("failed to stop engine: %v", err)
}
})

mockTask.WaitForReady()
mockTask.AddTask(func(tID scheduler.TaskID, tx *gorm.DB) (bool, error) {
return true, nil
})

// Task should be picked up within 5 seconds, NOT 5 minutes
select {
case <-taskExecuted:
// Success - task was picked up immediately
case <-time.After(5 * time.Second):
t.Fatal("Task should be picked up immediately for new tasks (Retries=0), but it wasn't")
}
}
Loading