Flink: add snapshot expiration reset strategy #12639

Open · wants to merge 1 commit into main

Conversation

Guosmilesmile
Contributor

We encountered a scenario where, when using the Flink source to incrementally consume data from Iceberg, the lastSnapshotId being consumed has already been expired. This can happen, for example, through Spark's expire_snapshots (CALL iceberg.system.expire_snapshots(table => 'default.my_table', older_than => TIMESTAMP '2025-03-25 00:00:00.000', retain_last => 1)), or when consumption is too slow and historical snapshots are cleaned up in the meantime.

Caused by: java.lang.IllegalArgumentException: Starting snapshot (exclusive) 2444106500863389603 is not a parent ancestor of end snapshot 2357271669960485754
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:430)
  at org.apache.iceberg.BaseIncrementalScan.fromSnapshotIdExclusive(BaseIncrementalScan.java:179)
  at org.apache.iceberg.BaseIncrementalScan.planFiles(BaseIncrementalScan.java:104)
  at org.apache.iceberg.BaseIncrementalAppendScan.planTasks(BaseIncrementalAppendScan.java:61)
  at org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:119)
  at org.apache.iceberg.flink.source.FlinkSplitPlanner.planIcebergSourceSplits(FlinkSplitPlanner.java:76)
  at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.discoverIncrementalSplits(ContinuousSplitPlannerImpl.java:135)
  at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:83)
  at org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:130)
  at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

In such scenarios, the Flink job restarts repeatedly because the snapshot ID stored in its state is no longer available, and recovery requires manually restarting the job. We would like a mechanism that allows the job to recover automatically in this situation, similar to how Kafka handles out-of-range offsets by automatically resuming consumption from the earliest or latest available data.
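For comparison, this is roughly what the Kafka behavior referenced above looks like with Flink's KafkaSource; a minimal sketch with placeholder broker/topic/group names, where a committed offset that is missing or out of range (e.g. already deleted by retention) falls back to the earliest available offset instead of failing the job:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaOffsetResetExample {
  public static KafkaSource<String> build() {
    return KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("my-topic")                // placeholder
        .setGroupId("my-group")               // placeholder
        // Start from the committed offsets; if they are missing or out of range,
        // reset to the earliest available offset (Kafka's auto.offset.reset semantics).
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
  }
}
```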

This PR mainly adds a configuration option called snapshot-expiration-reset-strategy. When the lastSnapshot is no longer a parent ancestor of the current snapshot, it can be handled in one of three ways to avoid having to manually restart the job for recovery (see the sketch after this list):

Default mode: Maintain the current behavior.
Earliest mode: Start incremental consumption from the oldest snapshot as the lastSnapshot.
Latest mode: Start incremental consumption from the current latest snapshot as the lastSnapshot.
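
To illustrate how this might be wired up, here is a minimal sketch against the Flink IcebergSource builder. The option name snapshot-expiration-reset-strategy comes from this PR; passing it through builder.set(...) and the exact accepted values ("default", "earliest", "latest") are assumptions for illustration, not the final API:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;

public class SnapshotExpirationResetExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Placeholder table location.
    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/default/my_table");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(30))
            .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
            // Proposed option from this PR (assumed here to be settable as a read option):
            // "earliest" resumes incremental consumption from the oldest live snapshot when
            // the lastSnapshotId in state has been expired, "latest" jumps to the current
            // snapshot, and "default" keeps today's fail-and-restart behavior.
            .set("snapshot-expiration-reset-strategy", "earliest")
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "Iceberg Source",
            TypeInformation.of(RowData.class));
    stream.print();
    env.execute("iceberg-incremental-read");
  }
}
```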

@Guosmilesmile
Contributor Author

@pvary @stevenzwu Could you please take some time to review whether this change is feasible? Thanks!

@pvary
Contributor

pvary commented Mar 25, 2025

It might be only me, but SnapshotExpirationResetStrategy.LATEST and SnapshotExpirationResetStrategy.EARLIEST represent a very serious data corruption issue which should require manual intervention. The only valid use case would be a CDC table, but for those we don't have a streaming read yet.

@stevenzwu, @mxm: What are your thoughts?

@Guosmilesmile
Contributor Author

It might be only me, but SnapshotExpirationResetStrategy.LATEST and SnapshotExpirationResetStrategy.EARLIEST represent a very serious data corruption issue which should require manual intervention. The only valid use case would be a CDC table, but for those we don't have a streaming read yet.

@stevenzwu, @mxm: What are your thoughts?

@pvary Thank you very much for your response. Here are my thoughts, for what they are worth.

In our daily operations, when we encounter this scenario and intervene manually, the only way to recover is to modify the source configuration and change the starting-strategy. However, whether we recover with INCREMENTAL_FROM_EARLIEST_SNAPSHOT or INCREMENTAL_FROM_LATEST_SNAPSHOT, it inevitably leads to data loss. Recovering with TABLE_SCAN_THEN_INCREMENTAL results in data duplication, which can also cause a significant traffic impact downstream. None of these options is ideal, as each can introduce data issues.

Moreover, manual intervention may not always be timely, potentially leading to even more data loss. This is why we came up with the idea of an automated recovery configuration, while keeping a default mode that preserves the current behavior.
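
For reference, a minimal sketch of what this manual workaround looks like today with the existing starting-strategy options; the job has to be resubmitted with a different strategy, which is where the data loss or duplication trade-off above comes from (the "starting-strategy" job argument here is illustrative):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;

public class ManualRecoveryExample {
  // Today's manual recovery: resubmit the job with a different starting strategy.
  // INCREMENTAL_FROM_EARLIEST_SNAPSHOT / INCREMENTAL_FROM_LATEST_SNAPSHOT can skip data,
  // while TABLE_SCAN_THEN_INCREMENTAL re-reads the whole table and duplicates data downstream.
  public static IcebergSource<RowData> build(TableLoader tableLoader, String[] args) {
    StreamingStartingStrategy strategy =
        StreamingStartingStrategy.valueOf(
            ParameterTool.fromArgs(args)
                .get("starting-strategy", "INCREMENTAL_FROM_LATEST_SNAPSHOT"));
    return IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(true)
        .streamingStartingStrategy(strategy)
        .build();
  }
}
```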

@Guosmilesmile
Contributor Author

Guosmilesmile commented Mar 25, 2025

To add further: manual intervention to recover the job requires abandoning the previous state. If downstream jobs use windows, this would also result in the loss of the aggregated window data. To preserve the state data, we would like to have an automated recovery mechanism in place.

The main goal of this PR is to avoid manual intervention to restart the job for recovery.

@pvary
Contributor

pvary commented Mar 25, 2025

@Guosmilesmile: I hope my answer was not too harsh; I did not intend to offend. All I was trying to say is that, in my experience, correctness is the most important feature of a job. YMMV

manual intervention to recover the job requires abandoning the previous state.

Could you use uid to drop the source state, and keep the state of the other operators?

@Guosmilesmile
Contributor Author

@Guosmilesmile: I hope my answer was not too harsh; I did not intend to offend. All I was trying to say is that, in my experience, correctness is the most important feature of a job. YMMV

manual intervention to recover the job requires abandoning the previous state.

Could you use uid to drop the source state, and keep the state of the other operators?

Peter, your feedback is very insightful and timely, and I am very glad to receive your response. I would love to discuss this with you further; I just worry that my replies may not fully express my thoughts, so I would like to provide some additional information.

Modifying the uid can resolve my scenario, but it requires manual intervention each time, often in the middle of the night, which is why I came up with this PR. I apologize if my previous replies caused any misunderstanding.

@pvary
Contributor

pvary commented Mar 25, 2025

Good to hear!
Let's hear what others think about the possibilities. If all else fails, setting the uid could be done through environment variables or job properties, and that could solve your immediate problem.
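
A minimal sketch of the uid-based workaround described above: give the source a new uid (here read from a job argument) and restore from the savepoint with --allowNonRestoredState, so only the orphaned source state is dropped while downstream operator state (e.g. windows) is kept. The "source.uid.suffix" argument name is purely illustrative:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.IcebergSource;

public class UidResetExample {
  public static DataStream<RowData> attachSource(
      StreamExecutionEnvironment env, IcebergSource<RowData> source, String[] args) {
    // Bumping the suffix (e.g. v1 -> v2) gives the source operator a new uid, so its old
    // state is discarded when the job is restored with --allowNonRestoredState, while
    // downstream operators keep their uids and therefore their state.
    String uidSuffix = ParameterTool.fromArgs(args).get("source.uid.suffix", "v1");
    return env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "Iceberg Source",
            TypeInformation.of(RowData.class))
        .uid("iceberg-source-" + uidSuffix);
  }
}
```

The restore itself would then look something like `flink run -s <savepointPath> --allowNonRestoredState ... --source.uid.suffix v2`.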

@Guosmilesmile
Contributor Author

Guosmilesmile commented Mar 26, 2025

Yes, you are right. Changing the uid can help me preserve the state.

I think this PR is primarily aimed at providing an automatic recovery mechanism to avoid the need for manual intervention, while the configuration switch also preserves the original behavior by default.
