KAFKA-19712: ProcessorStateManager delegates offset tracking to stores#21738
KAFKA-19712: ProcessorStateManager delegates offset tracking to stores#21738nicktelford wants to merge 1 commit intoapache:trunkfrom
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Disclosure: the changes here were all hand-written and verified by me, but I used Claude Code to help me break up a large set of changes into multiple PRs, and to analyse the changes for issues that had not been caught by tests. |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @nicktelford - overall LGTM with a couple of small issues to address before we merge
| private boolean taskDirIsEmpty(final File taskDir) { | ||
| final File[] storeDirs = taskDir.listFiles(pathname -> | ||
| !pathname.getName().equals(CHECKPOINT_FILE_NAME)); | ||
| !pathname.getName().equals(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME)); |
There was a problem hiding this comment.
I think this needs to be updated as it looks like the filter is going miss the new checkpoint file names of checkpoint_<store name>
| } | ||
|
|
||
| @Test | ||
| public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException { |
There was a problem hiding this comment.
I'm wondering if we should keep this test for now, won't we have users running in the pre-txn statestore mode for a while?
| for (final StateStoreMetadata store : stores.values()) { | ||
| if (store.corrupted) { | ||
| log.error("Tried to initialize store offsets for corrupted store {}", store); | ||
| throw new ProcessorStateException( |
There was a problem hiding this comment.
This is correct, but is there a chance this could lead to a behavior change since the previous code only threw IllegalStateException meaning this might land in a catch block it didn't before?
| logPrefix), e); | ||
| } | ||
|
|
||
| stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); |
There was a problem hiding this comment.
This used to be withing the try/catch block but now if there's an error it will bubble up and possibly escape handling since it's no longer going throw either TaskCorruptedException or ProcessorStateException
As part of KIP-1035, we want to transition away from task-specific
.checkpointfiles, and instead delegate offset management toStateStores.We now have a
LegacyCheckpointingStateStorewrapper to encapsulate themanagement of offsets for
StateStoreimplementations that do not knowhow to manage their own offsets (i.e. for which
managesOffsets() == false).As of KAFKA-20212,
RocksDBStorenow knows how to manage its ownoffsets, so it will not be wrapped in a
LegacyCheckpointingStateStore;only user-defined persistent stores will use this wrapper.
Corresponding changes to
GlobalStateManagerImplwill be submittedindependently, as KAFKA-20257.
Until both
ProcessorStateManagerandGlobalStateManagerImplhavebeen updated, the
StateManagerinterface must remain as-is. Therefore,the
flushandcheckpointmethods will not be consolidated until alater PR, which will clean up the interface and its usage by
Taskandfriends.