KAFKA-19019: Add support for remote storage fetch for share groups #19437

Open: wants to merge 9 commits into trunk
Conversation

@adixitconfluent (Contributor) commented Apr 10, 2025

What

This PR adds support for remote storage fetch for share groups.

Limitation

There is a limitation in remote storage fetch for consumer groups: we can only perform a remote fetch for a single topic partition in a fetch request. Since the logic of share fetch requests is largely based on how consumer groups work, we follow similar logic in implementing remote storage fetch. However, this problem should be addressed as part of KAFKA-19133, which should help us perform fetches for multiple remote fetch topic partitions in a single share fetch request.
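
To illustrate, here is a minimal sketch (an assumed shape, not the PR's exact code) of how a single remote-fetch partition could be selected from the log read results, using the types that appear later in this thread:

    // Pick at most one remote-fetch partition per share fetch request.
    Optional<TopicPartitionRemoteFetchInfo> remoteFetchInfo = Optional.empty();
    for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
        LogReadResult logReadResult = entry.getValue();
        if (remoteFetchInfo.isEmpty() && logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
            // The first partition that needs remote storage wins; every other
            // acquired partition in this request is handled as a local log
            // fetch or has its acquisition lock released.
            remoteFetchInfo = Optional.of(new TopicPartitionRemoteFetchInfo(
                entry.getKey(), logReadResult.info().delayedRemoteStorageFetch.get()));
        }
    }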

Testing

I have followed the AK documentation to test my code locally (by adopting LocalTieredStorage.java) and with the help of unit tests.
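
For reference, a hedged sketch of the broker settings involved in this kind of local test, assuming the LocalTieredStorage test plugin from the Kafka source tree (not taken from the PR; consult the tiered storage docs for the full set of required properties):

    # Enable tiered storage on the broker and plug in the file-based test RSM.
    remote.log.storage.system.enable=true
    remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage

    # Tiered storage is then enabled per topic at creation time:
    # remote.storage.enable=true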

@github-actions bot added the triage (PRs from the community), core (Kafka Broker), and KIP-932 (Queues for Kafka) labels on Apr 10, 2025
@AndrewJSchofield added the ci-approved label and removed the triage (PRs from the community) label on Apr 10, 2025
@adixitconfluent marked this pull request as ready for review on April 12, 2025 13:19
@adixitconfluent changed the title from "KAFKA-19019: Add remote storage fetch for share groups" to "KAFKA-19019: Add support for remote storage fetch for share groups" on Apr 12, 2025
@junrao (Contributor) left a comment:

@adixitconfluent : Thanks for the PR. Made a pass of the non-testing files. Left a few comments.

@adixitconfluent requested a review from junrao on April 15, 2025 09:03
@junrao (Contributor) left a comment:

@adixitconfluent : Thanks for the updated PR. A few more comments.

    // topic partitions for which fetch would not be happening in this share fetch request.
    if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
        // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
        releasePartitionLocks(Set.of(topicIdPartition));
Contributor:

After releasing the locks here, it's possible for another client to make progress. Should we trigger a check on the purgatory?
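
A sketch of what such a check could look like; the ReplicaManager helper and key type here are assumptions modelled on the existing share fetch purgatory code, not necessarily what this PR will use:

    // After releasing the partition locks, nudge the delayed share fetch
    // purgatory so that requests waiting on this partition can now complete.
    releasePartitionLocks(Set.of(topicIdPartition));
    replicaManager.completeDelayedShareFetchRequest(
        new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));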

        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
    ) {
        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
Contributor:

We need to rename remoteStorageFetchMetadataMap since it's no longer a map.

    // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
    // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
    // fetch for multiple remote fetch topic partition in a single share fetch request
    remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
Contributor:

Could we include logReadResult? Then we don't need to pass in replicaManagerReadResponse to maybeProcessRemoteFetch().
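
One possible shape after this suggestion, sketched as a record (hypothetical; the PR's actual class may differ):

    // Carrying the LogReadResult lets callers derive the delayed remote storage
    // fetch info from it, so maybeProcessRemoteFetch() no longer needs the full
    // replicaManagerReadResponse map.
    public record TopicPartitionRemoteFetchInfo(
        TopicIdPartition topicIdPartition,
        LogReadResult logReadResult
    ) { }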

    Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
    if (exceptionOpt.isPresent()) {
        remoteStorageFetchException = exceptionOpt;
        throw exceptionOpt.get();
Contributor:

If we get a remoteStorageFetchException, we shouldn't throw the exception to the caller. Instead, we should just force complete the request with the error.

Contributor Author:

We are not throwing the exception to the caller; we are force completing the request with the error. When we throw an error in maybeProcessRemoteFetch, we handle it in the catch in tryComplete. This catch calls forceComplete. Once we enter onComplete, it goes to this line and calls completeErroneousRemoteShareFetchRequest, which completes the share fetch request with the exception.
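
A self-contained toy (hypothetical names, not the PR's code) that illustrates this record-the-error, force-complete, finish-in-onComplete pattern:

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicBoolean;

    class ToyDelayedShareFetch {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private volatile Optional<Exception> remoteStorageFetchException = Optional.empty();

        boolean tryComplete() {
            try {
                throw new RuntimeException("simulated remote fetch failure");
            } catch (Exception e) {
                // Remember the error, then force completion; the request is
                // actually finished with the error inside onComplete().
                remoteStorageFetchException = Optional.of(e);
                return forceComplete();
            }
        }

        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();
                return true;
            }
            return false;
        }

        void onComplete() {
            if (remoteStorageFetchException.isPresent()) {
                System.out.println("completing with error: " + remoteStorageFetchException.get());
            } else {
                System.out.println("completing normally");
            }
        }
    }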

    } catch (Exception e) {
        return Optional.of(e);
    }
    remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
Contributor:

It's a bit inconsistent to set the non-error result to an instance val, but return the error result to the caller. It would be clearer to handle both the error and non-error results in the same way.

@adixitconfluent (Contributor Author) commented Apr 16, 2025:

We are handling the errors in the same way as non-errors, i.e. via the instance variables remoteStorageFetchException and remoteFetchOpt. The flow has been explained in the above comment for the error scenario. For the non-error scenario, we populate remoteFetchOpt and, once the request can be completed, we go to onComplete, check the value of remoteFetchOpt, and complete the remote share fetch request. In both cases the share fetch request completes from onComplete using the instance variable values:

    if (remoteStorageFetchException.isPresent()) {
        completeErroneousRemoteShareFetchRequest();
    } else if (remoteFetchOpt.isPresent()) {
        completeRemoteStorageShareFetchRequest();
    } else {
        completeLocalLogShareFetchRequest();
    }


    /**
     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
Contributor:

The comment is not very accurate since this method could be called when the request expires.

Contributor Author:

Updated the comment to: "Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired."

            nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
        }
    });
    nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);
Contributor:

nonRemoteFetchTopicPartitionData => acquiredNonRemoteFetchTopicPartition?

            shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
        shareFetch.maybeComplete(remoteFetchResponse);
        log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
    } catch (RuntimeException e) {
Contributor:

Why do we need this? All Kafka exceptions are RuntimeException.

Contributor Author:

Yeah, I think catching RuntimeException isn't a good idea. I have replaced the catch blocks with:

    catch (InterruptedException | ExecutionException e) {
        log.error("Exception occurred in completing remote fetch {} for delayed share fetch request", remoteFetchOpt.get(), e);
        ...
    } catch (Exception e) {
        log.error("Unexpected error in processing delayed share fetch request", e);
        ...
    }

    /**
     * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
     * already running as it may force closing opened/cached resources as transaction index.
     * Note - This function should only be called when we know that there is a remote fetch in-flight/completed.
Contributor:

The comment is not very accurate since this method could be called when the request expires.

Contributor Author:

Updated the comment to: "Note - This function should only be called when we know that there is a remote fetch in-flight/expired."
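
For context, a minimal sketch of the cancellation being documented, assuming the remote fetch task is the java.util.concurrent.Future held by the RemoteFetch instance shown earlier in this thread (the accessor name is an assumption):

    // cancel(false) skips the task if it has not started and does NOT interrupt
    // it if it is already running, so open/cached resources such as the
    // transaction index are not force-closed mid-read.
    Future<?> remoteFetchTask = remoteFetchOpt.get().remoteFetchTask();
    if (!remoteFetchTask.isDone()) {
        remoteFetchTask.cancel(false);
    }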

@adixitconfluent requested a review from junrao on April 16, 2025 07:14