Skip to content

Conversation

@aliehsaeedii
Copy link
Contributor

@aliehsaeedii aliehsaeedii commented Apr 11, 2025

Kafka Streams calls prepareCommit() in Taskmanager#closeTaskDirty().
However, the dirty task must not get committed and therefore,
prepare-commit tasks such as getting offsets should not be needed as
well. The only thing needed before closing a task dirty is flushing.
Therefore, separating flush and prepareCommit could be a good fix.

Reviewers: Bill Bejeck [email protected], Matthias J. Sax
[email protected]

@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Apr 11, 2025
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @aliehsaeedii - this seems and like a good change overall looks good - I just have one minor comment. I'll defer final say to @mjsax since he recently made changes in this area.

// we call this function only to flush the case if necessary
// before suspending and closing the topology
task.prepareCommit();
task.flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SteamTask.prepareCommit() set a boolean flag hasPending TxCommit if using EOS and the commitNeeded flag is true - is it OK to bypass that? I'm thinking so but I'd like to confirm.

@mjsax
Copy link
Member

mjsax commented Apr 12, 2025

Thanks for the PR -- I am wondering if it would be better to actually change prepareCommit() to prepare(final boolean clean) (instead of making flush() part of the task interface), and skip the call to committableOffsetsAndMetadata() if clean == false and return null;?

As we don't want to commit, we should not even return an empty Map as guard rail, and the calling code as a matter of fact ignored the return value anyway (as it should).

Btw: there is multiple places in with we "close dirty", to changing from prepareCommit() to flush() at this place won't be sufficient. -- Adding a boolean flag, also has the advantage that we won't miss any caller, as we need to update all callers.

We also don't want to lose the guard to check the task state, which we do inside prepareCommit() but not inside flush().

@github-actions github-actions bot removed the triage PRs from the community label Apr 12, 2025
@github-actions github-actions bot removed the small Small PRs label Apr 14, 2025
@aliehsaeedii
Copy link
Contributor Author

Thanks, @bbejeck and @mjsax.

I think regarding keeping the guard to check the task state, what Matthias suggested makes more sense. So I implemented his approach.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I miss it, or is there no new test for this case? Would be good to at least add one more test to StreamsTaskTest; something like shouldNotGetOffsetsIfPerpareCommitDirty() verifying that we return null, and ideally also that committableOffsetsAndMetadata() was not called (the second check is more tricky -- need some creativity I guess).

@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
if (!clean) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this to to the end of the method? So we preserve all logging?

Or maybe even don't check clean flag at all, because StandbyTasks should also never return offsets, and we could just change return Collections.emptyMap(); to return null; directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I check the clean flag for accurate logs.
Let's keep the Collections.emptyMap() when close clean to keep the logic as it is?!

@aliehsaeedii
Copy link
Contributor Author

Would be good to at least add one more test to StreamsTaskTest; something like shouldNotGetOffsetsIfPerpareCommitDirty() verifying that we return null, and ideally also that committableOffsetsAndMetadata() was not called (the second check is more tricky -- need some creativity I guess).

I think the 2nd check is not necessary since if committableOffsetsAndMetadata() is called, then prepareCommit(bool) would return a map and not NULL.

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mjsax mjsax merged commit ee4debb into apache:trunk Apr 25, 2025
24 checks passed
@mjsax
Copy link
Member

mjsax commented Apr 25, 2025

Thanks for the fix! Merged to trunk and cherry-picked to 4.0 branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants