This repository was archived by the owner on Sep 2, 2022. It is now read-only.

Fix savepoint problems #392

Conversation

@shashken (Contributor) commented Jan 11, 2021

I found 2 problems related to savepoints:

  1. When upgrading a job, there was no option to take a savepoint before the upgrade (and use it to restore the job afterwards). I added a flag to cover this case.

  2. When a cluster starts, it tries to take a savepoint, but the savepoint status is only updated once the savepoint completes. This creates a situation where a new savepoint is triggered while the previous one is still running, and it keeps happening (forever) if your savepoints don't finish quickly. I solved this with another status value that holds the savepoint trigger time, plus an increased savepoint timeout, so a new savepoint is not triggered while one is still running (see the sketch below).
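
A minimal Go sketch of the guard described in point 2; the names (canTriggerNewSavepoint, lastTriggerTime) are illustrative, not the exact identifiers used in this PR:

package main

import (
	"fmt"
	"time"
)

// SavepointTimeoutSec mirrors the constant this PR raises from 60 to 900 (15 min).
const SavepointTimeoutSec = 900

// canTriggerNewSavepoint skips triggering a new savepoint while the previously
// triggered one may still be running, i.e. until the recorded trigger time is
// older than the timeout.
func canTriggerNewSavepoint(lastTriggerTime, now time.Time) bool {
	return lastTriggerTime.IsZero() ||
		now.Sub(lastTriggerTime) > SavepointTimeoutSec*time.Second
}

func main() {
	lastTrigger := time.Now().Add(-5 * time.Minute)
	// Prints false: the previous savepoint was triggered only 5 minutes ago.
	fmt.Println(canTriggerNewSavepoint(lastTrigger, time.Now()))
}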

@functicons I'd love to get your feedback on this. We might want to build a stronger solution later on, but for now savepoints are impossible to use with this operator if they take more than a short time to complete.

@functicons (Collaborator)

/gcbrun

@functicons self-requested a review on January 13, 2021, 01:54
@functicons (Collaborator)

Thanks for the PR, will review as soon as I get a chance.

@functicons (Collaborator) left a comment


Left some comments, thanks!

SavepointTriggerReasonUpdate = "update"
SavepointTriggerReasonUserRequested = "user requested"
SavepointTriggerReasonScheduled = "scheduled"
SavepointTriggerReasonScheduledInitial = "scheduledInitial"
Collaborator

Add a comment about this reason.

@@ -347,6 +348,9 @@ type JobSpec struct {
// Allow non-restored state, default: false.
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`

// Should take savepoint before upgrading the job, default: false.
ShouldTakeSavepointOnUpgrade *bool `json:"shouldTakeSavepointOnUpgrade,omitempty"`
Collaborator

nit: s/ShouldTakeSavepointOnUpgrade/TakeSavepointOnUpdate.

@elanv (Contributor) commented Feb 15, 2021

@shashken It would be nice to consider setting a default value for the takeSavepointOnUpgrade field; otherwise it could lead to a nil pointer error like #408. You can easily set it with a marker such as kubebuilder:default.

And for naming consistency, the field name TakeSavepointOnUpdate looks better to me.
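
A minimal sketch of that suggestion, assuming the marker sits on the new JobSpec field (the final field name was still under discussion here):

package v1beta1

// Sketch only: the kubebuilder marker below makes controller-gen emit a default
// into the generated CRD schema, so the pointer is populated even when the user
// omits the field and dereferencing it cannot cause a nil pointer panic.
type JobSpec struct {
	// Take a savepoint before upgrading the job, default: false.
	// +kubebuilder:default=false
	TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`
}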

Contributor

@shashken @functicons Also, the comment says the default value is false. Wouldn't it be better to set it to true? It seems common to restore the job from the latest savepoint.

@@ -567,6 +571,9 @@ type JobStatus struct {
// Last savepoint trigger ID.
LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`

// Last successful or failed savepoint operation timestamp.
Collaborator

What if the operation is still in progress?

Contributor (Author)

I'll change the comment here. This flow is still not 100% bug-proof; I think more work needs to be done on the savepoint flow.

Collaborator

Then add more comments about the potential problems and TODOs.

Contributor (Author)

Fixed the doc there

if shouldUpdateJob(observed) {
log.Info("Job is about to be restarted to update")
restartJob = true
err := reconciler.restartJob(*jobSpec.ShouldTakeSavepointOnUpgrade)
Collaborator

I prefer keeping the existing structure and introducing a variable takeSavepointOnUpdate for the 2 cases.
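
A sketch of the suggested structure, reusing identifiers from the diff above (the flag name takeSavepointOnUpdate is the reviewer's suggestion; this is a fragment of the reconciler function, not standalone code):

// Keep the single restart path; only the new boolean differs between the cases.
var restartJob, takeSavepointOnUpdate bool
if shouldUpdateJob(observed) {
	log.Info("Job is about to be restarted to update")
	restartJob = true
	takeSavepointOnUpdate = *jobSpec.ShouldTakeSavepointOnUpgrade
} else if shouldRestartJob(restartPolicy, recordedJobStatus) {
	log.Info("Job is about to be restarted to recover failure")
	restartJob = true
}
if restartJob {
	if err := reconciler.restartJob(takeSavepointOnUpdate); err != nil {
		return requeueResult, err
	}
	return requeueResult, nil
}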

Contributor (Author)

Are you sure? Each case needs to pass a different argument to restartJob. I think it's much cleaner like this.

if ok, savepointTriggerReason := reconciler.shouldTakeSavepoint(); ok {
shouldTakeSavepont, savepointTriggerReason := reconciler.shouldTakeSavepoint()
if shouldTakeSavepont {
reconciler.updateSavepointTriggerTimeStatus()
Collaborator

This method might return an error.

@@ -42,7 +42,7 @@ const (
ControlRetries = "retries"
ControlMaxRetries = "3"

SavepointTimeoutSec = 60
SavepointTimeoutSec = 900 // 15 mins
Collaborator

Can we make it configurable as a field in the job spec?

Contributor (Author)

This might not be the best approach; I increased it for the moment, but I think we need to check the JobManager's API for the savepoint status in the next savepoint PR. Do you think it's bad that we increased it to 15 mins? Is there a case where someone will want to take a savepoint more often than every 15 mins?

Collaborator

This constant is actually the "minimal interval for triggering 2 savepoints", right? The name "Timeout" could be confusing; it might be misinterpreted as "the timeout for taking a savepoint (before considering it a failure)".

It is hard to determine the value. For example, I just took a savepoint 10 mins ago, but now I want to update my job, and I don't want to lose the state for the recent 10 mins, so I want it to take another savepoint before the update. Why do we need to introduce an arbitrary limit here?

@elanv (Contributor) commented Jan 20, 2021

There are three variables related to triggering savepoints.

  • SavepointAgeForJobUpdateSec: savepoint age limit required for update progress
  • SavepointRequestRetryIntervalSec: retry interval for savepoint failure on update
  • SavepointTimeoutSec: savepoint timeout

In some cases, the savepoint may no longer make progress due to errors, but the JobManager may still return the status normally. In that case, SavepointTimeoutSec is used to handle the timeout. For jobs that take a long time to create savepoints, it would be better to make this variable user-configurable and set its default value large enough.

SavepointTimeoutSec = 60
RevisionNameLabel = "flinkoperator.k8s.io/revision-name"
// TODO: need to be user configurable
SavepointAgeForJobUpdateSec = 300
SavepointRequestRetryIntervalSec = 10

@elanv (Contributor) commented Jan 23, 2021

I found that it is possible to set the checkpoint timeout with the Flink configuration. In my opinion, it would be better to remove the Flink operator's savepoint timeout routine to resolve the second issue and instead point users to the related Flink configuration.

note: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-timeout
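
As a reference, a hedged sketch of that approach, assuming Flink configuration reaches the cluster through the FlinkCluster spec's flinkProperties map; per the comment above, the Flink-side checkpoint timeout (default 10 min) is what eventually fails a stuck savepoint:

package main

import "fmt"

// Sketch only: raise the Flink-side checkpoint timeout instead of relying on
// the operator's SavepointTimeoutSec constant.
var flinkProperties = map[string]string{
	"execution.checkpointing.timeout": "30 min", // Flink's default is 10 min
}

func main() {
	fmt.Println(flinkProperties)
}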

Contributor (Author)

I think this might be better solved in another PR; this one provides a mitigation for clusters where savepoints take more than a few seconds.
I think we should discuss how to solve this in a separate issue and consider getting the info about the savepoint (is there an active one, did it time out, etc.) from the JobManager itself.
I can make this part of the CRD for now and delete it later when it is no longer needed (in another PR).
WDYT @elanv @functicons?

Collaborator

SGTM, let's address it in another PR.

@elanv (Contributor) commented Feb 1, 2021

Sorry for the late response. When a checkpoint timeout occurs in the Flink JobManager, the savepoint state falls to "failed", so I don't think the first savepoint needs to be identified. The second issue occurs because the default Flink checkpoint timeout is 10 minutes, but SavepointTimeoutSec is less than that. I think it's okay to handle that part in another PR.

Contributor

Which are the next PRs/issues? This one: #420?

It seems to work as the "minimal interval for triggering 2 savepoints", but some docs show autoSavepointSeconds: 300 as an example value, and I actually specify that value. Is this limitation a temporary workaround?

@elanv (Contributor) commented Mar 9, 2021

SavepointTimeoutSec is just the savepoint timeout, and autoSavepointSeconds is the savepoint trigger interval, as you mentioned. #420 is the PR to improve the savepoint routines, including this issue.

@@ -2,7 +2,7 @@ apiVersion: v1
name: flink-operator
appVersion: "1.0"
description: A Helm chart for flink on Kubernetes operator
version: "0.2.0"
version: "0.2.5"
Collaborator

Why upgrade to 5?

@@ -744,13 +743,21 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() (bool, string) {
return false, ""
}

var lastTriggerTime = time.Time{}
Collaborator

Extract this block into a helper method and add comments.
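
A sketch of such a helper; the field name and timestamp format are assumptions based on the surrounding status fields:

package main

import (
	"fmt"
	"time"
)

// getLastSavepointTriggerTime turns the timestamp string recorded in the job
// status into a time.Time, treating an empty or unparsable value as "no
// savepoint has been triggered yet".
func getLastSavepointTriggerTime(lastSavepointTriggerTime string) time.Time {
	if lastSavepointTriggerTime == "" {
		return time.Time{}
	}
	t, err := time.Parse(time.RFC3339, lastSavepointTriggerTime)
	if err != nil {
		return time.Time{}
	}
	return t
}

func main() {
	fmt.Println(getLastSavepointTriggerTime("2021-01-19T14:08:00Z"))
	fmt.Println(getLastSavepointTriggerTime("")) // zero time: never triggered
}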

@shashken requested a review from functicons on January 19, 2021, 14:08

@shashken requested a review from functicons on January 25, 2021, 17:24

@functicons (Collaborator)

/gcbrun

@functicons merged commit 54a7d09 into GoogleCloudPlatform:master on Feb 1, 2021
Comment on lines +488 to +491
err = reconciler.updateSavepointTriggerTimeStatus()
if err != nil {
newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
}
@elanv (Contributor) commented Feb 2, 2021

@shashken @functicons
It seems that the savepoint should be triggered when err == nil.
When I tested, I sometimes found that the savepoint was not triggered and only status.job.lastSavepointTriggerTime was updated.

Also, the FlinkCluster status is updated in the reconciler's updateStatus function, so if you plan to make a new PR, it might be worth considering how to call the status update function only once.
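
A sketch of the corrected check, mirroring the merged lines above; error handling beyond the flipped condition is omitted:

// Only trigger the savepoint when recording the trigger time succeeded.
err = reconciler.updateSavepointTriggerTimeStatus()
if err == nil {
	newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
}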

Contributor (Author)

Damn, I tested the version before the CR change with that line, sorry about that.

Comment on lines -473 to 482
var restartJob bool
if shouldUpdateJob(observed) {
log.Info("Job is about to be restarted to update")
restartJob = true
err := reconciler.restartJob(*jobSpec.TakeSavepointOnUpgrade)
return requeueResult, err
} else if shouldRestartJob(restartPolicy, recordedJobStatus) {
log.Info("Job is about to be restarted to recover failure")
restartJob = true
}
if restartJob {
err := reconciler.restartJob()
if err != nil {
return requeueResult, err
}
return requeueResult, nil
err := reconciler.restartJob(false)
return requeueResult, err
}
@elanv (Contributor) commented Feb 15, 2021

@shashken @functicons There is no need to trigger a savepoint here. shouldUpdateJob checks whether the latest savepoint exists, and if it does not exist, a savepoint will be triggered in another routine. restartJob is just for restarting the job, so there is no need to trigger a savepoint.

@@ -575,14 +573,15 @@ func (reconciler *ClusterReconciler) getFlinkJobID() string {
return ""
}

func (reconciler *ClusterReconciler) restartJob() error {
func (reconciler *ClusterReconciler) restartJob(shouldTakeSavepoint bool) error {
Contributor

@shashken @functicons restartJob is just for restarting the job, so there is no need to trigger a savepoint here.
