
[Bug] SeaTunnel + Flink engine: recent SeaTunnel versions cannot restore from Flink (1.16, 1.17, 1.18) checkpoints/savepoints #10193

@tomma-a

Description


Search before asking

  • I have searched the issues and found no similar issues.

What happened

In my tests, SeaTunnel 2.3.12 runs on the Flink engine (1.17 or 1.18) with the Flink `upgradeMode` set to `savepoint` or `last-state`. When I upgrade the FlinkDeployment (with the Flink Kubernetes operator), I encounter the following error:

```
2025-12-10 10:44:00,103 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2025-12-10 10:44:00,104 INFO org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
2025-12-10 10:44:00,107 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: Kafka-Source.
2025-12-10 10:44:00,108 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka-Source closed.
2025-12-10 10:44:00,111 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: Kafka-Source from checkpoint.
2025-12-10 10:44:00,221 WARN org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext [] - Get flink job id failed
java.lang.IllegalStateException: Initialize flink job-id failed
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:152) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getFlinkJobId(FlinkSourceSplitEnumeratorContext.java:100) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.<init>(FlinkSourceSplitEnumeratorContext.java:57) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:116) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:48) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:444) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:406) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:155) ~[flink-dist-1.17.2.jar:1.17.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.whenComplete(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.18.0.jar:1.18.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "obj" is null
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:142) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-fd7fca630f268e360191f42ba11014b3:2.3.12]
```

It seems that SeaTunnel's `FlinkSourceSplitEnumeratorContext` uses reflection to obtain the Flink job ID when the split enumerator is restored, and that this reflection is not yet compatible with newer Flink versions (just my guess).
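To make my guess concrete, here is a minimal, self-contained Java sketch of this failure mode. It assumes (hypothetically) that the job-ID lookup reads a field reflectively from an object that is still null after the restore; `Holder`, `jobInfo`, `jobId`, `readField`, and `readFieldSafely` are illustrative names only, not SeaTunnel or Flink code. The naive helper calls `obj.getClass()` without a null check and fails with the same kind of `NullPointerException` as the `Caused by` line; the guarded helper returns `Optional.empty()` instead.

```java
import java.lang.reflect.Field;
import java.util.Optional;

public class JobIdLookupSketch {

    // Hypothetical stand-in for an internal object reached via reflection.
    // Its field is left null here, mimicking state that may not be
    // populated when the enumerator is restored from a checkpoint.
    static class Holder {
        Object jobInfo;
    }

    // Naive reflective read: calls obj.getClass() without a null check,
    // so a null target throws a NullPointerException.
    static Object readField(Object obj, String fieldName) throws ReflectiveOperationException {
        Field field = obj.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(obj);
    }

    // Guarded variant: returns Optional.empty() when the target is null,
    // the field does not exist, its value is null, or it cannot be read.
    static Optional<Object> readFieldSafely(Object obj, String fieldName) {
        if (obj == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(readField(obj, fieldName));
        } catch (ReflectiveOperationException | RuntimeException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Object jobInfo = readField(new Holder(), "jobInfo"); // null in this demo
        try {
            readField(jobInfo, "jobId"); // target is null -> NPE
        } catch (NullPointerException e) {
            System.out.println("naive lookup failed: " + e.getMessage());
        }
        System.out.println("guarded lookup present: "
                + readFieldSafely(jobInfo, "jobId").isPresent());
    }
}
```

A guard like this would only let the enumerator context log a warning and continue; whether a missing job ID is acceptable on the restore path, or the ID should be obtained differently on newer Flink versions, is a separate question.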

SeaTunnel Version

SeaTunnel 2.3.12

SeaTunnel Config

Here are the settings from the test described above.

SeaTunnel job settings (a streaming job):

```
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval=60000
  flink.execution.checkpointing.mode = "EXACTLY_ONCE"
  flink.execution.checkpointing.timeout = 600000
}

source {
  Kafka {
    plugin_output="fake2"
    topic = info
    consumer.group="testr"
    bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
  }
}

sink {
  Kafka {
    plugin_input="fake2"
    topic = topc
    bootstrap.servers = "tom-cluster1-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
```

Then I run the SeaTunnel job on Flink through a Flink Kubernetes operator `FlinkDeployment` custom resource:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example-2
  namespace: kafka
spec:
  ......

  volumes:
    - name: seatunnel-config
      configMap:
        name: seatunnel-config
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-15-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: savepoint
```

The first time I `kubectl apply` the above YAML to a Kubernetes cluster, the SeaTunnel job runs normally and Flink checkpoints are saved successfully at each interval.

Then I make some changes to the YAML file and apply it again, which triggers an upgrade of the deployment. Because `upgradeMode` is `savepoint`, the job should be restored from the savepoint taken during the upgrade; instead, the error above occurs and the job can NOT be restored. Using `last-state`, which restores from the last checkpoint, does not work either.

Please correct me if I'm wrong about this. Thanks!

Running Command

By applying a Kubernetes `FlinkDeployment` custom resource.


Zeta or Flink or Spark Version

Flink: 1.16, 1.17, 1.18

I have not tested other Flink versions.

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct
