Skip to content

Commit 0b25e9c

Browse files
committed
Add support for Spec.PostCompleteDelay to set delay after job completion
Signed-off-by: Brad Davidson <[email protected]>
1 parent 938216d commit 0b25e9c

File tree

5 files changed

+62
-18
lines changed

5 files changed

+62
-18
lines changed

pkg/apis/upgrade.cattle.io/v1/types.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ type PlanSpec struct {
4949

5050
Exclusive bool `json:"exclusive,omitempty"`
5151

52-
Window *TimeWindowSpec `json:"window,omitempty"`
53-
Prepare *ContainerSpec `json:"prepare,omitempty"`
54-
Cordon bool `json:"cordon,omitempty"`
55-
Drain *DrainSpec `json:"drain,omitempty"`
56-
Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"`
57-
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
52+
Window *TimeWindowSpec `json:"window,omitempty"`
53+
Prepare *ContainerSpec `json:"prepare,omitempty"`
54+
Cordon bool `json:"cordon,omitempty"`
55+
Drain *DrainSpec `json:"drain,omitempty"`
56+
Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"`
57+
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
58+
PostCompleteDelay *metav1.Duration `json:"postCompleteDelay,omitempty"`
5859
}
5960

6061
// PlanStatus represents the resulting state from processing Plan events.

pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/upgrade/handle_batch.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"strconv"
88
"time"
99

10-
"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
1110
upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
1211
upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
1312
batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1"
1413
"github.com/sirupsen/logrus"
1514
batchv1 "k8s.io/api/batch/v1"
15+
corev1 "k8s.io/api/core/v1"
1616
"k8s.io/apimachinery/pkg/api/errors"
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
"k8s.io/apimachinery/pkg/labels"
@@ -81,21 +81,47 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
8181
}
8282
// if the job has failed enqueue-or-delete it depending on the TTL window
8383
if upgradejob.ConditionFailed.IsTrue(obj) {
84-
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
84+
failedTime := upgradejob.ConditionFailed.GetLastTransitionTime(obj)
85+
if failedTime.IsZero() {
86+
return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionFailed, "LastTransitionTime")
87+
}
88+
ctl.recorder.Eventf(plan, corev1.EventTypeWarning, "JobFailed", "Job failed on Node %s", node.Name)
89+
return obj, enqueueOrDelete(jobs, obj, failedTime)
8590
}
8691
// if the job has completed tag the node then enqueue-or-delete depending on the TTL window
8792
if upgradejob.ConditionComplete.IsTrue(obj) {
93+
completeTime := upgradejob.ConditionComplete.GetLastTransitionTime(obj)
94+
if completeTime.IsZero() {
95+
return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionComplete, "LastTransitionTime")
96+
}
8897
planLabel := upgradeapi.LabelPlanName(planName)
8998
if planHash, ok := obj.Labels[planLabel]; ok {
90-
node.Labels[planLabel] = planHash
99+
var delay time.Duration
100+
if plan.Spec.PostCompleteDelay != nil {
101+
delay = plan.Spec.PostCompleteDelay.Duration
102+
}
103+
// if the job has not been completed for the configured delay, re-enqueue
104+
// it for processing once the delay has elapsed.
105+
// the job's TTLSecondsAfterFinished is guaranteed to be set to a larger value
106+
// than the plan's requested delay.
107+
if interval := time.Now().Sub(completeTime); interval < delay {
108+
logrus.Debugf("Enqueing sync of Job %s/%s in %v", obj.Namespace, obj.Name, delay-interval)
109+
ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobCompleteWaiting", "Job completed on Node %s, waiting %s PostCompleteDelay", node.Name, delay)
110+
jobs.EnqueueAfter(obj.Namespace, obj.Name, delay-interval)
111+
} else {
112+
ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobComplete", "Job completed on Node %s", node.Name)
113+
node.Labels[planLabel] = planHash
114+
}
115+
// mark the node as schedulable even if the delay has not elapsed, so that
116+
// workloads can resume scheduling.
91117
if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
92118
node.Spec.Unschedulable = false
93119
}
94120
if node, err = nodes.Update(node); err != nil {
95121
return obj, err
96122
}
97123
}
98-
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
124+
return obj, enqueueOrDelete(jobs, obj, completeTime)
99125
}
100126
// if the job is hasn't failed or completed but the job Node is not on the applying list, consider it running out-of-turn and delete it
101127
if i := sort.SearchStrings(plan.Status.Applying, nodeName); i == len(plan.Status.Applying) ||
@@ -108,12 +134,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
108134
return nil
109135
}
110136

