[SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition #50344

Closed

liviazhu
Contributor

What changes were proposed in this pull request?

Return StateStoreCheckpointInfo as part of RocksDB.commit() and store it locally in the RocksDBStateStore so that RocksDBStateStore.getCheckpointInfo() always returns the checkpoint info belonging to its commit.
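A minimal sketch of the shape of this change, using toy stand-ins rather than the real Spark classes. `ToyDb`, `ToyStore`, `ToyCheckpointInfo`, and the `checkpointInfo` accessor are hypothetical names chosen for illustration; only the idea (commit returns the info, and the store keeps its own copy) comes from the description above.

```scala
// Toy stand-ins for illustration only; these are NOT the real Spark classes.
final case class ToyCheckpointInfo(partitionId: Int, version: Long, checkpointId: String)

// Plays the role of the shared RocksDB instance, reused by successive stores.
final class ToyDb(partitionId: Int) {
  private var version = 0L

  // commit() hands the checkpoint info back to the caller instead of
  // leaving it only in state that a later commit could overwrite.
  def commit(): ToyCheckpointInfo = synchronized {
    version += 1
    ToyCheckpointInfo(partitionId, version, s"ckpt-$version")
  }
}

// Plays the role of RocksDBStateStore: one instance per task.
final class ToyStore(db: ToyDb) {
  // Written only by this store's own commit, so another store cannot change it.
  private var committedInfo: Option[ToyCheckpointInfo] = None

  def commit(): Long = {
    val info = db.commit()
    committedInfo = Some(info)
    info.version
  }

  def checkpointInfo: ToyCheckpointInfo =
    committedInfo.getOrElse(
      throw new IllegalStateException("commit() has not been called on this store"))
}
```

With this shape, two stores that commit against the same `ToyDb` each report the info from their own commit, regardless of the order in which they read it.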

Why are the changes needed?

Fixes the bug explained in SPARK-51573. This race condition can cause tasks to receive an incorrect checkpointInfo, which is a correctness bug.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@liviazhu liviazhu changed the title [SPARK-51573] Fix Streaming State Checkpoint v2 checkpointInfo race condition [SS][SPARK-51573] Fix Streaming State Checkpoint v2 checkpointInfo race condition Mar 21, 2025
@HyukjinKwon HyukjinKwon changed the title [SS][SPARK-51573] Fix Streaming State Checkpoint v2 checkpointInfo race condition [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition Mar 24, 2025
Contributor

@siying siying left a comment

Thank you for the fix! I like the way we fix it and have no concerns about merging it.
I hope @HeartSaVioR or other committers like it too.

val conf: RocksDBConf,
stateStoreId: StateStoreId,
Contributor

Is this refactoring necessary or related at all?
I think we should separate refactoring changes from the bug fix itself. It will make it easier to blame issues in case something goes wrong, and easier to revert a change if necessary.

Regarding the refactoring itself, I feel the previous approach is better. The more higher-layer information (in this case, the state store ID) we pass into the lower layer, the less likely we are to end up with a strong abstraction. It also has the side effect of making unit tests harder to write. For example, to test class RocksDB, we now need to fake a whole StateStoreId rather than passing two strings.

Contributor Author

Sure. I undid the refactoring and now pass partitionId as an argument to the RocksDB constructor instead.

Contributor

@siying siying left a comment

Thanks for making the change!

@@ -73,7 +72,8 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,
-    enableStateStoreCheckpointIds: Boolean = false) extends Logging {
+    enableStateStoreCheckpointIds: Boolean = false,
+    partitionId: Int = 0) extends Logging {
Contributor

Oh, I didn't realize we need to pass one more thing there. I see some merit in the refactoring, but I think it would be nice to separate the refactoring from the bug fix.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

val store1NewVersion = store1.commit()
val store2 = provider.getStore(1)
val store2NewVersion = store2.commit()
val store1CheckpointInfo = store1.getStateStoreCheckpointInfo()
Contributor

So without the fix, store1CheckpointInfo and store2CheckpointInfo are the same, which is not expected. Looks OK to me.
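To make the failure mode concrete, here is a small reproduction of the interleaving in the test, written against toy classes rather than the Spark code. It assumes the pre-fix store read its checkpoint info from state held by the shared DB object at call time; the PR description implies this but does not show it. `SharedDb`, `BuggyStore`, `Info`, and `Repro` are hypothetical names.

```scala
// Toy model of the pre-fix behaviour; NOT the real Spark classes.
final case class Info(version: Long)

// The shared DB remembers only the most recent commit's info.
final class SharedDb {
  private var version = 0L
  private var lastInfo: Option[Info] = None

  def commit(): Long = synchronized {
    version += 1
    lastInfo = Some(Info(version))
    version
  }

  def latestInfo: Info = synchronized {
    lastInfo.getOrElse(throw new IllegalStateException("nothing committed yet"))
  }
}

final class BuggyStore(db: SharedDb) {
  def commit(): Long = db.commit()

  // Reads shared state when called, so it can observe another store's commit.
  def checkpointInfo: Info = db.latestInfo
}

object Repro {
  // Same ordering as the test: store1 commits, store2 commits, then both are read.
  def run(): (Long, Info, Info) = {
    val db = new SharedDb
    val store1 = new BuggyStore(db)
    val store1NewVersion = store1.commit()
    val store2 = new BuggyStore(db)
    store2.commit()
    (store1NewVersion, store1.checkpointInfo, store2.checkpointInfo)
  }
}
```

In this model `Repro.run()` returns two equal `Info` values, and the first no longer matches `store1NewVersion`, which is the symptom described in the comment above.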

@HeartSaVioR
Contributor

Thanks! Merging to master/4.0 (since this could lead to a correctness bug).

HeartSaVioR pushed a commit that referenced this pull request Mar 27, 2025
…ce condition

Closes #50344 from liviazhu-db/liviazhu-db/checkpointinfo-race.

Authored-by: Livia Zhu
Signed-off-by: Jungtaek Lim
(cherry picked from commit 4765c15)
Signed-off-by: Jungtaek Lim