Skip to content

Commit a6fe3f7

Browse files
feat: Use staggered schedule for CronJobs
This patch includes the following changes around the determined schedule for created (ReclaimSpace/KeyRotation)CronJobs: - Prevents thundering-herd behavior by staggering the schedule by a max offset of 2hrs. - If a job's schedule requires it to run more frequently, the stagger window is adjusted accordingly. IOW, if schedule < 2hrs; staggerWindow = schedule - The offset is calculated deterministically using UID to ensure the same offset b/w multiple reconciles. - If a job has missed its last execution, the stagger is not applied to let the job run immediately (might lead to thundering-herd but a fair tradeoff in happy path?) Signed-off-by: Niraj Yadav <niryadav@redhat.com>
1 parent 489e356 commit a6fe3f7

10 files changed

Lines changed: 217 additions & 10 deletions

api/csiaddons/v1alpha1/encryptionkeyrotationcronjob_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type EncryptionKeyRotationJobTemplateSpec struct {
3636
// EncryptionKeyRotationCronJobSpec defines the desired state of EncryptionKeyRotationCronJob
3737
type EncryptionKeyRotationCronJobSpec struct {
3838
// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
39+
// A deterministic, UID-based stagger offset is applied to spread
40+
// execution across the "cronjob-stagger-window" (default: 2 hours,
41+
// set to 0 to disable) configured in the csi-addons-config ConfigMap.
3942
// +kubebuilder:validation:Required
4043
// +kubebuilder:validation:Pattern:=.+
4144
Schedule string `json:"schedule"`

api/csiaddons/v1alpha1/reclaimspacecronjob_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type ReclaimSpaceJobTemplateSpec struct {
3636
// ReclaimSpaceCronJobSpec defines the desired state of ReclaimSpaceJob
3737
type ReclaimSpaceCronJobSpec struct {
3838
// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
39+
// A deterministic, UID-based stagger offset is applied to spread
40+
// execution across the "cronjob-stagger-window" (default: 2 hours,
41+
// set to 0 to disable) configured in the csi-addons-config ConfigMap.
3942
// +kubebuilder:validation:Required
4043
// +kubebuilder:validation:Pattern:=.+
4144
Schedule string `json:"schedule"`

config/crd/bases/csiaddons.openshift.io_encryptionkeyrotationcronjobs.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,11 @@ spec:
146146
- spec
147147
type: object
148148
schedule:
149-
description: The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
149+
description: |-
150+
The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
151+
A deterministic, UID-based stagger offset is applied to spread
152+
execution across the "cronjob-stagger-window" (default: 2 hours,
153+
set to 0 to disable) configured in the csi-addons-config ConfigMap.
150154
pattern: .+
151155
type: string
152156
startingDeadlineSeconds:

config/crd/bases/csiaddons.openshift.io_reclaimspacecronjobs.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,11 @@ spec:
145145
- spec
146146
type: object
147147
schedule:
148-
description: The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
148+
description: |-
149+
The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
150+
A deterministic, UID-based stagger offset is applied to spread
151+
execution across the "cronjob-stagger-window" (default: 2 hours,
152+
set to 0 to disable) configured in the csi-addons-config ConfigMap.
149153
pattern: .+
150154
type: string
151155
startingDeadlineSeconds:

deploy/controller/crds.yaml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,11 @@ spec:
304304
- spec
305305
type: object
306306
schedule:
307-
description: The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
307+
description: |-
308+
The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
309+
A deterministic, UID-based stagger offset is applied to spread
310+
execution across the "cronjob-stagger-window" (default: 2 hours,
311+
set to 0 to disable) configured in the csi-addons-config ConfigMap.
308312
pattern: .+
309313
type: string
310314
startingDeadlineSeconds:
@@ -1004,7 +1008,11 @@ spec:
10041008
- spec
10051009
type: object
10061010
schedule:
1007-
description: The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
1011+
description: |-
1012+
The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
1013+
A deterministic, UID-based stagger offset is applied to spread
1014+
execution across the "cronjob-stagger-window" (default: 2 hours,
1015+
set to 0 to disable) configured in the csi-addons-config ConfigMap.
10081016
pattern: .+
10091017
type: string
10101018
startingDeadlineSeconds:

internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,11 @@ func getNextScheduleForKeyRotation(
313313
earliestTime = schedulingDeadline
314314
}
315315
}
316+
317+
rawNext := sched.Next(now)
318+
staggeredNext := utils.GetStaggeredNext(krcJob.UID, rawNext, sched)
316319
if earliestTime.After(now) {
317-
return time.Time{}, sched.Next(now), nil
320+
return time.Time{}, staggeredNext, nil
318321
}
319322

320323
starts := 0
@@ -333,7 +336,7 @@ func getNextScheduleForKeyRotation(
333336
" delete and recreate encryptionkeyrotationjob")
334337
}
335338
}
336-
return lastMissed, sched.Next(now), nil
339+
return lastMissed, staggeredNext, nil
337340
}
338341