111-
func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, done condition.Cond) error {
112-
lastTransitionTime := done.GetLastTransitionTime(job)
113-
if lastTransitionTime.IsZero() {
114-
return fmt.Errorf("condition %q missing field %q", done, "LastTransitionTime")
115-
}
116-
137+
func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, lastTransitionTime time.Time) error {
117138
var ttlSecondsAfterFinished time.Duration
118139

119140
if job.Spec.TTLSecondsAfterFinished == nil {

pkg/upgrade/job/job.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"slices"
66
"strconv"
77
"strings"
8+
"time"
89

910
"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
1011
upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
@@ -133,9 +134,21 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
133134
labelPlanName := upgradeapi.LabelPlanName(plan.Name)
134135
nodeHostname := upgradenode.Hostname(node)
135136
shortNodeName := strings.SplitN(node.Name, ".", 2)[0]
137+
ttlSecondsAfterFinished := TTLSecondsAfterFinished
138+
139+
// Ensure that the job's TTLSecondsAfterFinished is at least 1 minute longer than
140+
// the requested post-upgrade delay, so that the controller has time to see that
141+
// it has been completed for the requested duration.
142+
if delay := plan.Spec.PostCompleteDelay; delay != nil {
143+
ttlPostCompleteDelay := delay.Duration + time.Minute
144+
ttlAfterFinished := time.Duration(ttlSecondsAfterFinished) * time.Second
145+
if ttlAfterFinished < ttlPostCompleteDelay {
146+
ttlSecondsAfterFinished = int32(ttlPostCompleteDelay.Seconds())
147+
}
148+
}
136149

137150
jobAnnotations := labels.Set{
138-
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
151+
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(ttlSecondsAfterFinished), 10),
139152
}
140153
podAnnotations := labels.Set{}
141154

@@ -171,7 +184,7 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
171184
Spec: batchv1.JobSpec{
172185
PodReplacementPolicy: &PodReplacementPolicy,
173186
BackoffLimit: &BackoffLimit,
174-
TTLSecondsAfterFinished: &TTLSecondsAfterFinished,
187+
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
175188
Template: corev1.PodTemplateSpec{
176189
ObjectMeta: metav1.ObjectMeta{
177190
Annotations: podAnnotations,

pkg/upgrade/plan/plan.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ var (
3535
ErrDrainDeleteConflict = fmt.Errorf("spec.drain cannot specify both deleteEmptydirData and deleteLocalData")
3636
ErrDrainPodSelectorNotSelectable = fmt.Errorf("spec.drain.podSelector is not selectable")
3737
ErrInvalidWindow = fmt.Errorf("spec.window is invalid")
38+
ErrInvalidDelay = fmt.Errorf("spec.postCompleteDelay is negative")
3839

3940
PollingInterval = func(defaultValue time.Duration) time.Duration {
4041
if str, ok := os.LookupEnv("SYSTEM_UPGRADE_PLAN_POLLING_INTERVAL"); ok {
@@ -257,5 +258,8 @@ func Validate(plan *upgradeapiv1.Plan) error {
257258
return ErrInvalidWindow
258259
}
259260
}
261+
if delay := plan.Spec.PostCompleteDelay; delay != nil && delay.Duration < 0 {
262+
return ErrInvalidDelay
263+
}
260264
return nil
261265
}

0 commit comments

Comments
 (0)