Skip to content
This repository was archived by the owner on Sep 2, 2022. It is now read-only.

Commit 54a7d09

Browse files
authored
Fix savepoint problems (#392)
1 parent 17040f1 commit 54a7d09

File tree

9 files changed

+105
-57
lines changed

9 files changed

+105
-57
lines changed

api/v1beta1/flinkcluster_types.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,11 @@ const (
9898
SavepointStateFailed = "Failed"
9999
SavepointStateSucceeded = "Succeeded"
100100

101-
SavepointTriggerReasonUserRequested = "user requested"
102-
SavepointTriggerReasonScheduled = "scheduled"
103-
SavepointTriggerReasonJobCancel = "job cancel"
104-
SavepointTriggerReasonUpdate = "update"
101+
SavepointTriggerReasonUserRequested = "user requested"
102+
SavepointTriggerReasonScheduled = "scheduled"
103+
SavepointTriggerReasonScheduledInitial = "scheduled initial" // The first triggered savepoint has slightly different flow
104+
SavepointTriggerReasonJobCancel = "job cancel"
105+
SavepointTriggerReasonUpdate = "update"
105106
)
106107

107108
// ImageSpec defines Flink image of JobManager and TaskManager containers.
@@ -347,6 +348,9 @@ type JobSpec struct {
347348
// Allow non-restored state, default: false.
348349
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`
349350

351+
// Whether to take a savepoint before upgrading the job, default: false.
352+
TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`
353+
350354
// Savepoints dir where to store savepoints of the job.
351355
SavepointsDir *string `json:"savepointsDir,omitempty"`
352356

@@ -570,6 +574,10 @@ type JobStatus struct {
570574
// Last savepoint trigger ID.
571575
LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`
572576

577+
// Last savepoint trigger time. This is updated to make sure multiple
578+
// savepoints will not be taken simultaneously.
579+
LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`
580+
573581
// Last successful or failed savepoint operation timestamp.
574582
LastSavepointTime string `json:"lastSavepointTime,omitempty"`
575583

api/v1beta1/zz_generated.deepcopy.go

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ spec:
163163
type: integer
164164
cancelRequested:
165165
type: boolean
166+
takeSavepointOnUpgrade:
167+
type: boolean
166168
className:
167169
type: string
168170
cleanupPolicy:
@@ -5148,6 +5150,8 @@ spec:
51485150
type: string
51495151
id:
51505152
type: string
5153+
lastSavepointTriggerTime:
5154+
type: string
51515155
lastSavepointTime:
51525156
type: string
51535157
lastSavepointTriggerID:

controllers/flinkcluster_reconciler.go

+41-21
Original file line numberDiff line numberDiff line change
@@ -503,31 +503,29 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
503503
var jobID = reconciler.getFlinkJobID()
504504
var restartPolicy = observed.cluster.Spec.Job.RestartPolicy
505505
var recordedJobStatus = observed.cluster.Status.Components.Job
506+
var jobSpec = reconciler.observed.cluster.Spec.Job
506507

507508
// Update or recover Flink job by restart.
508-
var restartJob bool
509509
if shouldUpdateJob(observed) {
510510
log.Info("Job is about to be restarted to update")
511-
restartJob = true
511+
err := reconciler.restartJob(*jobSpec.TakeSavepointOnUpgrade)
512+
return requeueResult, err
512513
} else if shouldRestartJob(restartPolicy, recordedJobStatus) {
513514
log.Info("Job is about to be restarted to recover failure")
514-
restartJob = true
515-
}
516-
if restartJob {
517-
err := reconciler.restartJob()
518-
if err != nil {
519-
return requeueResult, err
520-
}
521-
return requeueResult, nil
515+
err := reconciler.restartJob(false)
516+
return requeueResult, err
522517
}
523518

524519
// Trigger savepoint if required.
525520
if len(jobID) > 0 {
526-
if ok, savepointTriggerReason := reconciler.shouldTakeSavepoint(); ok {
527-
newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
521+
shouldTakeSavepont, savepointTriggerReason := reconciler.shouldTakeSavepoint()
522+
if shouldTakeSavepont {
523+
err = reconciler.updateSavepointTriggerTimeStatus()
524+
if err != nil {
525+
newSavepointStatus, _ = reconciler.takeSavepointAsync(jobID, savepointTriggerReason)
526+
}
528527
}
529528
}
530-
531529
log.Info("Job is not finished yet, no action", "jobID", jobID)
532530
return requeueResult, nil
533531
}
@@ -610,14 +608,15 @@ func (reconciler *ClusterReconciler) getFlinkJobID() string {
610608
return ""
611609
}
612610

613-
func (reconciler *ClusterReconciler) restartJob() error {
611+
func (reconciler *ClusterReconciler) restartJob(shouldTakeSavepoint bool) error {
614612
var log = reconciler.log
615613
var observedJob = reconciler.observed.job
616614
var observedFlinkJob = reconciler.observed.flinkJobStatus.flinkJob
617615

618616
log.Info("Stopping Flink job to restart", "", observedFlinkJob)
617+
shouldTakeSavepoint = shouldTakeSavepoint && canTakeSavepoint(*reconciler.observed.cluster)
619618

620-
var err = reconciler.cancelRunningJobs(false /* takeSavepoint */)
619+
var err = reconciler.cancelRunningJobs(shouldTakeSavepoint /* takeSavepoint */)
621620
if err != nil {
622621
return err
623622
}
@@ -779,19 +778,32 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() (bool, string) {
779778
return false, ""
780779
}
781780

781+
var nextOkTriggerTime = getTimeAfterAddedSeconds(jobStatus.LastSavepointTriggerTime, SavepointTimeoutSec)
782+
if time.Now().Before(nextOkTriggerTime) {
783+
return false, ""
784+
}
785+
782786
// First savepoint.
783787
if len(jobStatus.LastSavepointTime) == 0 {
784-
return true, v1beta1.SavepointTriggerReasonScheduled
788+
return true, v1beta1.SavepointTriggerReasonScheduledInitial
785789
}
786790

787-
// Interval expired.
788-
var tc = &TimeConverter{}
789-
var lastTime = tc.FromString(jobStatus.LastSavepointTime)
790-
var nextTime = lastTime.Add(
791-
time.Duration(int64(*jobSpec.AutoSavepointSeconds) * int64(time.Second)))
791+
// Scheduled, check if next trigger time arrived.
792+
var nextTime = getTimeAfterAddedSeconds(jobStatus.LastSavepointTime, int64(*jobSpec.AutoSavepointSeconds))
792793
return time.Now().After(nextTime), v1beta1.SavepointTriggerReasonScheduled
793794
}
794795

796+
// getTimeAfterAddedSeconds converts `rawTime` to a time.Time via TimeConverter
797+
// and returns it shifted by `addedSeconds` seconds. An empty `rawTime` is treated as the zero time.Time.
798+
func getTimeAfterAddedSeconds(rawTime string, addedSeconds int64) time.Time {
799+
var tc = &TimeConverter{}
800+
var lastTriggerTime = time.Time{}
801+
if len(rawTime) != 0 {
802+
lastTriggerTime = tc.FromString(rawTime)
803+
}
804+
return lastTriggerTime.Add(time.Duration(addedSeconds * int64(time.Second)))
805+
}
806+
795807
// Trigger savepoint for a job then return savepoint status to update.
796808
func (reconciler *ClusterReconciler) takeSavepointAsync(jobID string, triggerReason string) (*v1beta1.SavepointStatus, error) {
797809
var log = reconciler.log
@@ -854,6 +866,14 @@ func (reconciler *ClusterReconciler) takeSavepoint(
854866
return err
855867
}
856868

869+
// updateSavepointTriggerTimeStatus sets LastSavepointTriggerTime on a deep copy
// of the observed cluster's job status and submits the copy through the status
// client, returning the error from that update.
// NOTE(review): setTimestamp presumably writes the current time — confirm against its definition.
// NOTE(review): Status.Components.Job is dereferenced without a nil check — confirm callers guarantee it is set.
func (reconciler *ClusterReconciler) updateSavepointTriggerTimeStatus() error {
870+
var cluster = v1beta1.FlinkCluster{}
871+
reconciler.observed.cluster.DeepCopyInto(&cluster)
872+
var jobStatus = cluster.Status.Components.Job
873+
setTimestamp(&jobStatus.LastSavepointTriggerTime)
874+
return reconciler.k8sClient.Status().Update(reconciler.context, &cluster)
875+
}
876+
857877
func (reconciler *ClusterReconciler) updateSavepointStatus(
858878
savepointStatus flinkclient.SavepointStatus) error {
859879
var cluster = v1beta1.FlinkCluster{}

controllers/flinkcluster_util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const (
4242
ControlRetries = "retries"
4343
ControlMaxRetries = "3"
4444

45-
SavepointTimeoutSec = 60
45+
SavepointTimeoutSec = 900 // 15 mins
4646

4747
RevisionNameLabel = "flinkoperator.k8s.io/revision-name"
4848

controllers/flinkcluster_util_test.go

+35-30
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,10 @@ func TestShouldUpdateJob(t *testing.T) {
230230
cluster: &v1beta1.FlinkCluster{
231231
Status: v1beta1.FlinkClusterStatus{
232232
Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{
233-
State: v1beta1.JobStateRunning,
234-
LastSavepointTime: tc.ToString(savepointTime),
235-
SavepointLocation: "gs://my-bucket/savepoint-123",
233+
State: v1beta1.JobStateRunning,
234+
LastSavepointTime: tc.ToString(savepointTime),
235+
LastSavepointTriggerTime: tc.ToString(savepointTime),
236+
SavepointLocation: "gs://my-bucket/savepoint-123",
236237
}},
237238
CurrentRevision: "1", NextRevision: "2",
238239
},
@@ -264,9 +265,10 @@ func TestShouldUpdateJob(t *testing.T) {
264265
cluster: &v1beta1.FlinkCluster{
265266
Status: v1beta1.FlinkClusterStatus{
266267
Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{
267-
State: v1beta1.JobStateRunning,
268-
LastSavepointTime: tc.ToString(savepointTime),
269-
SavepointLocation: "gs://my-bucket/savepoint-123",
268+
State: v1beta1.JobStateRunning,
269+
LastSavepointTime: tc.ToString(savepointTime),
270+
LastSavepointTriggerTime: tc.ToString(savepointTime),
271+
SavepointLocation: "gs://my-bucket/savepoint-123",
270272
}},
271273
CurrentRevision: "1", NextRevision: "2",
272274
},
@@ -325,9 +327,10 @@ func TestIsSavepointUpToDate(t *testing.T) {
325327
var savepointTime = time.Now()
326328
var observeTime = savepointTime.Add(time.Second * 100)
327329
var jobStatus = v1beta1.JobStatus{
328-
State: v1beta1.JobStateFailed,
329-
LastSavepointTime: tc.ToString(savepointTime),
330-
SavepointLocation: "gs://my-bucket/savepoint-123",
330+
State: v1beta1.JobStateFailed,
331+
LastSavepointTime: tc.ToString(savepointTime),
332+
LastSavepointTriggerTime: tc.ToString(savepointTime),
333+
SavepointLocation: "gs://my-bucket/savepoint-123",
331334
}
332335
var update = isSavepointUpToDate(observeTime, jobStatus)
333336
assert.Equal(t, update, true)
@@ -336,9 +339,10 @@ func TestIsSavepointUpToDate(t *testing.T) {
336339
savepointTime = time.Now()
337340
observeTime = savepointTime.Add(time.Second * 500)
338341
jobStatus = v1beta1.JobStatus{
339-
State: v1beta1.JobStateFailed,
340-
LastSavepointTime: tc.ToString(savepointTime),
341-
SavepointLocation: "gs://my-bucket/savepoint-123",
342+
State: v1beta1.JobStateFailed,
343+
LastSavepointTime: tc.ToString(savepointTime),
344+
LastSavepointTriggerTime: tc.ToString(savepointTime),
345+
SavepointLocation: "gs://my-bucket/savepoint-123",
342346
}
343347
update = isSavepointUpToDate(observeTime, jobStatus)
344348
assert.Equal(t, update, false)
@@ -347,8 +351,9 @@ func TestIsSavepointUpToDate(t *testing.T) {
347351
savepointTime = time.Now()
348352
observeTime = savepointTime.Add(time.Second * 500)
349353
jobStatus = v1beta1.JobStatus{
350-
State: v1beta1.JobStateFailed,
351-
LastSavepointTime: tc.ToString(savepointTime),
354+
State: v1beta1.JobStateFailed,
355+
LastSavepointTime: tc.ToString(savepointTime),
356+
LastSavepointTriggerTime: tc.ToString(savepointTime),
352357
}
353358
update = isSavepointUpToDate(observeTime, jobStatus)
354359
assert.Equal(t, update, false)
@@ -408,8 +413,8 @@ func TestIsFlinkAPIReady(t *testing.T) {
408413
Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
409414
},
410415
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
411-
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
412-
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
416+
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
417+
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
413418
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
414419
flinkJobStatus: FlinkJobStatus{flinkJobList: &flinkclient.JobStatusList{}},
415420
}
@@ -425,10 +430,10 @@ func TestIsFlinkAPIReady(t *testing.T) {
425430
},
426431
Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
427432
},
428-
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
433+
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
429434
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
430435
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
431-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
436+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
432437
}
433438
ready = isFlinkAPIReady(observed)
434439
assert.Equal(t, ready, false)
@@ -442,9 +447,9 @@ func TestIsFlinkAPIReady(t *testing.T) {
442447
},
443448
Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
444449
},
445-
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
450+
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
446451
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
447-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
452+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
448453
}
449454
ready = isFlinkAPIReady(observed)
450455
assert.Equal(t, ready, false)
@@ -458,10 +463,10 @@ func TestIsFlinkAPIReady(t *testing.T) {
458463
},
459464
Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
460465
},
461-
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
466+
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
462467
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
463468
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
464-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
469+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
465470
}
466471
ready = isFlinkAPIReady(observed)
467472
assert.Equal(t, ready, false)
@@ -478,11 +483,11 @@ func TestGetUpdateState(t *testing.T) {
478483
Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{State: v1beta1.JobStateRunning}},
479484
CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"},
480485
},
481-
job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
482-
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
486+
job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
487+
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
483488
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
484489
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
485-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
490+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
486491
}
487492
var state = getUpdateState(observed)
488493
assert.Equal(t, state, UpdateStatePreparing)
@@ -497,7 +502,7 @@ func TestGetUpdateState(t *testing.T) {
497502
},
498503
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
499504
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
500-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
505+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
501506
}
502507
state = getUpdateState(observed)
503508
assert.Equal(t, state, UpdateStateInProgress)
@@ -510,12 +515,12 @@ func TestGetUpdateState(t *testing.T) {
510515
},
511516
Status: v1beta1.FlinkClusterStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"},
512517
},
513-
job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
514-
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
518+
job: &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
519+
configMap: &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
515520
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
516521
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
517-
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
518-
jmIngress: &extensionsv1beta1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
522+
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
523+
jmIngress: &extensionsv1beta1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
519524
}
520525
state = getUpdateState(observed)
521526
assert.Equal(t, state, UpdateStateFinished)

