Skip to content

Commit 6018f0c

Browse files
config: make max stagger window configurable, default to 2hrs
This patch makes the stagger window for the reclaimspace & keyrotation CronJobs configurable. The default value is 2hrs, and staggering can be disabled by setting the value to 0. Signed-off-by: Niraj Yadav <niryadav@redhat.com>
1 parent 7121b0e commit 6018f0c

11 files changed

Lines changed: 342 additions & 206 deletions

cmd/manager/main.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func main() {
107107
flag.BoolVar(&enableAuth, "enable-auth", true, "Enables TLS and adds bearer token to the headers (enabled by default)")
108108
flag.IntVar(&cfg.MaxGroupPVC, "max-group-pvc", cfg.MaxGroupPVC, "Maximum number of PVCs allowed in a volume group")
109109
flag.IntVar(&cfg.CSIAddonsNodeRetryDelay, util.CsiaddonsNodeRetryDelayKey, cfg.CSIAddonsNodeRetryDelay, "Duration, in seconds, the CSIAddonsNode reconciler must wait before retrying the connection to csi-addons sidecar")
110+
flag.IntVar(&cfg.KrRsCronJobStaggerWindow, util.KrRsCronJobStaggerWindowKey, cfg.KrRsCronJobStaggerWindow, "Duration, in hours, that the CronJobs for KeyRotation and ReclaimSpace are staggered within. Defaults to 2 hours, set as 0 to disable.")
110111
opts := zap.Options{
111112
Development: true,
112113
TimeEncoder: zapcore.ISO8601TimeEncoder,
@@ -234,8 +235,9 @@ func main() {
234235
os.Exit(1)
235236
}
236237
if err = (&controllers.ReclaimSpaceCronJobReconciler{
237-
Client: mgr.GetClient(),
238-
Scheme: mgr.GetScheme(),
238+
Client: mgr.GetClient(),
239+
Scheme: mgr.GetScheme(),
240+
StaggerWindow: cfg.KrRsCronJobStaggerWindow,
239241
}).SetupWithManager(mgr, ctrlOptions); err != nil {
240242
setupLog.Error(err, "unable to create controller", "controller", "ReclaimSpaceCronJob")
241243
os.Exit(1)
@@ -299,8 +301,9 @@ func main() {
299301
os.Exit(1)
300302
}
301303
if err = (&controllers.EncryptionKeyRotationCronJobReconciler{
302-
Client: mgr.GetClient(),
303-
Scheme: mgr.GetScheme(),
304+
Client: mgr.GetClient(),
305+
Scheme: mgr.GetScheme(),
306+
StaggerWindow: cfg.KrRsCronJobStaggerWindow,
304307
}).SetupWithManager(mgr, ctrlOptions); err != nil {
305308
setupLog.Error(err, "unable to create controller", "controller", "EncryptionKeyRotationCronJob")
306309
os.Exit(1)

deploy/controller/csi-addons-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ data:
1010
"max-concurrent-reconciles": "100"
1111
"max-group-pvcs": "100"
1212
"csi-addons-node-retry-delay": "5"
13+
"kr-rs-cronjob-stagger-window": "2"

docs/csi-addons-config.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ CSI-Addons Operator can consume configuration from a ConfigMap named `csi-addons
44
in the same namespace as the operator. This enables configuration of the operator to persist across
55
upgrades. The ConfigMap can support the following configuration options:
66

7-
| Option | Default value | Description |
8-
| ----------------------------- | ------------- | --------------------------------------------------------------------------------------------------- |
9-
| `reclaim-space-timeout` | `"3m"` | Timeout for reclaimspace operation |
10-
| `max-concurrent-reconciles` | `"100"` | Maximum number of concurrent reconciles |
11-
| `max-group-pvcs` | `"100"` | Maximum number of PVCs allowed in a volume group |
12-
| `csi-addons-node-retry-delay` | `"5"` | Duration, in seconds, that csi-addons reconcile must wait before retrying connection to the sidecar |
13-
| `schedule-precedence` | `"pvc"` | The order in which the schedule annotation should be read |
7+
| Option | Default value | Description |
8+
| ------------------------------ | ------------- | ------------------------------------------------------------------------------------------------------------------------ |
9+
| `reclaim-space-timeout` | `"3m"` | Timeout for reclaimspace operation |
10+
| `max-concurrent-reconciles` | `"100"` | Maximum number of concurrent reconciles |
11+
| `max-group-pvcs` | `"100"` | Maximum number of PVCs allowed in a volume group |
12+
| `csi-addons-node-retry-delay` | `"5"` | Duration, in seconds, that csi-addons reconcile must wait before retrying connection to the sidecar |
13+
| `schedule-precedence` | `"pvc"` | The order in which the schedule annotation should be read |
14+
| `kr-rs-cronjob-stagger-window` | `"2"` | Maximum stagger window, in hours, for key rotation and reclaim space CronJob schedules. Set to `0` to disable staggering |
1415

1516
[`csi-addons-config` ConfigMap](../deploy/controller/csi-addons-config.yaml) is provided as an example.
1617

internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import (
4040
// EncryptionKeyRotationCronJobReconciler reconciles an EncryptionKeyRotationCronJob object
4141
type EncryptionKeyRotationCronJobReconciler struct {
4242
client.Client
43-
Scheme *runtime.Scheme
43+
Scheme *runtime.Scheme
44+
StaggerWindow int
4445
}
4546

4647
//+kubebuilder:rbac:groups=csiaddons.openshift.io,resources=encryptionkeyrotationcronjobs,verbs=get;list;watch;create;update;patch;delete
@@ -123,7 +124,7 @@ func (r *EncryptionKeyRotationCronJobReconciler) Reconcile(ctx context.Context,
123124
return ctrl.Result{}, nil
124125
}
125126

126-
missedRun, nextRun, err := getNextScheduleForKeyRotation(krcJob, time.Now())
127+
missedRun, nextRun, err := getNextScheduleForKeyRotation(krcJob, time.Now(), r.StaggerWindow)
127128
if err != nil {
128129
logger.Error(err, "failed to get next schedule for jobs", "schedule", krcJob.Spec.Schedule)
129130

@@ -293,7 +294,8 @@ func (r *EncryptionKeyRotationCronJobReconciler) deleteOldEncryptionKeyRotationJ
293294
// An error is returned if start is missed more than 100 times
294295
func getNextScheduleForKeyRotation(
295296
krcJob *csiaddonsv1alpha1.EncryptionKeyRotationCronJob,
296-
now time.Time) (time.Time, time.Time, error) {
297+
now time.Time,
298+
staggerWindow int) (time.Time, time.Time, error) {
297299
sched, err := cron.ParseStandard(krcJob.Spec.Schedule)
298300
if err != nil {
299301
return time.Time{}, time.Time{}, fmt.Errorf("unparsable schedule %q: %v", krcJob.Spec.Schedule, err)
@@ -315,7 +317,7 @@ func getNextScheduleForKeyRotation(
315317
}
316318

317319
rawNext := sched.Next(now)
318-
staggeredNext := utils.GetStaggeredNext(krcJob.UID, rawNext, sched)
320+
staggeredNext := utils.GetStaggeredNext(krcJob.UID, rawNext, sched, staggerWindow)
319321
if earliestTime.After(now) {
320322
return time.Time{}, staggeredNext, nil
321323
}

internal/controller/csiaddons/reclaimspacecronjob_controller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import (
4040
// ReclaimSpaceCronJobReconciler reconciles a ReclaimSpaceCronJob object
4141
type ReclaimSpaceCronJobReconciler struct {
4242
client.Client
43-
Scheme *runtime.Scheme
43+
Scheme *runtime.Scheme
44+
StaggerWindow int
4445
}
4546

4647
var (
@@ -129,7 +130,7 @@ func (r *ReclaimSpaceCronJobReconciler) Reconcile(ctx context.Context, req ctrl.
129130
}
130131

131132
// figure out the next times that we need to create jobs at (or anything we missed).
132-
missedRun, nextRun, err := getNextSchedule(rsCronJob, time.Now())
133+
missedRun, nextRun, err := getNextSchedule(rsCronJob, time.Now(), r.StaggerWindow)
133134
if err != nil {
134135
logger.Error(err, "Failed to Parse out CronJob schedule", "schedule", rsCronJob.Spec.Schedule)
135136
// invalid schedule, do not requeue.
@@ -356,7 +357,8 @@ func getScheduledTimeForRSJob(rsJob *csiaddonsv1alpha1.ReclaimSpaceJob) (*time.T
356357
// This function returns error if there are more than 100 missed start times.
357358
func getNextSchedule(
358359
rsCronJob *csiaddonsv1alpha1.ReclaimSpaceCronJob,
359-
now time.Time) (time.Time, time.Time, error) {
360+
now time.Time,
361+
staggerWindow int) (time.Time, time.Time, error) {
360362
sched, err := cron.ParseStandard(rsCronJob.Spec.Schedule)
361363
if err != nil {
362364
return time.Time{}, time.Time{}, fmt.Errorf("unparsable schedule %q: %v", rsCronJob.Spec.Schedule, err)
@@ -378,7 +380,7 @@ func getNextSchedule(
378380
}
379381

380382
rawNext := sched.Next(now)
381-
staggeredNext := utils.GetStaggeredNext(rsCronJob.UID, rawNext, sched)
383+
staggeredNext := utils.GetStaggeredNext(rsCronJob.UID, rawNext, sched, staggerWindow)
382384
if earliestTime.After(now) {
383385
return time.Time{}, staggeredNext, nil
384386
}

internal/controller/csiaddons/reclaimspacecronjob_controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func TestGetNextSchedule(t *testing.T) {
220220
}
221221
for _, tt := range tests {
222222
t.Run(tt.name, func(t *testing.T) {
223-
gotLastMissed, gotNextSchedule, err := getNextSchedule(tt.args.rsCronJob, tt.args.now)
223+
gotLastMissed, gotNextSchedule, err := getNextSchedule(tt.args.rsCronJob, tt.args.now, 2)
224224
if (err != nil) != tt.wantErr {
225225
t.Errorf("getNextSchedule() error = %v, wantErr %v", err, tt.wantErr)
226226
return
@@ -235,7 +235,7 @@ func TestGetNextSchedule(t *testing.T) {
235235
}
236236

237237
sched := mustParse(tt.args.rsCronJob.Spec.Schedule)
238-
staggered := utils.GetStaggeredNext(tt.args.rsCronJob.UID, tt.nextSchedule, sched)
238+
staggered := utils.GetStaggeredNext(tt.args.rsCronJob.UID, tt.nextSchedule, sched, 2)
239239
if !gotNextSchedule.Equal(staggered) {
240240
t.Errorf("getNextSchedule() got next schedule = %v, want %v", gotNextSchedule, staggered)
241241
}

internal/controller/utils/reclaimspace_keyrotation.go

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@ package utils
1818

1919
import (
2020
"errors"
21-
"hash/fnv"
22-
"time"
2321

2422
csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
25-
"github.com/robfig/cron/v3"
2623

2724
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28-
apiTypes "k8s.io/apimachinery/pkg/types"
2925
"sigs.k8s.io/controller-runtime/pkg/client"
3026
)
3127

@@ -103,17 +99,6 @@ func SetSpec(obj client.Object, schedule, pvcName string) {
10399
}
104100
}
105101

106-
func GetSchedule(obj client.Object) string {
107-
switch v := obj.(type) {
108-
case *csiaddonsv1alpha1.EncryptionKeyRotationCronJob:
109-
return v.Spec.Schedule
110-
case *csiaddonsv1alpha1.ReclaimSpaceCronJob:
111-
return v.Spec.Schedule
112-
default:
113-
return ""
114-
}
115-
}
116-
117102
// ExtractOwnerNameFromPVCObj extracts owner.Name from the object if it is
118103
// of type `T` and has a PVC as its owner.
119104
func ExtractOwnerNameFromPVCObj[T client.Object](rawObj client.Object) []string {
@@ -132,33 +117,3 @@ func ExtractOwnerNameFromPVCObj[T client.Object](rawObj client.Object) []string
132117

133118
return []string{owner.Name}
134119
}
135-
136-
// GetStaggeredNext returns a deterministic, UID-based staggered time computed from a schedule.
137-
// The staggered window is capped at a maximum of 2hrs but is adjusted for smaller intervals.
138-
func GetStaggeredNext(uid apiTypes.UID, nextTime time.Time, sched cron.Schedule) time.Time {
139-
// cron does not expose interval directly
140-
// We can determine it by subtracting two consecutive scheduled times
141-
afterNext := sched.Next(nextTime)
142-
interval := afterNext.Sub(nextTime)
143-
144-
// We do not want the stagger window to be any larger than a max of 2hrs
145-
const maxCap = 2 * time.Hour
146-
staggerWindow := min(interval, maxCap)
147-
148-
// To prevent the schedule from jumping in between reconciles,
149-
// we use the UID and hash it, this way it remains deterministic
150-
h := fnv.New32a()
151-
if _, err := h.Write([]byte(string(uid))); err != nil {
152-
return nextTime
153-
}
154-
hash := h.Sum32()
155-
156-
// Just a safety net
157-
stgrWindowSeconds := int64(staggerWindow.Seconds())
158-
if stgrWindowSeconds <= 0 {
159-
return nextTime
160-
}
161-
offsetSeconds := int64(hash) % stgrWindowSeconds // Modulo ensures it to be < stgrWindowSeconds
162-
163-
return nextTime.Add(time.Duration(offsetSeconds) * time.Second)
164-
}

internal/controller/utils/reclaimspace_keyrotation_test.go

Lines changed: 0 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ package utils
1818

1919
import (
2020
"testing"
21-
"time"
2221

2322
csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
24-
"github.com/robfig/cron/v3"
25-
apitypes "k8s.io/apimachinery/pkg/types"
2623

2724
"sigs.k8s.io/controller-runtime/pkg/client"
2825
)
@@ -90,115 +87,3 @@ func TestSetSpec(t *testing.T) {
9087
})
9188
}
9289
}
93-
94-
func TestGetStaggeredNext(t *testing.T) {
95-
baseTime, _ := time.Parse(time.RFC3339, "2026-01-01T12:00:00Z")
96-
97-
mustParse := func(sched string) cron.Schedule {
98-
s, err := cron.ParseStandard(sched)
99-
if err != nil {
100-
t.Fatalf("Failed to parse cron spec %q: %v", sched, err)
101-
}
102-
return s
103-
}
104-
105-
tests := []struct {
106-
name string
107-
uid apitypes.UID
108-
schedule string
109-
baseNext time.Time
110-
maxDelta time.Duration // The staggered offset must be less than this value
111-
}{
112-
{
113-
name: "Hourly schedule - capped at 1h interval",
114-
uid: apitypes.UID("job-hourly"),
115-
schedule: "@hourly",
116-
baseNext: baseTime,
117-
maxDelta: 1 * time.Hour,
118-
},
119-
{
120-
name: "Daily schedule - capped at 2h max cap",
121-
uid: apitypes.UID("job-daily"),
122-
schedule: "@daily",
123-
baseNext: baseTime,
124-
maxDelta: 2 * time.Hour,
125-
},
126-
{
127-
name: "1 Minute schedule - capped at 1m interval",
128-
uid: apitypes.UID("job-minute"),
129-
schedule: "* * * * *",
130-
baseNext: baseTime,
131-
maxDelta: 1 * time.Minute,
132-
},
133-
{
134-
name: "13 Minute schedule - capped at 13m interval",
135-
uid: apitypes.UID("job-13m"),
136-
schedule: "*/13 * * * *",
137-
baseNext: baseTime,
138-
maxDelta: 13 * time.Minute,
139-
},
140-
{
141-
// This last test case is special, avoid modifications
142-
name: "Determinism Check A (Run 1)", // Do not modify, used to check when to store the result
143-
uid: apitypes.UID("static-uid"), // Do not modify, used to assert determinism below
144-
schedule: "@daily",
145-
baseNext: baseTime,
146-
maxDelta: 2 * time.Hour,
147-
},
148-
}
149-
150-
// Will store the result of the above special test case
151-
var deterministicResult time.Time
152-
var collisionResult time.Time
153-
154-
for i, tc := range tests {
155-
t.Run(tc.name, func(t *testing.T) {
156-
sched := mustParse(tc.schedule)
157-
got := GetStaggeredNext(tc.uid, tc.baseNext, sched)
158-
159-
// Staggered time must be >= original time
160-
if got.Before(tc.baseNext) {
161-
t.Errorf("Result %v is before base time %v", got, tc.baseNext)
162-
}
163-
164-
// Offset must be < min(cronInterval, maxCapof2hrs)
165-
offset := got.Sub(tc.baseNext)
166-
if offset >= tc.maxDelta {
167-
t.Errorf("Offset %v exceeds allowed max delta %v", offset, tc.maxDelta)
168-
}
169-
170-
// Store results of the last test case for determinism
171-
if tc.name == "Determinism Check A (Run 1)" {
172-
deterministicResult = got
173-
collisionResult = got
174-
}
175-
})
176-
177-
// Ensure these checks run only after all of the above test cases
178-
if i == len(tests)-1 {
179-
t.Run("Determinism Check B (Run 2 - Same UID)", func(t *testing.T) {
180-
sched := mustParse("@daily")
181-
got := GetStaggeredNext("static-uid", baseTime, sched)
182-
183-
if !got.Equal(deterministicResult) {
184-
t.Errorf("Expected %v, got %v. Result not deterministic.", deterministicResult, got)
185-
}
186-
})
187-
188-
t.Run("Collision Check (Different UID)", func(t *testing.T) {
189-
sched := mustParse("@daily")
190-
got := GetStaggeredNext("different-uid", baseTime, sched)
191-
192-
// It is theoretically possible for hashes to collide, but highly unlikely
193-
// especially within a 2hr window
194-
if got.Equal(collisionResult) {
195-
t.Logf("Warning: Different UIDs resulted in exact same time %v. "+
196-
"This is rare but possible (hash collision).", got)
197-
} else {
198-
t.Logf("Verified different UIDs produced different offsets: %v vs %v",
199-
collisionResult.Sub(baseTime), got.Sub(baseTime))
200-
}
201-
})
202-
}
203-
}
204-
}

0 commit comments

Comments
 (0)