Skip to content

Commit 25d3e4a

Browse files
authored
Fix: restart job when policy FromSavepointOnFailure and no savepoint (#676)
1 parent e0a045d commit 25d3e4a

File tree

2 files changed

+4
-9
lines changed

2 files changed

+4
-9
lines changed

apis/flinkcluster/v1beta1/flinkcluster_types_util.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,15 @@ func (j *JobStatus) IsSavepointUpToDate(spec *JobSpec, compareTime time.Time) bo
6464
}
6565

6666
// ShouldRestart returns true if the controller should restart failed job.
67-
// The controller can restart the job only if there is a savepoint that is close to the end time of the job.
67+
// The controller can restart the job if policy is set to FromSavepointOnFailure.
68+
// Job will restart from savepoint if the savepoint was taken successfully.
6869
func (j *JobStatus) ShouldRestart(spec *JobSpec) bool {
6970
if j == nil || !j.IsFailed() || spec == nil {
7071
return false
7172
}
7273

7374
restartEnabled := spec.RestartPolicy != nil && *spec.RestartPolicy == JobRestartPolicyFromSavepointOnFailure
74-
75-
var jobCompletionTime time.Time
76-
if j.CompletionTime != nil {
77-
jobCompletionTime = j.CompletionTime.Time
78-
}
79-
80-
return restartEnabled && j.IsSavepointUpToDate(spec, jobCompletionTime)
75+
return restartEnabled
8176
}
8277

8378
// UpdateReady returns true if job is ready to proceed update.

apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func TestShouldRestartJob(t *testing.T) {
124124
CompletionTime: &metav1.Time{Time: jobCompletionTime},
125125
}
126126
restart = jobStatus.ShouldRestart(&jobSpec)
127-
assert.Equal(t, restart, false)
127+
assert.Equal(t, restart, true)
128128

129129
// Not restart with restartPolicy Never
130130
jobSpec = JobSpec{

0 commit comments

Comments
 (0)