KAFKA-19434: Startup state stores initialization #20749
bbejeck merged 27 commits into apache:trunk
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
Nikita-Shupletsov
left a comment
Thanks a lot for putting it together!
A much better idea than what I had in my PR
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
bbejeck
left a comment
Thanks @eduwercamacaro! I made an initial pass
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
    this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));

    log.debug("Created state store manager for task {}", taskId);
    this.startupState = new AtomicBoolean(startupState);
The same would apply to the AtomicBoolean variable.
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
bbejeck
left a comment
@eduwercamacaro I've made another pass and confirmed the fix by running a Streams application and stopping and starting it several times with existing state. Overall this LGTM.
Can you rebase this PR?
Also, I think we could use a new test in KafkaStreamsTelemetryIntegrationTest that starts, stops, and restarts a single instance and confirms that the state metrics are emitted.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@bbejeck Thanks for taking the time to review this. I have already rebased this branch. I believe this integration test already covers that case. What do you think?
bbejeck
left a comment
Thanks again for the PR @eduwercamacaro!
I have one additional question I missed from the last review.
Also, can you remove the ProcessorStateManager.assignToStreamThread method? It's not used anymore.
The ProcessorStateManagerTest could use better coverage of registerStartupStateStores and initializeStoreOffsetsFromCheckpoint.
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@eduwercamacaro - you can ignore my comments on testing; we can do that as a follow-up PR.
@eduwercamacaro the failures are related. I think addressing this comment should fix the integration tests.
…ing startup store pre-initialization
This reverts commit b4cb0ff.
3cacb12 to 4451626
4451626 to 6078803
@bbejeck I modified this PR according to our last discussion about the solution for KAFKA-19434. We basically open the stores found in the state directory during initialization and then close them. cc: @mjsax
bbejeck
left a comment
Thanks for the PR, @eduwercamacaro! Overall, it looks good with a few comments.
Also, ProcessorStateManager.assignToStreamThread on line 237 is unused; can we get rid of it?
    } finally {
        // Make sure the state manager writes the local checkpoint file before closing the stores
        // This will be replaced in the future when removing the checkpoint file dependency.
        temporaryStateManager.checkpoint();
Same as above: temporaryStateManager.checkpoint() can throw a ProcessorStateException, but we don't handle it here, meaning close() is not called. Do we consider this a fatal exception and allow it to bubble up and kill the main thread? If not, I'm guessing we'll need to catch this exception and do something. Either way, let's add a couple of comments on the handling.
I moved this logic inside the try/catch block to ensure that we always call the close() method. Any exception from the close() method is considered fatal and will break the main thread.
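One way to get the "always call close()" guarantee described in this reply is a try/finally around the checkpoint call. The sketch below is not the code from this PR: `SketchStateManager` and `CheckpointThenCloseSketch` are hypothetical stand-ins, and the exception type is an arbitrary `RuntimeException` used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the state manager discussed above; not the real Kafka Streams class.
interface SketchStateManager {
    void checkpoint(); // may throw a RuntimeException
    void close();      // may throw; the caller treats that as fatal
}

final class CheckpointThenCloseSketch {
    /**
     * Writes the checkpoint and then closes the stores. close() runs even when
     * checkpoint() throws; whatever escapes is left to propagate to the caller.
     */
    static void checkpointAndClose(final SketchStateManager manager) {
        try {
            manager.checkpoint();
        } finally {
            manager.close();
        }
    }

    /** Records the call order so the behavior can be inspected. */
    static List<String> demo(final boolean checkpointFails) {
        final List<String> calls = new ArrayList<>();
        final SketchStateManager manager = new SketchStateManager() {
            @Override
            public void checkpoint() {
                calls.add("checkpoint");
                if (checkpointFails) {
                    throw new IllegalStateException("checkpoint failed");
                }
            }

            @Override
            public void close() {
                calls.add("close");
            }
        };
        try {
            checkpointAndClose(manager);
        } catch (final IllegalStateException e) {
            calls.add("propagated");
        }
        return calls;
    }
}
```

Note that if close() itself throws inside the finally block while a checkpoint exception is in flight, the close() exception replaces the original one, which is consistent with treating a close() failure as fatal.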
        task.closeDirty();
        StateManagerUtil.registerStateStores(log, threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
    } catch (final TaskCorruptedException tce) {
        log.warn("Failed to register startup state stores for task {}: {}", id, tce.getMessage());
registerStateStores can also throw a StreamsException, LockException, or ProcessorStateException. Should we handle these exceptions as well? If not, add some comments on the reasoning.
Good point.
I added some comments and JavaDoc for this method.
Those exceptions are considered fatal and will break the StreamThread. We only handle TaskCorruptedException by logging and continuing with the initialization as this exception will be handled by the StreamThread later in the first assignment.
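The handling policy described in this reply (log and continue on corruption, let everything else propagate) can be sketched as follows. The exception classes and `StartupRegistrationSketch` are hypothetical stand-ins, not the Kafka Streams types, and the warning list takes the place of a real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception types standing in for the ones named in the review thread.
class SketchTaskCorruptedException extends RuntimeException {
    SketchTaskCorruptedException(final String message) {
        super(message);
    }
}

class SketchLockException extends RuntimeException {
    SketchLockException(final String message) {
        super(message);
    }
}

final class StartupRegistrationSketch {
    // Collected instead of logged so the behavior is easy to inspect.
    final List<String> warnings = new ArrayList<>();

    /**
     * Runs the registration step for one task. Corruption is recorded and the
     * task is skipped (returns false); every other exception propagates.
     */
    boolean tryRegister(final String taskId, final Runnable registration) {
        try {
            registration.run();
            return true;
        } catch (final SketchTaskCorruptedException tce) {
            warnings.add("Failed to register startup state stores for task "
                + taskId + ": " + tce.getMessage());
            return false;
        }
    }
}
```

Catching only the one recoverable type keeps the fatal cases visible to the caller instead of hiding them behind a broad catch.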
        // Make sure the state manager writes the local checkpoint file before closing the stores
        // This will be replaced in the future when removing the checkpoint file dependency.
        temporaryStateManager.checkpoint();
        temporaryStateManager.close();
I think we also need to add StateDirectory.unlock here too.
Currently, it is not needed because we keep the lock on the main thread until the task is assigned to a StreamThread. TaskManager changes the ownership of the task using 'StateDirectory.removeStartupState'.
        temporaryStateManager.checkpoint();
        temporaryStateManager.close();
    }
    tasksForLocalState.put(id, new StartupState(id, subTopology, temporaryStateManager));
temporaryStateManager is closed above on line 264, but it's passed here to StartupState.
It's never accessed from StartupState. With the new work, is StartupState required, or can we get away with Set<TaskId>? Or is it going to be needed for subsequent work?
Yep. Excellent observation.
You are right, we can use `Set<TaskId>` instead. I completely removed the `StartupState` class and the process to close the stores because we are immediately closing them.
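A minimal sketch of tracking startup tasks with a plain set, as suggested in this thread. `StartupTaskTrackerSketch` and its method names are hypothetical, task IDs are plain strings rather than `TaskId`, and the choice of a concurrent set is an assumption made because the main thread and stream threads would both touch it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class StartupTaskTrackerSketch {
    // Only the IDs are kept; no state manager or store handle outlives initialization.
    private final Set<String> startupTaskIds = ConcurrentHashMap.newKeySet();

    /** Called after a task's stores were opened and closed during startup. */
    void markInitialized(final String taskId) {
        startupTaskIds.add(taskId);
    }

    /** Hands ownership to the caller; returns true only if the task was still tracked. */
    boolean release(final String taskId) {
        return startupTaskIds.remove(taskId);
    }

    boolean hasStartupTasks() {
        return !startupTaskIds.isEmpty();
    }
}
```

Because `release` reports whether the ID was present, two threads racing to claim the same task cannot both succeed.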
- track startup task ids via a simple Set and unlock them on close
- remove startup-task handoff/close APIs and related tests
- drop unused standby-thread assignment helper in
eduwercamacaro
left a comment
@bbejeck Thanks for your comments
I pushed a new commit with the suggestions.
- Remove startup state on statedir auto cleanup
bbejeck
left a comment
Thanks @eduwercamacaro! LGTM just waiting for the build to complete
Merged #20749 into trunk
Looks like the build is failing after this commit.
Thanks @omkreddy! I've pushed a fix; we should be good now.
Instead of creating Standby tasks from the state directory, we open the
local stores that exist in the state directory.
This resolves the issue raised in KAFKA-19434, where
store-specific metrics were being duplicated due to tasks being created
in the main thread and then assigned to a StreamThread.
Additionally, since we can now read the offsets from the store during
instance initialization, this clears the way for the implementation of
KIP-1035. As of now, the stores load the offsets from the
checkpoint file, but in a later PR, we will read these offsets from the
state store itself.
This PR modifies the behavior of Kafka Streams during initialization. Now,
for each pre-existing store in the state directory, we open the store,
read offsets from the checkpoint file, and then close it again. We open
the store because the store will become responsible
for tracking the offsets once the checkpoint file is deprecated.
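The open, read offsets, close sequence described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `SketchStore` and `StartupStoreInitSketch` are hypothetical, stores are keyed by name, and the checkpoint file is modeled as an in-memory map from store name to offset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical store abstraction; the real StateStore interface is richer.
interface SketchStore {
    void open();
    void close();
}

final class StartupStoreInitSketch {
    /**
     * For each pre-existing store: open it, look up its offset in the checkpoint
     * data, and close it again. Returns the offsets found, keyed by store name.
     */
    static Map<String, Long> initialize(final Map<String, SketchStore> existingStores,
                                        final Map<String, Long> checkpointedOffsets) {
        final Map<String, Long> loaded = new LinkedHashMap<>();
        for (final Map.Entry<String, SketchStore> entry : existingStores.entrySet()) {
            final SketchStore store = entry.getValue();
            store.open();
            try {
                final Long offset = checkpointedOffsets.get(entry.getKey());
                if (offset != null) {
                    loaded.put(entry.getKey(), offset);
                }
            } finally {
                // The store never stays open past startup initialization.
                store.close();
            }
        }
        return loaded;
    }
}
```

Since no task object is created in this flow, there is nothing on the main thread that could register store-level metrics a second time.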
Reviewers: Nikita Shupletsov <nshupletsov@confluent.io>, Bill Bejeck <bbejeck@apache.org>