Skip to content

Commit 2dc8ee0

Browse files
Addressed Jun's round 1 review comments - part 2
1 parent 5ae1203 commit 2dc8ee0

File tree

2 files changed

+56
-71
lines changed

2 files changed

+56
-71
lines changed

Diff for: core/src/main/java/kafka/server/share/DelayedShareFetch.java

+52-69
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,11 @@ public boolean tryComplete() {
291291
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
292292
// those topic partitions.
293293
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
294-
// Map to store the remote fetch metadata corresponding to the topic partitions for which we need to perform remote fetch.
295-
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
294+
// Store the remote fetch info and the topic partition for which we need to perform remote fetch.
295+
Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
296296

297-
if (!remoteStorageFetchInfoMap.isEmpty()) {
298-
return maybeProcessRemoteFetch(topicPartitionData, remoteStorageFetchInfoMap, replicaManagerReadResponse);
297+
if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
298+
return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get(), replicaManagerReadResponse);
299299
}
300300
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
301301
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
@@ -592,33 +592,40 @@ Meter expiredRequestMeter() {
592592
return expiredRequestMeter;
593593
}
594594

595-
private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
595+
private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
596596
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
597597
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
598598
) {
599-
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
600-
replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
599+
Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
600+
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
601+
TopicIdPartition topicIdPartition = entry.getKey();
602+
LogReadResult logReadResult = entry.getValue();
601603
if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
602-
remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
604+
// TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
605+
// a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
606+
// we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
607+
// fetch for multiple remote fetch topic partition in a single share fetch request
608+
remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
603609
partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
610+
break;
604611
}
605-
});
612+
}
606613
return remoteStorageFetchMetadataMap;
607614
}
608615

609616
private boolean maybeProcessRemoteFetch(
610617
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
611-
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
618+
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
612619
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
613620
) throws Exception {
614621
topicPartitionData.keySet().forEach(topicIdPartition -> {
615-
// topic partitions for which fetching would be happening from local log and not remote storage.
616-
if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
622+
// topic partitions for which fetch would not be happening in this share fetch request.
623+
if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
617624
// Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
618625
releasePartitionLocks(Set.of(topicIdPartition));
619626
}
620627
});
621-
Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
628+
Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
622629
if (exceptionOpt.isPresent()) {
623630
remoteStorageFetchException = exceptionOpt;
624631
throw exceptionOpt.get();
@@ -629,25 +636,15 @@ private boolean maybeProcessRemoteFetch(
629636

630637
/**
631638
* Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
632-
* @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
639+
* @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
633640
* @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
634641
*/
635642
private Optional<Exception> processRemoteFetchOrException(
636-
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
643+
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
637644
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
638645
) {
639-
// TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
640-
// a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
641-
// we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
642-
// fetch for multiple remote fetch topic partition in a single share fetch request
643-
TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
644-
RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
645-
646-
LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
647-
remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
648-
topicIdPartition,
649-
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
650-
));
646+
TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
647+
RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
651648
LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
652649

653650
Future<Void> remoteFetchTask;
@@ -667,28 +664,10 @@ private Optional<Exception> processRemoteFetchOrException(
667664
} catch (Exception e) {
668665
return Optional.of(e);
669666
}
670-
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
667+
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
671668
return Optional.empty();
672669
}
673670

674-
/**
675-
* This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
676-
* other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
677-
* @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
678-
* @return the first topic partition for which we need to perform remote storage fetch
679-
*/
680-
private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
681-
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
682-
TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
683-
remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
684-
if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
685-
partitionsAcquired.remove(topicIdPartition);
686-
releasePartitionLocks(Set.of(topicIdPartition));
687-
}
688-
});
689-
return remoteFetchTopicIdPartition;
690-
}
691-
692671
/**
693672
* This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
694673
* The operation can be completed if:
@@ -701,25 +680,18 @@ private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartit
701680
private boolean maybeCompletePendingRemoteFetch() {
702681
boolean canComplete = false;
703682

704-
for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
705-
TopicIdPartition topicIdPartition = entry.getKey();
706-
LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
707-
try {
708-
if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
709-
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
710-
}
711-
} catch (KafkaStorageException e) { // Case a
712-
log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
713-
canComplete = true;
714-
} catch (UnknownTopicOrPartitionException e) { // Case b
715-
log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
716-
canComplete = true;
717-
} catch (NotLeaderOrFollowerException e) { // Case c
718-
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
719-
canComplete = true;
720-
}
721-
if (canComplete)
722-
break;
683+
TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
684+
try {
685+
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
686+
} catch (KafkaStorageException e) { // Case a
687+
log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
688+
canComplete = true;
689+
} catch (UnknownTopicOrPartitionException e) { // Case b
690+
log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
691+
canComplete = true;
692+
} catch (NotLeaderOrFollowerException e) { // Case c
693+
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
694+
canComplete = true;
723695
}
724696

725697
if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
@@ -813,7 +785,7 @@ private void completeRemoteStorageShareFetchRequest() {
813785
// Get the local log read based topic partitions.
814786
LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
815787
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
816-
if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
788+
if (!partitionsAcquired.containsKey(topicIdPartition)) {
817789
nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
818790
}
819791
});
@@ -880,8 +852,7 @@ public record RemoteFetch(
880852
LogReadResult logReadResult,
881853
Future<Void> remoteFetchTask,
882854
CompletableFuture<RemoteLogReadResult> remoteFetchResult,
883-
RemoteStorageFetchInfo remoteFetchInfo,
884-
LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap
855+
RemoteStorageFetchInfo remoteFetchInfo
885856
) {
886857
@Override
887858
public String toString() {
@@ -891,7 +862,19 @@ public String toString() {
891862
", remoteFetchTask=" + remoteFetchTask +
892863
", remoteFetchResult=" + remoteFetchResult +
893864
", remoteFetchInfo=" + remoteFetchInfo +
894-
", fetchOffsetMetadataMap=" + fetchOffsetMetadataMap +
865+
")";
866+
}
867+
}
868+
869+
public record TopicPartitionRemoteFetchInfo(
870+
TopicIdPartition topicIdPartition,
871+
RemoteStorageFetchInfo remoteStorageFetchInfo
872+
) {
873+
@Override
874+
public String toString() {
875+
return "TopicPartitionRemoteFetchInfo(" +
876+
"topicIdPartition=" + topicIdPartition +
877+
", remoteStorageFetchInfo=" + remoteStorageFetchInfo +
895878
")";
896879
}
897880
}

Diff for: core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1688,9 +1688,11 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
16881688
assertTrue(delayedShareFetch.isCompleted());
16891689
// Pending remote fetch object gets created for delayed share fetch.
16901690
assertNotNull(delayedShareFetch.remoteFetch());
1691-
// Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete).
1692-
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
1691+
// Verify the locks are released separately for tp1 (from tryComplete).
16931692
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
1693+
// From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from
1694+
// tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read.
1695+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
16941696
assertTrue(shareFetch.isCompleted());
16951697
// Share fetch response only contains the first remote storage fetch topic partition - tp0.
16961698
assertEquals(Set.of(tp0), future.join().keySet());

0 commit comments

Comments
 (0)