Skip to content

Commit df4824c

Browse files
committed
fix: skip RetryWait for new tasks in scheduler (#459)
The scheduler was applying RetryWait to all tasks, including brand-new ones with Retries=0. This caused every SendTransaction task to wait 5 minutes before being picked up, even though it had never been attempted. The fix checks Retries==0 to identify new tasks and picks them up immediately, only applying RetryWait to tasks that have actually failed and are being retried. For those retried tasks, RetryWait is now called with the task's actual retry count (int(t.Retries)) instead of a hard-coded 0.
1 parent e502d8b commit df4824c

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

pkg/pdp/scheduler/engine.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ func (e *TaskEngine) pollerTryAllWork() bool {
197197
var taskIDs []TaskID
198198
for _, t := range tasks {
199199
if h.TaskTypeDetails.RetryWait == nil ||
200-
time.Since(t.UpdateTime) > h.TaskTypeDetails.RetryWait(0) {
200+
t.Retries == 0 || // New task - pick up immediately
201+
time.Since(t.UpdateTime) > h.TaskTypeDetails.RetryWait(int(t.Retries)) {
201202
taskIDs = append(taskIDs, TaskID(t.ID))
202203
}
203204
}

pkg/pdp/scheduler/engine_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,3 +789,45 @@ func TestTaskEngineShutdownTimeoutMultipleTasks(t *testing.T) {
789789
assert.Equal(t, numTasks, taskCount, "All tasks should have started")
790790
taskCountMu.Unlock()
791791
}
792+
793+
// TestTaskEngineNewTaskIgnoresRetryWait verifies that brand-new tasks (Retries=0)
794+
// are picked up immediately, ignoring the RetryWait duration. This is a regression
795+
// test for https://github.com/storacha/piri/issues/459
796+
func TestTaskEngineNewTaskIgnoresRetryWait(t *testing.T) {
797+
db := setupTestDB(t)
798+
799+
taskExecuted := make(chan struct{})
800+
801+
mockTask := NewMockTask("test_task", true)
802+
// Set a very long RetryWait - if it were applied to new tasks, the test would time out
803+
mockTask.typeDetails.RetryWait = func(retries int) time.Duration {
804+
return 5 * time.Minute
805+
}
806+
mockTask.doFunc = func(taskID scheduler.TaskID) (bool, error) {
807+
close(taskExecuted)
808+
return true, nil
809+
}
810+
811+
engine, err := scheduler.NewEngine(db, []scheduler.TaskInterface{mockTask})
812+
require.NoError(t, err)
813+
err = engine.Start(t.Context())
814+
require.NoError(t, err)
815+
t.Cleanup(func() {
816+
if err := engine.Stop(context.Background()); err != nil {
817+
t.Logf("failed to stop engine: %v", err)
818+
}
819+
})
820+
821+
mockTask.WaitForReady()
822+
mockTask.AddTask(func(tID scheduler.TaskID, tx *gorm.DB) (bool, error) {
823+
return true, nil
824+
})
825+
826+
// Task should be picked up within 5 seconds, NOT 5 minutes
827+
select {
828+
case <-taskExecuted:
829+
// Success - task was picked up immediately
830+
case <-time.After(5 * time.Second):
831+
t.Fatal("Task should be picked up immediately for new tasks (Retries=0), but it wasn't")
832+
}
833+
}

0 commit comments

Comments
 (0)