Skip to content

Moved from SourceTask#commit() to SourceTask#commitRecord() #76

Open
limitium wants to merge 1 commit intoSolaceProducts:masterfrom
limitium:ack_per_record_commit
Open

Moved from SourceTask#commit() to SourceTask#commitRecord() #76
limitium wants to merge 1 commit intoSolaceProducts:masterfrom
limitium:ack_per_record_commit

Conversation

@limitium
Copy link

@limitium limitium commented Nov 6, 2024

Solution that fixes #75

Copy link
Collaborator

@Nephery Nephery left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, thank you for your contribution.

Instead of using BytesXMLMessage.getAckMessageId(), it would be better to use BytesXMLMessage.getReplicationGroupMessageId() instead.

Also, could you write a test for this scenario?

…ate over acknowledgments and prevent message loss
@limitium limitium force-pushed the ack_per_record_commit branch from 26363bf to 4a0d367 Compare November 7, 2024 12:10
@limitium
Copy link
Author

Hey, thanks for the comment.
There're couple of changes for now

  1. No reason to use any IDs since message is still in heap, so removed auxiliary map and simplified that part
  2. Noticed huge performance degradation in 3 orders of magnitude, because of internal solace client buffer for acks and another pattern how #commitRecrod() is called. Have to implement similar buffer at task level in terms of capacity and flushing. Now performance at the same level.

Haven't find how to cover this with test. However here is a link for how kafka-connect calls commits

committableRecords.forEach(this::commitTaskRecord);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Messages loss on restarts

2 participants