Skip to content

Commit 223bc61

Browse files
fix(scheduler): don't skip cron activations
Previous behavior for a long run:

    Cron activation: |-A-------A-------A------ ...
    Task execution:  |-[EEEEEEEE]------[EEE]-- ...

Fixed behavior:

    Cron activation: |-A-------A-------A------ ...
    Task execution:  |-[EEEEEEEE][EEE]-[EEE]-- ...

Previous behavior for retries:

    Cron activation:      |-A-------A-------A----- ...
    Task execution/retry: |-[E]-[R]-[R]-----[EE]-- ...

Fixed behavior:

    Cron activation:      |-A-------A-------A----- ...
    Task execution/retry: |-[E]-[R]-[R][EE]-[EE]-- ...

Fixes #4309
1 parent 9dc2b7e commit 223bc61

File tree

3 files changed

+195
-11
lines changed

3 files changed

+195
-11
lines changed

Diff for: pkg/scheduler/activation.go

+16
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,22 @@ type Activation[K comparable] struct {
2020
Retry int8
2121
Properties Properties
2222
Stop time.Time
23+
24+
// preRescheduleActivation keeps track of initial run activation time.
25+
// It is set only for run activations rescheduled by maintenance window
26+
// or retries. It helps to improve reschedule mechanism.
27+
// preRescheduleActivation should be accessed by InitialActivation.
28+
preRescheduleActivation time.Time
29+
}
30+
31+
// InitialActivation returns the activation time of the first run in task
32+
// execution sequence. Task execution sequence starts with the initial run
33+
// followed by reschedules triggered by maintenance window and/or retries.
34+
func InitialActivation[K comparable](a Activation[K]) time.Time {
35+
if a.preRescheduleActivation.IsZero() {
36+
return a.Time
37+
}
38+
return a.preRescheduleActivation
2339
}
2440

2541
// activationHeap implements heap.Interface.

Diff for: pkg/scheduler/scheduler.go

+51-11
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,10 @@ func (s *Scheduler[K]) Schedule(ctx context.Context, key K, d Details) {
113113
}
114114
next := d.Trigger.Next(now)
115115

116-
s.scheduleLocked(ctx, key, next, 0, nil, d.Window)
116+
s.scheduleLocked(ctx, key, next, time.Time{}, 0, nil, d.Window)
117117
}
118118

