Skip to content

Conversation

@smjn
Copy link
Collaborator

@smjn smjn commented Nov 4, 2025

  • When a ShareFetch request contains RENEW acks, the fetchMessages
    sub-routine is skipped in the KafkaApis handler for the request.
  • Additionally, new validations for ShareFetch version and ackType have
    been added along with validations on fields maxBytes, minBytes,
    maxRecords and maxWaitMs which should be set to 0 for version >= 2 and
    isRenewAck set to true.
  • Unit tests have been added to verify the behavior.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker labels Nov 4, 2025
@smjn smjn added ci-approved and removed triage PRs from the community labels Nov 4, 2025
@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Nov 4, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. There should also be a check that if isRenewAck is set, then the fields that the KIP says must be zero, must actually be zero.

erroneousTopicIdPartitions.add(tp)
isErroneous = true
} else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3)) {
} else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 4)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would add a pair of parameters, one which indicates whether the request is permitted to use RENEW (so version >= 2), and another to indicate whether the request actually has isRenewAck set. If the request version is >= 2, then the max ack type is 4, else 3. If isRenewAck is not set, the ackType must not be 4.

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, some comments.

Comment on lines +3242 to +3246
val fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] =
if (shareFetchRequest.version() >= 2 && shareFetchRequest.data.isRenewAck)
CompletableFuture.completedFuture(mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData])
else
handleFetchFromShareFetchRequest(request, shareSessionEpoch, erroneousAndValidPartitionData, sharePartitionManager, authorizedTopics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please write comments for the changes.

Comment on lines 6599 to 6613
when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(util.Map.of[TopicIdPartition, ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)),
new ShareFetchResponseData.PartitionData()
.setErrorCode(Errors.NONE.code)
.setAcknowledgeErrorCode(Errors.NONE.code)
.setRecords(records1)
.setAcquiredRecords(new util.ArrayList(util.List.of(
new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(0)
.setLastOffset(9)
.setDeliveryCount(1)
)))
))
).thenReturn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need any mocks for sharePartitionManager.fetchMessage if we just want to test that no fetch happens when renew ack is present? We can simply check the mocked sharePartitionManager that no calls should happen for fetchMessages, correct?

Copy link
Collaborator Author

@smjn smjn Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was planning to add a not-called assertion but it isn't needed. There should be one mock only. The test simulates one sharefetch to get records and then call renew on one of those records.

@smjn
Copy link
Collaborator Author

smjn commented Nov 5, 2025

@AndrewJSchofield @apoorvmittal10 Thanks for the review, incorporated comments.

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall source code looks good. I would recommend adding some tests for share acknowledge RPC handling along with RENEW ack. For some tests, we should use multiple Acknowledgement types in acknowledgement batches just to make sure everything is working right.

new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
)

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get rid of this mock. We don't need it.

}

@Test
def testHandleShareFetchRequestSuccessWithRenewAcknowledgements(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should enhance this test case by using both RENEW acks and the previous present acknowledgements just to make sure things are working fine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not make much difference as we are mocking the meat of the method - sharePartitionManager.fetchMessages. Will add nevertheless.

val response = verifyNoThrottling[ShareFetchResponse](request)
val responseData = response.data()

assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we should also add asserts over the error message, since you have custom messages for different conditions

Copy link
Collaborator Author

@smjn smjn Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent
I was planning to do this initially but

There is a shortcoming in Errors class where the message contained in e is swallowed (Errors.forException(e)) in ShareFetchRequest.getErrorResponse. Error responses from other requests have the same issue as well where only the error code is set.
ErrorCode is way to go. In general it might not be best practise to assert on specific messages as they might change often. Error codes should be verified.

@smjn smjn requested a review from adixitconfluent November 5, 2025 10:54
Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one minor comment


val groupId = "group"

when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this mock. We should get rid of it.

@smjn smjn requested a review from adixitconfluent November 5, 2025 11:07
Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants