Skip to content

Commit 0e65a5a

Browse files
committed
[FLINK-37530] Record upgrade savepoint correctly in savepointInfo as well
1 parent 1578a29 commit 0e65a5a

File tree

4 files changed: +35 -7 lines changed

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

+20 -5
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
3232
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
3333
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
34+
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
3435
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
3536
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
3637
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
@@ -398,21 +399,35 @@ protected void restoreJob(
398399
*
399400
* @param ctx context
400401
* @param savepointLocation location of savepoint taken
402+
* @param cancelTs Timestamp when upgrade/cancel was triggered
401403
*/
402-
protected void setUpgradeSavepointPath(FlinkResourceContext<?> ctx, String savepointLocation) {
404+
protected void setUpgradeSavepointPath(
405+
FlinkResourceContext<?> ctx, String savepointLocation, Instant cancelTs) {
403406
var conf = ctx.getObserveConfig();
404407
var savepointFormatType =
405-
ctx.getObserveConfig()
406-
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
408+
SavepointFormatType.valueOf(
409+
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
410+
.name());
407411

408412
FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
409413
conf,
410414
ctx.getOperatorConfig(),
411415
ctx.getKubernetesClient(),
412416
ctx.getResource(),
413-
SavepointFormatType.valueOf(savepointFormatType.name()),
417+
savepointFormatType,
414418
savepointLocation);
415-
ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation);
419+
var jobStatus = ctx.getResource().getStatus().getJobStatus();
420+
jobStatus.setUpgradeSavepointPath(savepointLocation);
421+
422+
// Register created savepoint in the now deprecated savepoint info and history
423+
var savepoint =
424+
new Savepoint(
425+
cancelTs.toEpochMilli(),
426+
savepointLocation,
427+
SnapshotTriggerType.UPGRADE,
428+
savepointFormatType,
429+
null);
430+
jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
416431
}
417432

418433
/**

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

+3 -1
Original file line number | Diff line number | Diff line change
@@ -223,10 +223,12 @@ private void setJobIdIfNecessary(
223223
@Override
224224
protected boolean cancelJob(FlinkResourceContext<FlinkDeployment> ctx, SuspendMode suspendMode)
225225
throws Exception {
226+
var cancelTs = Instant.now();
226227
var result =
227228
ctx.getFlinkService()
228229
.cancelJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
229-
result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location));
230+
result.getSavepointPath()
231+
.ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
230232
return result.isPending();
231233
}
232234

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java

+4 -1
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import java.time.Instant;
4445
import java.util.Optional;
4546

4647
/** The reconciler for the {@link FlinkSessionJob}. */
@@ -100,10 +101,12 @@ public void deploy(
100101
@Override
101102
protected boolean cancelJob(FlinkResourceContext<FlinkSessionJob> ctx, SuspendMode suspendMode)
102103
throws Exception {
104+
var cancelTs = Instant.now();
103105
var result =
104106
ctx.getFlinkService()
105107
.cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
106-
result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location));
108+
result.getSavepointPath()
109+
.ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
107110
return result.isPending();
108111
}
109112

Diff for: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

+8
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,7 @@
9898
import java.time.Duration;
9999
import java.time.Instant;
100100
import java.time.ZoneId;
101+
import java.util.LinkedList;
101102
import java.util.List;
102103
import java.util.Map;
103104
import java.util.Optional;
@@ -303,6 +304,13 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
303304

304305
assertEquals(0, flinkService.getRunningCount());
305306

307+
var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
308+
assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
309+
assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
310+
assertEquals(
311+
spInfo.getLastSavepoint(),
312+
new LinkedList<>(spInfo.getSavepointHistory()).getLast());
313+
306314
reconciler.reconcile(statefulUpgrade, context);
307315

308316
runningJobs = flinkService.listJobs();

0 commit comments

Comments
 (0)