119-
func (s *Scheduler[K]) reschedule(ctx *RunContext[K]) {
119+
func (s *Scheduler[K]) reschedule(ctx *RunContext[K], initialActivation time.Time) {
120120
key := ctx.Key
121121

122122
s.mu.Lock()
@@ -143,29 +143,62 @@ func (s *Scheduler[K]) reschedule(ctx *RunContext[K]) {
143143
next := d.Trigger.Next(now)
144144

145145
var (
146-
retno int8
147-
p Properties
146+
retno int8
147+
p Properties
148+
preRescheduleActivation time.Time
148149
)
149150
switch {
150151
case shouldContinue(ctx):
151152
next = now
152153
retno = ctx.Retry
153154
p = ctx.Properties
155+
preRescheduleActivation = initialActivation
154156
case shouldRetry(ctx, ctx.err):
155157
if d.Backoff != nil {
156158
if b := d.Backoff.NextBackOff(); b != retry.Stop {
157159
next = now.Add(b)
158160
retno = ctx.Retry + 1
159161
p = ctx.Properties
160162
s.listener.OnRetryBackoff(ctx, key, b, retno)
163+
preRescheduleActivation = initialActivation
161164
}
162165
}
163166
default:
167+
// Use initial activation time instead of now so that we don't skip
168+
// activations of runs which ran longer than cron interval (#4309).
169+
//
170+
// Reschedule after long run:
171+
// Cron activation: |-A-------A-------A------ ...
172+
// Task execution: |-[EEEEEEEE][EEE]-[EEE]-- ...
173+
//
174+
// Reschedule after retries:
175+
// Cron activation: |-A-------A-------A----- ...
176+
// Task execution/retry: |-[E]-[R]-[R][EE]-[EE]-- ...
177+
//
178+
// Reschedule after maintenance window:
179+
// Cron activation: |-A-------A-------A----- ...
180+
// Maintenance window: |[WW]-[W]---[WWWWWWWWWWW ...
181+
// Task execution/continue: |-[E]-[C]---[C][E][E]--- ...
182+
//
183+
// Note that initial activation run is not calculated for runs interrupted
184+
// by pause/start and suspend/resume, as they are treated as fresh runs by
185+
// the scheduler.
186+
//
187+
// In general, if task execution takes more time than cron interval,
188+
// then the problem is on the cron definition side, but we should still
189+
// try to alleviate this issue for "spontaneous" long task executions.
190+
//
191+
// The +1 should ensure that next is strictly after activation time.
192+
// In case this assertion fails, fallback to the previous scheduling
193+
// mechanism which uses now for calculating next activation time.
194+
if a := d.Trigger.Next(initialActivation.Add(1)); a.After(initialActivation) {
195+
next = a
196+
}
164197
if d.Backoff != nil {
165198
d.Backoff.Reset()
166199
}
167200
}
168-
s.scheduleLocked(ctx, key, next, retno, p, d.Window)
201+
s.scheduleLocked(ctx, key, next, preRescheduleActivation, retno, p, d.Window)
169202
}
170203

171204
func shouldContinue(ctx context.Context) bool {
@@ -176,7 +209,7 @@ func shouldRetry(ctx context.Context, err error) bool {
176209
return !(err == nil || errors.Is(context.Cause(ctx), ErrStoppedTask) || retry.IsPermanent(err))
177210
}
178211

179-
func (s *Scheduler[K]) scheduleLocked(ctx context.Context, key K, next time.Time, retno int8, p Properties, w Window) {
212+
func (s *Scheduler[K]) scheduleLocked(ctx context.Context, key K, next, preRescheduleActivation time.Time, retno int8, p Properties, w Window) {
180213
if next.IsZero() {
181214
s.listener.OnNoTrigger(ctx, key)
182215
s.unscheduleLocked(key)
@@ -186,7 +219,14 @@ func (s *Scheduler[K]) scheduleLocked(ctx context.Context, key K, next time.Time
186219
begin, end := w.Next(next)
187220

188221
s.listener.OnSchedule(ctx, key, begin, end, retno)
189-
a := Activation[K]{Key: key, Time: begin, Retry: retno, Properties: p, Stop: end}
222+
a := Activation[K]{
223+
Time: begin,
224+
Key: key,
225+
Retry: retno,
226+
Properties: p,
227+
Stop: end,
228+
preRescheduleActivation: preRescheduleActivation,
229+
}
190230
if s.queue.Push(a) {
191231
s.wakeup()
192232
}
@@ -230,7 +270,7 @@ func (s *Scheduler[K]) Trigger(ctx context.Context, key K) bool {
230270

231271
s.listener.OnTrigger(ctx, key, ok)
232272
if ok {
233-
s.asyncRun(runCtx)
273+
s.asyncRun(runCtx, s.now())
234274
}
235275
return ok
236276
}
@@ -326,7 +366,7 @@ func (s *Scheduler[_]) Start(ctx context.Context) {
326366
runCtx := s.newRunContextLocked(a)
327367
s.mu.Unlock()
328368

329-
s.asyncRun(runCtx)
369+
s.asyncRun(runCtx, InitialActivation(a))
330370
}
331371

332372
s.listener.OnSchedulerStop(ctx)
@@ -393,14 +433,14 @@ func (s *Scheduler[K]) newRunContextLocked(a Activation[K]) *RunContext[K] {
393433
return ctx
394434
}
395435

396-
func (s *Scheduler[K]) asyncRun(ctx *RunContext[K]) {
436+
func (s *Scheduler[K]) asyncRun(ctx *RunContext[K], initialActivation time.Time) {
397437
s.listener.OnRunStart(ctx)
398438
s.wg.Add(1)
399439
go func(ctx *RunContext[K]) {
400440
defer s.wg.Done()
401441
ctx.err = s.run(*ctx)
402442
s.onRunEnd(ctx)
403-
s.reschedule(ctx)
443+
s.reschedule(ctx, initialActivation)
404444
}(ctx)
405445
}
406446

Diff for: pkg/scheduler/scheduler_test.go

+128
Original file line numberDiff line numberDiff line change
@@ -728,3 +728,131 @@ func TestWindowWithBackoff(t *testing.T) {
728728
}
729729
}
730730
}
731+
732+
// TestReschedule behavior described in (s *Scheduler[K]).reschedule comment.
733+
func TestReschedule(t *testing.T) {
734+
t.Run("after long run", func(t *testing.T) {
735+
// Tested behavior:
736+
// Cron activation: |-A-------A-------A------ ...
737+
// Task execution: |-[EEEEEEEE][EEE]-[EEE]-- ...
738+
ctx, cancel := context.WithCancel(context.Background())
739+
defer cancel()
740+
f := newFakeRunner()
741+
runDuration := []time.Duration{
742+
125 * time.Millisecond,
743+
50 * time.Millisecond,
744+
50 * time.Millisecond,
745+
}
746+
f.F = func(ctx testRunContext) error {
747+
time.Sleep(runDuration[f.Count()-1])
748+
return nil
749+
}
750+
s := NewScheduler[testKey](relativeTime(), f.Run, ll)
751+
k := randomKey()
752+
753+
s.Schedule(ctx, k, details(newFakeTrigger(100*time.Millisecond, 200*time.Millisecond, 300*time.Millisecond)))
754+
select {
755+
case <-startAndWait(ctx, s):
756+
t.Fatal("expected a run, scheduler exit")
757+
case <-f.Wait(time.Second):
758+
if c := f.Count(); c != len(runDuration) {
759+
t.Fatal("Run mismatch")
760+
}
761+
}
762+
})
763+
764+
t.Run("after retries", func(t *testing.T) {
765+
// Tested behavior:
766+
// Cron activation: |-A-------A-------A----- ...
767+
// Task execution/retry: |-[E]-[R]-[R][EE]-[EE]-- ...
768+
ctx, cancel := context.WithCancel(context.Background())
769+
defer cancel()
770+
f := newFakeRunner()
771+
runErr := []error{
772+
errors.New("a"),
773+
errors.New("b"),
774+
nil,
775+
nil,
776+
nil,
777+
}
778+
f.F = func(ctx testRunContext) error {
779+
return runErr[f.Count()-1]
780+
}
781+
s := NewScheduler[testKey](relativeTime(), f.Run, ll)
782+
k := randomKey()
783+
784+
d := details(newFakeTrigger(100*time.Millisecond, 200*time.Millisecond, 300*time.Millisecond))
785+
d.Backoff = retry.BackoffFunc(func() time.Duration {
786+
return 60 * time.Millisecond
787+
})
788+
d.Backoff = retry.WithMaxRetries(d.Backoff, 2)
789+
790+
s.Schedule(ctx, k, d)
791+
select {
792+
case <-startAndWait(ctx, s):
793+
t.Fatal("expected a run, scheduler exit")
794+
case <-f.Wait(time.Second):
795+
if c := f.Count(); c != len(runErr) {
796+
t.Fatal("Run mismatch")
797+
}
798+
}
799+
})
800+
801+
t.Run("after window", func(t *testing.T) {
802+
// Tested behavior:
803+
// Cron activation: |-A-------A-------A----- ...
804+
// Maintenance window: |[WW]-[W]---[WWWWWWWWWWW ...
805+
// Task execution/continue: |-[E]-[C]---[C][E][E]--- ...
806+
ctx, cancel := context.WithCancel(context.Background())
807+
defer cancel()
808+
runOutOfWindow := []bool{
809+
true,
810+
true,
811+
false,
812+
false,
813+
false,
814+
}
815+
f := newFakeRunner()
816+
f.F = func(ctx testRunContext) error {
817+
if runOutOfWindow[f.Count()-1] {
818+
<-ctx.Done()
819+
return ctx.err
820+
}
821+
return nil
822+
}
823+
now := timeutc.Now()
824+
s := NewScheduler[testKey](timeutc.Now, f.Run, ll)
825+
k := randomKey()
826+
827+
d := details(newFakeTriggerWithTime(
828+
now.Add(100*time.Millisecond),
829+
now.Add(200*time.Millisecond),
830+
now.Add(300*time.Millisecond),
831+
))
832+
wdt := func(d time.Duration) WeekdayTime {
833+
return WeekdayTime{
834+
Weekday: now.Weekday(),
835+
Time: now.Sub(now.Truncate(24*time.Hour)) + d,
836+
}
837+
}
838+
d.Window, _ = NewWindow(
839+
wdt(80*time.Millisecond), wdt(120*time.Millisecond),
840+
wdt(160*time.Millisecond), wdt(180*time.Millisecond),
841+
wdt(220*time.Millisecond), wdt(400*time.Millisecond),
842+
)
843+
d.Backoff = retry.BackoffFunc(func() time.Duration {
844+
return time.Millisecond
845+
})
846+
d.Backoff = retry.WithMaxRetries(d.Backoff, 2)
847+
848+
s.Schedule(ctx, k, d)
849+
select {
850+
case <-startAndWait(ctx, s):
851+
t.Fatal("expected a run, scheduler exit")
852+
case <-f.Wait(time.Second):
853+
if c := f.Count(); c != len(runOutOfWindow) {
854+
t.Fatal("Run mismatch")
855+
}
856+
}
857+
})
858+
}

0 commit comments

Comments
 (0)