Fix savepoint problems #392
Changes from 6 commits
@@ -98,10 +98,11 @@ const (
 	SavepointStateFailed    = "Failed"
 	SavepointStateSucceeded = "Succeeded"

-	SavepointTriggerReasonUserRequested = "user requested"
-	SavepointTriggerReasonScheduled     = "scheduled"
-	SavepointTriggerReasonJobCancel     = "job cancel"
-	SavepointTriggerReasonUpdate        = "update"
+	SavepointTriggerReasonUserRequested    = "user requested"
+	SavepointTriggerReasonScheduled        = "scheduled"
+	SavepointTriggerReasonScheduledInitial = "scheduled initial" // The first triggered savepoint has slightly different flow
+	SavepointTriggerReasonJobCancel        = "job cancel"
+	SavepointTriggerReasonUpdate           = "update"
 )

 // ImageSpec defines Flink image of JobManager and TaskManager containers.
@@ -347,6 +348,9 @@ type JobSpec struct {
 	// Allow non-restored state, default: false.
 	AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`

+	// Should take savepoint before upgrading the job, default: false.
+	TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`
+
 	// Savepoints dir where to store savepoints of the job.
 	SavepointsDir *string `json:"savepointsDir,omitempty"`
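Since TakeSavepointOnUpgrade is a *bool and the reconciler change further down dereferences it directly, a nil-safe read is worth considering. A minimal sketch, assuming no defaulting webhook fills the field; the helper name is hypothetical and not part of this PR:

// Hypothetical helper: treat an unset takeSavepointOnUpgrade as false.
func takeSavepointOnUpgrade(jobSpec *v1beta1.JobSpec) bool {
	return jobSpec != nil &&
		jobSpec.TakeSavepointOnUpgrade != nil &&
		*jobSpec.TakeSavepointOnUpgrade
}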
@@ -567,6 +571,9 @@ type JobStatus struct {
 	// Last savepoint trigger ID.
 	LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`

+	// Last successful or failed savepoint operation timestamp.
+	LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`
+
 	// Last successful or failed savepoint operation timestamp.
 	LastSavepointTime string `json:"lastSavepointTime,omitempty"`

Review thread on the new LastSavepointTriggerTime doc comment:
- What if the operation is still in progress?
- I'll change the comment here. This flow is still not 100% bug proof, I think more work needs to be done on savepoint flow.
- Then add more comments about the potential problems and TODOs.
- Fixed the doc there.
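The thread above objects to the copied doc comment: LastSavepointTriggerTime is written when a savepoint is requested (see updateSavepointTriggerTimeStatus below), while LastSavepointTime is written when the operation finishes. One possible wording that answers the "still in progress" question — a suggestion, not the text that was merged:

// LastSavepointTriggerTime is when the operator last asked the JobManager to
// take a savepoint; it is set before the outcome is known, so a recent value
// with no newer LastSavepointTime means the operation may still be in progress.
LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`

// LastSavepointTime is when the last savepoint operation completed,
// whether it succeeded or failed.
LastSavepointTime string `json:"lastSavepointTime,omitempty"`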
Some generated files are not rendered by default.
@@ -468,31 +468,29 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
 	var jobID = reconciler.getFlinkJobID()
 	var restartPolicy = observed.cluster.Spec.Job.RestartPolicy
 	var recordedJobStatus = observed.cluster.Status.Components.Job
+	var jobSpec = reconciler.observed.cluster.Spec.Job

 	// Update or recover Flink job by restart.
-	var restartJob bool
 	if shouldUpdateJob(observed) {
 		log.Info("Job is about to be restarted to update")
-		restartJob = true
+		err := reconciler.restartJob(*jobSpec.TakeSavepointOnUpgrade)
+		return requeueResult, err
 	} else if shouldRestartJob(restartPolicy, recordedJobStatus) {
 		log.Info("Job is about to be restarted to recover failure")
-		restartJob = true
-	}
-	if restartJob {
-		err := reconciler.restartJob()
-		if err != nil {
-			return requeueResult, err
-		}
-		return requeueResult, nil
+		err := reconciler.restartJob(false)
+		return requeueResult, err
 	}

 	// Trigger savepoint if required.
 	if len(jobID) > 0 {
-		if ok, savepointTriggerReason := reconciler.shouldTakeSavepoint(); ok {
-			newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
+		shouldTakeSavepont, savepointTriggerReason := reconciler.shouldTakeSavepoint()
+		if shouldTakeSavepont {
+			err = reconciler.updateSavepointTriggerTimeStatus()
+			if err != nil {
+				newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
+			}
 		}
 	}

 	log.Info("Job is not finished yet, no action", "jobID", jobID)
 	return requeueResult, nil
 }

Review comment on lines -473 to 482:
- @shashken @functicons There is no need to trigger savepoint here. This is because …

Review thread on lines +488 to +491:
- @shashken @functicons And FlinkCluster status is updated in …
- Damn, I tested the version before the CR change with that line, sorry about that.
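Judging from the thread on lines +488 to +491, the err != nil guard triggers the savepoint only when recording the trigger time fails, which looks inverted. A minimal sketch of the presumably intended ordering — an assumption based on the review discussion, not the code as submitted:

	// Record the trigger time first, and only trigger the savepoint when the
	// status update succeeded, so LastSavepointTriggerTime always reflects an
	// actual trigger attempt.
	if shouldTakeSavepoint {
		if err := reconciler.updateSavepointTriggerTimeStatus(); err == nil {
			newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
		} else {
			log.Error(err, "Failed to record savepoint trigger time; skipping savepoint trigger")
		}
	}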
@@ -575,14 +573,15 @@ func (reconciler *ClusterReconciler) getFlinkJobID() string {
 	return ""
 }

-func (reconciler *ClusterReconciler) restartJob() error {
+func (reconciler *ClusterReconciler) restartJob(shouldTakeSavepoint bool) error {
 	var log = reconciler.log
 	var observedJob = reconciler.observed.job
 	var observedFlinkJob = reconciler.observed.flinkJobStatus.flinkJob

 	log.Info("Stopping Flink job to restart", "", observedFlinkJob)
+	shouldTakeSavepoint = shouldTakeSavepoint && canTakeSavepoint(*reconciler.observed.cluster)

-	var err = reconciler.cancelRunningJobs(false /* takeSavepoint */)
+	var err = reconciler.cancelRunningJobs(shouldTakeSavepoint /* takeSavepoint */)
 	if err != nil {
 		return err
 	}

Review comment on the restartJob signature change:
- @shashken @functicons …
@@ -744,19 +743,31 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() (bool, string) {
 		return false, ""
 	}

+	var nextOkTriggerTime = getNextOkTime(jobStatus.LastSavepointTriggerTime, SavepointTimeoutSec)
+	if time.Now().Before(nextOkTriggerTime) {
+		return false, ""
+	}
+
 	// First savepoint.
 	if len(jobStatus.LastSavepointTime) == 0 {
-		return true, v1beta1.SavepointTriggerReasonScheduled
+		return true, v1beta1.SavepointTriggerReasonScheduledInitial
 	}

-	// Interval expired.
-	var tc = &TimeConverter{}
-	var lastTime = tc.FromString(jobStatus.LastSavepointTime)
-	var nextTime = lastTime.Add(
-		time.Duration(int64(*jobSpec.AutoSavepointSeconds) * int64(time.Second)))
+	// Scheduled, check if next trigger time arrived.
+	var nextTime = getNextOkTime(jobStatus.LastSavepointTime, int64(*jobSpec.AutoSavepointSeconds))
 	return time.Now().After(nextTime), v1beta1.SavepointTriggerReasonScheduled
 }

+// Convert raw time to object and add `addedSeconds` to it
+func getNextOkTime(rawTime string, addedSeconds int64) time.Time {
+	var tc = &TimeConverter{}
+	var lastTriggerTime = time.Time{}
+	if len(rawTime) != 0 {
+		lastTriggerTime = tc.FromString(rawTime)
+	}
+	return lastTriggerTime.Add(time.Duration(addedSeconds * int64(time.Second)))
+}
+
 // Trigger savepoint for a job then return savepoint status to update.
 func (reconciler *ClusterReconciler) takeSavepointAsync(jobID string, triggerReason string) (*v1beta1.SavepointStatus, error) {
 	var log = reconciler.log
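To make the two getNextOkTime checks concrete, here is a self-contained illustration with made-up timestamps; SavepointTimeoutSec = 900 and autoSavepointSeconds = 1800 are example values, while the real code reads them from the package constant and the job spec:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	lastTrigger := now.Add(-10 * time.Minute)   // stands in for jobStatus.LastSavepointTriggerTime
	lastSavepoint := now.Add(-40 * time.Minute) // stands in for jobStatus.LastSavepointTime

	savepointTimeoutSec := int64(900)   // minimal gap after the last trigger
	autoSavepointSeconds := int64(1800) // spec.job.autoSavepointSeconds

	// First gate: enough time has passed since the last trigger.
	canTriggerAgain := now.After(lastTrigger.Add(time.Duration(savepointTimeoutSec) * time.Second))

	// Second gate: the auto-savepoint interval has elapsed since the last completed savepoint.
	intervalElapsed := now.After(lastSavepoint.Add(time.Duration(autoSavepointSeconds) * time.Second))

	// Both must hold before a scheduled savepoint is triggered.
	fmt.Println(canTriggerAgain, intervalElapsed) // false true: triggered 10 min ago, so wait until 15 min have passed
}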
@@ -819,6 +830,14 @@ func (reconciler *ClusterReconciler) takeSavepoint(
 	return err
 }

+func (reconciler *ClusterReconciler) updateSavepointTriggerTimeStatus() error {
+	var cluster = v1beta1.FlinkCluster{}
+	reconciler.observed.cluster.DeepCopyInto(&cluster)
+	var jobStatus = cluster.Status.Components.Job
+	setTimestamp(&jobStatus.LastSavepointTriggerTime)
+	return reconciler.k8sClient.Status().Update(reconciler.context, &cluster)
+}
+
 func (reconciler *ClusterReconciler) updateSavepointStatus(
 	savepointStatus flinkclient.SavepointStatus) error {
 	var cluster = v1beta1.FlinkCluster{}
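updateSavepointTriggerTimeStatus writes status from the observed (possibly stale) copy, so Status().Update can fail with a conflict. A sketch of a more defensive variant using client-go's retry helper — an alternative, not what this PR does; it assumes imports of k8s.io/client-go/util/retry and k8s.io/apimachinery/pkg/types:

// Sketch: re-read the latest FlinkCluster and retry on optimistic-concurrency conflicts.
func (reconciler *ClusterReconciler) updateSavepointTriggerTimeStatusWithRetry() error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		var cluster = v1beta1.FlinkCluster{}
		var key = types.NamespacedName{
			Namespace: reconciler.observed.cluster.Namespace,
			Name:      reconciler.observed.cluster.Name,
		}
		// Fetch the current object so the update is not based on a stale copy.
		if err := reconciler.k8sClient.Get(reconciler.context, key, &cluster); err != nil {
			return err
		}
		var jobStatus = cluster.Status.Components.Job
		setTimestamp(&jobStatus.LastSavepointTriggerTime)
		return reconciler.k8sClient.Status().Update(reconciler.context, &cluster)
	})
}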
controllers/flinkcluster_util.go:

@@ -42,7 +42,7 @@ const (
 	ControlRetries    = "retries"
 	ControlMaxRetries = "3"

-	SavepointTimeoutSec = 60
+	SavepointTimeoutSec = 900 // 15 mins

 	RevisionNameLabel = "flinkoperator.k8s.io/revision-name"

Review thread on the SavepointTimeoutSec change:
- Can we make it configurable as a field in the job spec?
- This might not be the best case; I increased it for the moment, but I think we need to check the JobManager's API to see the savepoint status in the next savepoint PR. Do you think it's bad that we increased it to 15 mins? Is there a case where someone will want to take a savepoint more often than every 15 mins?
- This constant is actually the "minimal interval for triggering 2 savepoints", right? The name "Timeout" could be confusing; it might be mis-interpreted as "the timeout for taking a savepoint (before considering it a failure)". It is hard to determine the value. For example, I just took a savepoint 10 mins ago, but now I want to update my job and I don't want to lose the state for the recent 10 mins, so I want it to take another savepoint before the update. Why do we need to introduce an arbitrary limit here?
- There are three variables related to triggering savepoints. In some cases, the savepoint may no longer proceed due to some errors, but the JobManager may still return the status normally; in that case, SavepointTimeoutSec is used to handle the timeout. For jobs that require a long time to create savepoints, it would be better to make this variable user-configurable and set its default value large enough. (References controllers/flinkcluster_util.go, lines 45 to 51 at 17040f1.)
- I found that it is possible to set the checkpoint timeout with the Flink configuration. In my opinion, it would be better to remove the Flink operator's savepoint timeout routine to resolve the second issue and point users to the related Flink configuration.
- I think this might be better solved in another PR; this one provides a mitigation for clusters where a savepoint takes more than a few seconds.
- SGTM, let's address it in another PR.
- Sorry for the late response. When a checkpoint timeout occurs in the Flink JobManager, the savepoint state falls to "Failed", so I don't think the first savepoint needs to be identified. The second issue is occurring because the default Flink checkpoint timeout is 10 minutes, but …
- Which are the next PRs/issues? This one? #420 It seems to work as …
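Following the suggestion in the thread to make the value user-configurable with a large default, one possible shape — the spec field and helper are hypothetical, not part of this PR:

// Hypothetical spec field:
//   SavepointTimeoutSec *int32 `json:"savepointTimeoutSec,omitempty"`

// Hypothetical helper: fall back to the package constant when the field is unset.
func savepointTimeoutSec(jobSpec *v1beta1.JobSpec) int64 {
	if jobSpec != nil && jobSpec.SavepointTimeoutSec != nil {
		return int64(*jobSpec.SavepointTimeoutSec)
	}
	return SavepointTimeoutSec // 900 in this PR
}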