339342
// constructEncryptionKeyRotationJob forms an EncryptionKeyRotationJob for a given

internal/controller/csiaddons/reclaimspacecronjob_controller.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,11 @@ func getNextSchedule(
376376
earliestTime = schedulingDeadline
377377
}
378378
}
379+
380+
rawNext := sched.Next(now)
381+
staggeredNext := utils.GetStaggeredNext(rsCronJob.UID, rawNext, sched)
379382
if earliestTime.After(now) {
380-
return time.Time{}, sched.Next(now), nil
383+
return time.Time{}, staggeredNext, nil
381384
}
382385

383386
starts := 0
@@ -396,5 +399,5 @@ func getNextSchedule(
396399
" delete and recreate reclaimspacecronjob")
397400
}
398401
}
399-
return lastMissed, sched.Next(now), nil
402+
return lastMissed, staggeredNext, nil
400403
}

internal/controller/csiaddons/reclaimspacecronjob_controller_test.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ import (
2222
"time"
2323

2424
csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
25+
"github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils"
26+
"github.com/robfig/cron/v3"
2527

2628
"github.com/stretchr/testify/assert"
2729
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/types"
2831
"k8s.io/utils/ptr"
2932
)
3033

@@ -113,6 +116,13 @@ func TestGetScheduledTimeForRSJob(t *testing.T) {
113116
func TestGetNextSchedule(t *testing.T) {
114117
now := time.Now()
115118
expectedLastMissed := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
119+
mustParse := func(sched string) cron.Schedule {
120+
s, err := cron.ParseStandard(sched)
121+
if err != nil {
122+
t.Fatalf("Failed to parse cron spec %q: %v", sched, err)
123+
}
124+
return s
125+
}
116126
type args struct {
117127
rsCronJob *csiaddonsv1alpha1.ReclaimSpaceCronJob
118128
now time.Time
@@ -128,6 +138,9 @@ func TestGetNextSchedule(t *testing.T) {
128138
name: "Valid schedule, no deadline, last schedule time exists",
129139
args: args{
130140
rsCronJob: &csiaddonsv1alpha1.ReclaimSpaceCronJob{
141+
ObjectMeta: metav1.ObjectMeta{
142+
UID: types.UID("valid-sched-1"),
143+
},
131144
Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{
132145
Schedule: "0 0 * * *",
133146
},
@@ -145,6 +158,9 @@ func TestGetNextSchedule(t *testing.T) {
145158
name: "Valid schedule, deadline not exceeded, last schedule time exists",
146159
args: args{
147160
rsCronJob: &csiaddonsv1alpha1.ReclaimSpaceCronJob{
161+
ObjectMeta: metav1.ObjectMeta{
162+
UID: types.UID("valid-sched-2"),
163+
},
148164
Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{
149165
Schedule: "0 0 * * *",
150166
StartingDeadlineSeconds: ptr.To(int64(3600)),
@@ -163,6 +179,9 @@ func TestGetNextSchedule(t *testing.T) {
163179
name: "Valid schedule, deadline exceeded, last schedule time exists, missed schedules < 100",
164180
args: args{
165181
rsCronJob: &csiaddonsv1alpha1.ReclaimSpaceCronJob{
182+
ObjectMeta: metav1.ObjectMeta{
183+
UID: types.UID("valid-sched-3"),
184+
},
166185
Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{
167186
Schedule: "*/1 * * * *",
168187
StartingDeadlineSeconds: ptr.To(int64(6000)),
@@ -181,6 +200,9 @@ func TestGetNextSchedule(t *testing.T) {
181200
name: "Valid schedule, deadline exceeded, last schedule time exists, missed schedules > 100",
182201
args: args{
183202
rsCronJob: &csiaddonsv1alpha1.ReclaimSpaceCronJob{
203+
ObjectMeta: metav1.ObjectMeta{
204+
UID: types.UID("valid-sched-4"),
205+
},
184206
Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{
185207
Schedule: "*/1 * * * *",
186208
StartingDeadlineSeconds: ptr.To(int64(6060)),
@@ -203,11 +225,19 @@ func TestGetNextSchedule(t *testing.T) {
203225
t.Errorf("getNextSchedule() error = %v, wantErr %v", err, tt.wantErr)
204226
return
205227
}
228+
229+
if tt.wantErr {
230+
return
231+
}
232+
206233
if !gotLastMissed.Equal(tt.lastMissed) && !gotLastMissed.Equal(time.Time{}) {
207234
t.Errorf("getNextSchedule() got last missed = %v, want %v", gotLastMissed, tt.lastMissed)
208235
}
209-
if !gotNextSchedule.Equal(tt.nextSchedule) {
210-
t.Errorf("getNextSchedule() got next schedule = %v, want %v", gotNextSchedule, tt.nextSchedule)
236+
237+
sched := mustParse(tt.args.rsCronJob.Spec.Schedule)
238+
staggered := utils.GetStaggeredNext(tt.args.rsCronJob.UID, tt.nextSchedule, sched)
239+
if !gotNextSchedule.Equal(staggered) {
240+
t.Errorf("getNextSchedule() got next schedule = %v, want %v", gotNextSchedule, staggered)
211241
}
212242
})
213243
}

internal/controller/utils/reclaimspace_keyrotation.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ package utils
1818

1919
import (
2020
"errors"
21+
"hash/fnv"
22+
"time"
2123

2224
csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
25+
"github.com/robfig/cron/v3"
2326

2427
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
apiTypes "k8s.io/apimachinery/pkg/types"
2529
"sigs.k8s.io/controller-runtime/pkg/client"
2630
)
2731

@@ -128,3 +132,33 @@ func ExtractOwnerNameFromPVCObj[T client.Object](rawObj client.Object) []string
128132

129133
return []string{owner.Name}
130134
}
135+
136+
// GetStaggeredNext returns a deterministic, UID-based staggered time computed from a schedule.
137+
// The staggered window is capped at a maximum of 2hrs but is adjusted for smaller intervals.
138+
func GetStaggeredNext(uid apiTypes.UID, nextTime time.Time, sched cron.Schedule) time.Time {
139+
// cron does not expose interval directly
140+
// We can determine it by subtracting two consecutive intervals
141+
afterNext := sched.Next(nextTime)
142+
interval := afterNext.Sub(nextTime)
143+
144+
// We do not want the stagger window to be any larger than a max of 2hrs
145+
const maxCap = 2 * time.Hour
146+
staggerWindow := min(interval, maxCap)
147+
148+
// To prevent the schedule from jumping in bw the reconciles,
149+
// we use the UID and hash it, this way it remains deterministic
150+
h := fnv.New32a()
151+
if _, err := h.Write([]byte(string(uid))); err != nil {
152+
return nextTime
153+
}
154+
hash := h.Sum32()
155+
156+
// Just a safety net
157+
stgrWindowSeconds := int64(staggerWindow.Seconds())
158+
if stgrWindowSeconds <= 0 {
159+
return nextTime
160+
}
161+
offsetSeconds := int64(hash) % stgrWindowSeconds // Modulo ensures it to be < stgrWindowSeconds
162+
163+
return nextTime.Add(time.Duration(offsetSeconds) * time.Second)
164+
}

internal/controller/utils/reclaimspace_keyrotation_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ package utils
1818

1919
import (
2020
"testing"
21+
"time"
2122

2223
csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
24+
"github.com/robfig/cron/v3"
25+
apitypes "k8s.io/apimachinery/pkg/types"
2326

2427
"sigs.k8s.io/controller-runtime/pkg/client"
2528
)
@@ -87,3 +90,115 @@ func TestSetSpec(t *testing.T) {
8790
})
8891
}
8992
}
93+
94+
func TestGetStaggeredNext(t *testing.T) {
95+
baseTime, _ := time.Parse(time.RFC3339, "2026-01-01T12:00:00Z")
96+
97+
mustParse := func(sched string) cron.Schedule {
98+
s, err := cron.ParseStandard(sched)
99+
if err != nil {
100+
t.Fatalf("Failed to parse cron spec %q: %v", sched, err)
101+
}
102+
return s
103+
}
104+
105+
tests := []struct {
106+
name string
107+
uid apitypes.UID
108+
schedule string
109+
baseNext time.Time
110+
maxDelta time.Duration // The staggered offset msut be less than this value
111+
}{
112+
{
113+
name: "Hourly schedule - capped at 1h interval",
114+
uid: apitypes.UID("job-hourly"),
115+
schedule: "@hourly",
116+
baseNext: baseTime,
117+
maxDelta: 1 * time.Hour,
118+
},
119+
{
120+
name: "Daily schedule - capped at 2h max cap",
121+
uid: apitypes.UID("job-daily"),
122+
schedule: "@daily",
123+
baseNext: baseTime,
124+
maxDelta: 2 * time.Hour,
125+
},
126+
{
127+
name: "1 Minute schedule - capped at 1m interval",
128+
uid: apitypes.UID("job-minute"),
129+
schedule: "* * * * *",
130+
baseNext: baseTime,
131+
maxDelta: 1 * time.Minute,
132+
},
133+
{
134+
name: "13 Minute schedule - capped at 13m interval",
135+
uid: apitypes.UID("job-13m"),
136+
schedule: "*/13 * * * *",
137+
baseNext: baseTime,
138+
maxDelta: 13 * time.Minute,
139+
},
140+
{
141+
// This last test case is special, avoid modifications
142+
name: "Determinism Check A (Run 1)", // Do not modify, used to check when to store the result
143+
uid: apitypes.UID("static-uid"), // Do not modify, used to assert determinisim below
144+
schedule: "@daily",
145+
baseNext: baseTime,
146+
maxDelta: 2 * time.Hour,
147+
},
148+
}
149+
150+
// Will store the result of the above special test case
151+
var deterministicResult time.Time
152+
var collisionResult time.Time
153+
154+
for i, tc := range tests {
155+
t.Run(tc.name, func(t *testing.T) {
156+
sched := mustParse(tc.schedule)
157+
got := GetStaggeredNext(tc.uid, tc.baseNext, sched)
158+
159+
// Staggered time must be >= original time
160+
if got.Before(tc.baseNext) {
161+
t.Errorf("Result %v is before base time %v", got, tc.baseNext)
162+
}
163+
164+
// Offset must be < min(cronInterval, maxCapof2hrs)
165+
offset := got.Sub(tc.baseNext)
166+
if offset >= tc.maxDelta {
167+
t.Errorf("Offset %v exceeds allowed max delta %v", offset, tc.maxDelta)
168+
}
169+
170+
// Store results of the last test case for determinism
171+
if tc.name == "Determinism Check A (Run 1)" {
172+
deterministicResult = got
173+
collisionResult = got
174+
}
175+
})
176+
177+
// to ensure the run after the above TCs
178+
if i == len(tests)-1 {
179+
t.Run("Determinism Check B (Run 2 - Same UID)", func(t *testing.T) {
180+
sched := mustParse("@daily")
181+
got := GetStaggeredNext("static-uid", baseTime, sched)
182+
183+
if !got.Equal(deterministicResult) {
184+
t.Errorf("Expected %v, got %v. Result not deterministic.", deterministicResult, got)
185+
}
186+
})
187+
188+
t.Run("Collision Check (Different UID)", func(t *testing.T) {
189+
sched := mustParse("@daily")
190+
got := GetStaggeredNext("different-uid", baseTime, sched)
191+
192+
// It is theoretically possible for hashes to collide, but highly unlikely
193+
// especially within a 2hr window
194+
if got.Equal(collisionResult) {
195+
t.Logf("Warning: Different UIDs resulted in exact same time %v. "+
196+
"This is rare but possible (hash collision).", got)
197+
} else {
198+
t.Logf("Verified different UIDs produced different offsets: %v vs %v",
199+
collisionResult.Sub(baseTime), got.Sub(baseTime))
200+
}
201+
})
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)