docs/crd.md

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ FlinkCluster
6969
|__ args
7070
|__ fromSavepoint
7171
|__ allowNonRestoredState
72+
|__ takeSavepointOnUpgrade
7273
|__ autoSavepointSeconds
7374
|__ savepointsDir
7475
|__ savepointGeneration
@@ -262,6 +263,7 @@ FlinkCluster
262263
* **autoSavepointSeconds** (optional): Automatically take a savepoint to the `savepointsDir` every n seconds.
263264
* **savepointsDir** (optional): Savepoints dir where to store automatically taken savepoints.
264265
* **allowNonRestoredState** (optional): Allow non-restored state, default: false.
266+
* **takeSavepointOnUpgrade** (optional): Whether to take a savepoint before upgrading the job, default: false.
265267
* **savepointGeneration** (optional): Update this field to `jobStatus.savepointGeneration + 1` for a running job
266268
cluster to trigger a new savepoint to `savepointsDir` on demand.
267269
* **parallelism** (optional): Parallelism of the job, default: 1.

helm-chart/flink-operator/Chart.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ apiVersion: v1
22
name: flink-operator
33
appVersion: "1.0"
44
description: A Helm chart for flink on Kubernetes operator
5-
version: "0.2.0"
5+
version: "0.2.1"
66
keywords:
77
- flink
88
home: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

helm-chart/flink-operator/templates/flink-cluster-crd.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ spec:
164164
type: integer
165165
cancelRequested:
166166
type: boolean
167+
takeSavepointOnUpgrade:
168+
type: boolean
167169
className:
168170
type: string
169171
cleanupPolicy:
@@ -4976,6 +4978,8 @@ spec:
49764978
type: string
49774979
lastSavepointTime:
49784980
type: string
4981+
lastSavepointTriggerTime:
4982+
type: string
49794983
lastSavepointTriggerID:
49804984
type: string
49814985
name:

0 commit comments

Comments
 (0)