From 3d02695454f006b4ad794be39876232f762703ea Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Thu, 10 Apr 2025 19:33:03 +0530 Subject: [PATCH 01/14] Initial commit to add remote storage fetch to share groups --- .../kafka/server/share/DelayedShareFetch.java | 419 +++++++++++++++--- .../server/share/DelayedShareFetchTest.java | 13 +- 2 files changed, 378 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index d68ed06d3070d..90a7c6e105369 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -22,6 +22,10 @@ import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; @@ -34,8 +38,12 @@ import org.apache.kafka.server.share.fetch.ShareFetchPartitionData; import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchPartitionData; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import com.yammer.metrics.core.Meter; @@ -44,10 +52,16 @@ import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; @@ -83,7 +97,9 @@ public class DelayedShareFetch extends DelayedOperation { // Tracks the start time to acquire any share partition for a fetch request. private long acquireStartTimeMs; private LinkedHashMap partitionsAcquired; - private LinkedHashMap partitionsAlreadyFetched; + private LinkedHashMap localPartitionsAlreadyFetched; + private Optional remoteFetchOpt; + private Optional remoteStorageFetchException; /** * This function constructs an instance of delayed share fetch operation for completing share fetch @@ -110,10 +126,12 @@ public DelayedShareFetch( sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM), shareGroupMetrics, - time + time, + Optional.empty() ); } + // The direct usage of this constructor is only from tests. 
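+ // The extra remoteFetchOpt parameter below lets tests seed the operation with an in-flight remote fetch + // (see DelayedShareFetchBuilder.withRemoteFetch in the test changes); the public constructor above always + // passes Optional.empty(), so production callers start without one.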
DelayedShareFetch( ShareFetch shareFetch, ReplicaManager replicaManager, @@ -121,19 +139,22 @@ public DelayedShareFetch( LinkedHashMap sharePartitions, PartitionMaxBytesStrategy partitionMaxBytesStrategy, ShareGroupMetrics shareGroupMetrics, - Time time + Time time, + Optional remoteFetchOpt ) { super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); this.shareFetch = shareFetch; this.replicaManager = replicaManager; this.partitionsAcquired = new LinkedHashMap<>(); - this.partitionsAlreadyFetched = new LinkedHashMap<>(); + this.localPartitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; this.partitionMaxBytesStrategy = partitionMaxBytesStrategy; this.shareGroupMetrics = shareGroupMetrics; this.time = time; this.acquireStartTimeMs = time.hiResClockMs(); + this.remoteFetchOpt = remoteFetchOpt; + this.remoteStorageFetchException = Optional.empty(); // Register metrics for DelayedShareFetch. KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics"); this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS); @@ -141,6 +162,15 @@ public DelayedShareFetch( @Override public void onExpiration() { + // cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is + // already running as it may force closing opened/cached resources as transaction index. + if (remoteFetchOpt.isPresent()) { + boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false); + if (!cancelled) { + log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", + remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone()); + } + } expiredRequestMeter.mark(); } @@ -152,58 +182,68 @@ public void onExpiration() { @Override public void onComplete() { // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables - - // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. + // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. lock.lock(); log.trace("Completing the delayed share fetch request for group {}, member {}, " + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), partitionsAcquired.keySet()); try { - LinkedHashMap topicPartitionData; - // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. - if (partitionsAcquired.isEmpty()) { - topicPartitionData = acquirablePartitions(); - // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks - // for the share partition, hence if no partitions are yet acquired by tryComplete, - // we record the metric here. Do not check if the request has successfully acquired any - // partitions now or not, as then the upper bound of request timeout shall be recorded - // for the metric. - updateAcquireElapsedTimeMetric(); + if (remoteStorageFetchException.isPresent()) { + completeErroneousRemoteShareFetchRequest(); + } else if (remoteFetchOpt.isPresent()) { + completeRemoteStorageShareFetchRequest(); } else { - // tryComplete invoked forceComplete, so we can use the data from tryComplete. 
- topicPartitionData = partitionsAcquired; + completeShareFetchRequest(); } - - if (topicPartitionData.isEmpty()) { - // No locks for share partitions could be acquired, so we complete the request with an empty response. - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); - shareFetch.maybeComplete(Map.of()); - return; - } else { - // Update metric to record acquired to requested partitions. - double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); - } - log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); - - completeShareFetchRequest(topicPartitionData); } finally { lock.unlock(); } } - private void completeShareFetchRequest(LinkedHashMap topicPartitionData) { + private void completeShareFetchRequest() { + LinkedHashMap topicPartitionData; + // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. + if (partitionsAcquired.isEmpty()) { + topicPartitionData = acquirablePartitions(sharePartitions); + // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks + // for the share partition, hence if no partitions are yet acquired by tryComplete, + // we record the metric here. Do not check if the request has successfully acquired any + // partitions now or not, as then the upper bound of request timeout shall be recorded + // for the metric. + updateAcquireElapsedTimeMetric(); + } else { + // tryComplete invoked forceComplete, so we can use the data from tryComplete. + topicPartitionData = partitionsAcquired; + } + + if (topicPartitionData.isEmpty()) { + // No locks for share partitions could be acquired, so we complete the request with an empty response. + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); + shareFetch.maybeComplete(Map.of()); + return; + } else { + // Update metric to record acquired to requested partitions. + double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + } + log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", + topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + processAcquiredTopicPartitions(topicPartitionData); + } + + private void processAcquiredTopicPartitions(LinkedHashMap topicPartitionData) { try { LinkedHashMap responseData; - if (partitionsAlreadyFetched.isEmpty()) + if (localPartitionsAlreadyFetched.isEmpty()) responseData = readFromLog( topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size())); else // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting // updated in a different tryComplete thread. 
- responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched); + responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched); List shareFetchPartitionDataList = new ArrayList<>(); responseData.forEach((topicIdPartition, logReadResult) -> @@ -225,15 +265,7 @@ private void completeShareFetchRequest(LinkedHashMap top log.error("Error processing delayed share fetch request", e); handleFetchException(shareFetch, topicPartitionData.keySet(), e); } finally { - // Releasing the lock to move ahead with the next request in queue. - releasePartitionLocks(topicPartitionData.keySet()); - // If we have a fetch request completed for a topic-partition, we release the locks for that partition, - // then we should check if there is a pending share fetch request for the topic-partition and complete it. - // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if - // we directly call delayedShareFetchPurgatory.checkAndComplete - replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition -> - replicaManager.completeDelayedShareFetchRequest( - new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet()); } } @@ -242,8 +274,14 @@ private void completeShareFetchRequest(LinkedHashMap top */ @Override public boolean tryComplete() { - LinkedHashMap topicPartitionData = acquirablePartitions(); + // Check to see if the remote fetch is in flight. If there is an in-flight remote fetch, we want to resolve it first. + // This will help to prevent starving remote storage partitions and wasting the significant upfront work involved in + // kicking off a fetch from remote storage. + if (remoteFetchOpt.isPresent()) { + return maybeCompletePendingRemoteFetch(); + } + LinkedHashMap topicPartitionData = acquirablePartitions(sharePartitions); try { if (!topicPartitionData.isEmpty()) { // Update the metric to record the time taken to acquire the locks for the share partitions. @@ -252,10 +290,16 @@ public boolean tryComplete() { // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. LinkedHashMap replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); + // Map to store the remote fetch metadata corresponding to the topic partitions for which we need to perform remote fetch. + LinkedHashMap remoteStorageFetchInfoMap = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); + + if (!remoteStorageFetchInfoMap.isEmpty()) { + return maybeProcessRemoteFetch(topicPartitionData, remoteStorageFetchInfoMap, replicaManagerReadResponse); + } maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { partitionsAcquired = topicPartitionData; - partitionsAlreadyFetched = replicaManagerReadResponse; + localPartitionsAlreadyFetched = replicaManagerReadResponse; boolean completedByMe = forceComplete(); // If invocation of forceComplete is not successful, then that means the request is already completed // hence release the acquired locks. 
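// Illustrative summary, condensed from the methods in this change (not itself part of the patch): tryComplete() resolves an in-flight remote fetch first via maybeCompletePendingRemoteFetch(); otherwise it reads the local log and, if any partition reports delayedRemoteStorageFetch, releases the local-only partition locks and schedules the remote read through maybeProcessRemoteFetch(). onComplete() then dispatches on the recorded state: remoteStorageFetchException -> completeErroneousRemoteShareFetchRequest(), remoteFetchOpt -> completeRemoteStorageShareFetchRequest(), otherwise the purely local completeShareFetchRequest().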
@@ -277,9 +321,15 @@ public boolean tryComplete() { return false; } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - releasePartitionLocks(topicPartitionData.keySet()); - partitionsAcquired.clear(); - partitionsAlreadyFetched.clear(); + // In case we have a remote fetch exception, we have already released locks for partitions which have potential + // local log read. We do not release locks for partitions which have a remote storage read because we need to + // complete the share fetch request in onComplete, and if we release the locks early here, some other DelayedShareFetch + // request might get the locks for those partitions without this one getting completed. + if (remoteStorageFetchException.isEmpty()) { + releasePartitionLocks(topicPartitionData.keySet()); + partitionsAcquired.clear(); + localPartitionsAlreadyFetched.clear(); + } return forceComplete(); } } @@ -288,11 +338,13 @@ public boolean tryComplete() { * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records. */ // Visible for testing - LinkedHashMap acquirablePartitions() { + LinkedHashMap acquirablePartitions( + LinkedHashMap sharePartitionsForAcquire + ) { // Initialize the topic partitions for which the fetch should be attempted. LinkedHashMap topicPartitionData = new LinkedHashMap<>(); - sharePartitions.forEach((topicIdPartition, sharePartition) -> { + sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> { // Add the share partition to the list of partitions to be fetched only if we can // acquire the fetch lock on it. if (sharePartition.maybeAcquireFetchLock()) { @@ -533,4 +585,269 @@ Lock lock() { Meter expiredRequestMeter() { return expiredRequestMeter; } + + private LinkedHashMap maybePrepareRemoteStorageFetchInfo( + LinkedHashMap topicPartitionData, + LinkedHashMap replicaManagerReadResponse + ) { + LinkedHashMap remoteStorageFetchMetadataMap = new LinkedHashMap<>(); + replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> { + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult); + } + }); + return remoteStorageFetchMetadataMap; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap topicPartitionData, + LinkedHashMap remoteStorageFetchInfoMap, + LinkedHashMap replicaManagerReadResponse + ) throws Exception { + // Topic partitions for which fetching would happen from the local log and not from remote storage. + Set localFetchTopicPartitions = new LinkedHashSet<>(); + topicPartitionData.keySet().forEach(topicIdPartition -> { + if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) { + localFetchTopicPartitions.add(topicIdPartition); + } + }); + // Release the acquisition locks for the topic partitions that were acquired but were not a part of the remote fetch. + releasePartitionLocks(localFetchTopicPartitions); + Optional exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse); + if (exceptionOpt.isPresent()) { + remoteStorageFetchException = exceptionOpt; + throw exceptionOpt.get(); + } + // Check if remote fetch can be completed. 
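+ // The remote read may already be done by this point (its callback completes remoteFetchResult and pokes the + // purgatory via completeDelayedShareFetchRequest), in which case the check below completes the operation + // immediately instead of parking it in the purgatory.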
+ return maybeCompletePendingRemoteFetch(); + } + + /** + * Returns an optional containing the exception if a task for the RemoteStorageFetchInfo could not be scheduled successfully, else returns an empty optional. + * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map + * @param replicaManagerReadResponse - The replica manager read response for the acquired topic partitions + */ + private Optional processRemoteFetchOrException( + LinkedHashMap remoteStorageFetchInfoMap, + LinkedHashMap replicaManagerReadResponse + ) { + Map fetchOffsetMetadataMap = new LinkedHashMap<>(); + remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put( + topicIdPartition, + replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata + )); + + Future remoteFetchTask; + CompletableFuture remoteFetchResult = new CompletableFuture<>(); + // TODO: This is a limitation in remote storage fetch that there will be fetch only for a single topic partition. + TopicIdPartition topicIdPartition = remoteStorageFetchInfoMap.keySet().stream().toList().get(0); + RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.values().stream().toList().get(0); + try { + remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( + remoteStorageFetchInfo, + result -> { + remoteFetchResult.complete(result); + replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); + } + ); + } catch (RejectedExecutionException e) { + // Return the error, if any, in scheduling the remote fetch task. + log.warn("Unable to fetch data from remote storage", e); + return Optional.of(e); + } catch (Exception e) { + return Optional.of(e); + } + remoteFetchOpt = Optional.of(new RemoteFetch(topicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); + return Optional.empty(); + } + + /** + * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent(). + * The operation can be completed if: + * Case a: The partition is in an offline log directory on this broker + * Case b: This broker does not know the partition it tries to fetch + * Case c: This broker is no longer the leader of the partition it tries to fetch + * Case d: The remote storage read request completed (succeeded or failed) + * @return boolean representing whether the remote fetch is completed or not. 
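+ * Note: cases a-c mirror the fail-fast checks used by the broker's other delayed fetch operations, so the + * operation does not wait out the full request timeout once the partition has moved or gone offline.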
*/ + private boolean maybeCompletePendingRemoteFetch() { + boolean canComplete = false; + + for (Map.Entry entry : remoteFetchOpt.get().fetchOffsetMetadataMap.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogOffsetMetadata fetchOffsetMetadata = entry.getValue(); + try { + if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) { + replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + } + } catch (KafkaStorageException e) { // Case a + log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (UnknownTopicOrPartitionException e) { // Case b + log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (NotLeaderOrFollowerException e) { // Case c + log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } + if (canComplete) + break; + } + + if (canComplete || remoteFetchOpt.get().remoteFetchResult.isDone()) { // Case d + boolean completedByMe = forceComplete(); + // If invocation of forceComplete is not successful, then that means the request is already completed + // hence release the acquired locks. + if (!completedByMe) { + releasePartitionLocks(partitionsAcquired.keySet()); + } + return completedByMe; + } else + return false; + } + + /** + * This function completes a share fetch request for which we have identified an erroneous remote storage fetch in tryComplete(). + * It should only be called when we know that a remote storage fetch exception has been recorded for the request. + */ + private void completeErroneousRemoteShareFetchRequest() { + try { + handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get()); + } finally { + releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet()); + } + } + + private void releasePartitionLocksAndAddToActionQueue(Set topicIdPartitions) { + // Releasing the lock to move ahead with the next request in queue. + releasePartitionLocks(topicIdPartitions); + // If we have a fetch request completed for a topic-partition, we release the locks for that partition, + // then we should check if there is a pending share fetch request for the topic-partition and complete it. + // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if + // we directly call delayedShareFetchPurgatory.checkAndComplete + replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> + replicaManager.completeDelayedShareFetchRequest( + new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + } + + /** + * This function completes a share fetch request for which we have identified remoteFetch during tryComplete() + * It should only be called when we know that there is remote fetch in-flight/completed. 
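+ * The completed remote read (if any) is converted to a response first; any remaining byte budget up to + * maxBytes is then used for a local log read of other acquirable partitions before the response is sent.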
+ */ + private void completeRemoteStorageShareFetchRequest() { + LinkedHashMap nonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); + try { + List shareFetchPartitionData = new ArrayList<>(); + int readableBytes = 0; + if (remoteFetchOpt.get().remoteFetchResult.isDone()) { + RemoteFetch remoteFetch = remoteFetchOpt.get(); + if (remoteFetch.remoteFetchResult().get().error.isPresent()) { + Throwable error = remoteFetch.remoteFetchResult().get().error.get(); + // If there is any error for the remote fetch topic partition, we populate the error accordingly. + shareFetchPartitionData.add( + new ShareFetchPartitionData( + remoteFetch.topicIdPartition, + partitionsAcquired.get(remoteFetch.topicIdPartition), + ReplicaManager.createLogReadResult(error).toFetchPartitionData(false) + ) + ); + } else { + FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get(); + TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition; + LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition); + shareFetchPartitionData.add( + new ShareFetchPartitionData( + topicIdPartition, + partitionsAcquired.get(remoteFetch.topicIdPartition), + new FetchPartitionData( + logReadResult.error(), + logReadResult.highWatermark(), + logReadResult.leaderLogStartOffset(), + info.records, + Optional.empty(), + logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(), + info.abortedTransactions, + logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(), + false + ) + ) + ); + readableBytes += info.records.sizeInBytes(); + } + } + + // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read. + if (readableBytes < shareFetch.fetchParams().maxBytes) { + // Get the local log read based topic partitions. + LinkedHashMap nonRemoteFetchSharePartitions = new LinkedHashMap<>(); + sharePartitions.forEach((topicIdPartition, sharePartition) -> { + if (!partitionsAcquired.containsKey(topicIdPartition)) { + nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); + } + }); + nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions); + if (!nonRemoteFetchTopicPartitionData.isEmpty()) { + log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}", + nonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + LinkedHashMap responseData = readFromLog( + nonRemoteFetchTopicPartitionData, + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, nonRemoteFetchTopicPartitionData.keySet(), nonRemoteFetchTopicPartitionData.size())); + for (Map.Entry entry : responseData.entrySet()) { + if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) { + shareFetchPartitionData.add( + new ShareFetchPartitionData( + entry.getKey(), + nonRemoteFetchTopicPartitionData.get(entry.getKey()), + entry.getValue().toFetchPartitionData(false) + ) + ); + } + } + } + } + + // Update metric to record acquired to requested partitions. 
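+ // partitionsAcquired holds the remote-fetch partition(s) recorded in tryComplete, and nonRemoteFetchTopicPartitionData + // adds any partitions acquired for the local top-up read above, so the ratio below is computed over both.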
+ double requestTopicToAcquired = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); + if (requestTopicToAcquired > 0) + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + + Map remoteFetchResponse = ShareFetchUtils.processFetchResponse( + shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); + shareFetch.maybeComplete(remoteFetchResponse); + log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + log.error("Error processing delayed share fetch request", e); + Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet()); + handleFetchException(shareFetch, topicIdPartitions, e); + } finally { + Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet()); + releasePartitionLocksAndAddToActionQueue(topicIdPartitions); + } + } + + public record RemoteFetch( + TopicIdPartition topicIdPartition, + Future remoteFetchTask, + CompletableFuture remoteFetchResult, + RemoteStorageFetchInfo remoteFetchInfo, + Map fetchOffsetMetadataMap + ) { + @Override + public String toString() { + return "RemoteFetch(" + + "topicIdPartition=" + topicIdPartition + + ", remoteFetchTask=" + remoteFetchTask + + ", remoteFetchResult=" + remoteFetchResult + + ", remoteFetchInfo=" + remoteFetchInfo + + ", fetchOffsetMetadataMap=" + fetchOffsetMetadataMap + + ")"; + } + } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 6b3165bae1531..204649d1ff681 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -488,7 +488,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch. - Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); assertEquals(0, future.join().size()); assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); @@ -498,7 +498,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch. - Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); // Assert both metrics shall be recorded only once. 
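// Illustrative only, a hypothetical way a test could seed an in-flight remote fetch through the builder extension in the hunk below (the RemoteFetch fields here are placeholder mocks/empties): DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withSharePartitions(sharePartitions).withRemoteFetch(new DelayedShareFetch.RemoteFetch(tp0, mock(Future.class), new CompletableFuture<>(), mock(RemoteStorageFetchInfo.class), new LinkedHashMap<>())).build();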
@@ -1195,6 +1195,7 @@ static class DelayedShareFetchBuilder { private LinkedHashMap sharePartitions = mock(LinkedHashMap.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); private Time time = new MockTime(); + private Optional remoteFetch = Optional.empty(); private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { @@ -1232,6 +1233,11 @@ private DelayedShareFetchBuilder withTime(Time time) { return this; } + private DelayedShareFetchBuilder withRemoteFetch(DelayedShareFetch.RemoteFetch remoteFetch) { + this.remoteFetch = Optional.of(remoteFetch); + return this; + } + public static DelayedShareFetchBuilder builder() { return new DelayedShareFetchBuilder(); } @@ -1244,7 +1250,8 @@ public DelayedShareFetch build() { sharePartitions, partitionMaxBytesStrategy, shareGroupMetrics, - time); + time, + remoteFetch); } } } From 818684c4adf51e61abf211d2482e41a1c975e44a Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sat, 12 Apr 2025 14:03:33 +0530 Subject: [PATCH 02/14] Added unit tests --- .../kafka/server/share/DelayedShareFetch.java | 34 +- .../server/share/DelayedShareFetchTest.java | 400 ++++++++++++++++++ 2 files changed, 424 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 90a7c6e105369..98ad6f7e42daf 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -162,14 +162,8 @@ public DelayedShareFetch( @Override public void onExpiration() { - // cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is - // already running as it may force closing opened/cached resources as transaction index. if (remoteFetchOpt.isPresent()) { - boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false); - if (!cancelled) { - log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", - remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone()); - } + cancelRemoteFetchTask(); } expiredRequestMeter.mark(); } @@ -581,6 +575,11 @@ Lock lock() { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. Meter expiredRequestMeter() { return expiredRequestMeter; @@ -674,7 +673,7 @@ private Optional processRemoteFetchOrException( private boolean maybeCompletePendingRemoteFetch() { boolean canComplete = false; - for (Map.Entry entry : remoteFetchOpt.get().fetchOffsetMetadataMap.entrySet()) { + for (Map.Entry entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); LogOffsetMetadata fetchOffsetMetadata = entry.getValue(); try { @@ -695,7 +694,7 @@ private boolean maybeCompletePendingRemoteFetch() { break; } - if (canComplete || remoteFetchOpt.get().remoteFetchResult.isDone()) { // Case d + if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d boolean completedByMe = forceComplete(); // If invocation of forceComplete is not successful, then that means the request is already completed // hence release the acquired locks. 
@@ -734,7 +733,7 @@ private void releasePartitionLocksAndAddToActionQueue(Set topi /** * This function completes a share fetch request for which we have identified remoteFetch during tryComplete() - * It should only be called when we know that there is remote fetch in-flight/completed. + * Note - This function should only be called when we know that there is remote fetch in-flight/completed. */ private void completeRemoteStorageShareFetchRequest() { LinkedHashMap nonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); @@ -776,6 +775,8 @@ private void completeRemoteStorageShareFetchRequest() { ); readableBytes += info.records.sizeInBytes(); } + } else { + cancelRemoteFetchTask(); } // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read. @@ -832,6 +833,19 @@ private void completeRemoteStorageShareFetchRequest() { } } + /** + * Cancel the remote storage read task, if it has not been executed yet, and avoid interrupting the task if it is + * already running, as it may force closing opened/cached resources such as the transaction index. + * Note - This function should only be called when we know that there is a remote fetch in-flight/completed. + */ + private void cancelRemoteFetchTask() { + boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false); + if (!cancelled) { + log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", + remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone()); + } + } + public record RemoteFetch( TopicIdPartition topicIdPartition, Future remoteFetchTask, diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 846645c67b826..753aa23203787 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -17,6 +17,7 @@ package kafka.server.share; import kafka.cluster.Partition; +import kafka.log.remote.RemoteLogManager; import kafka.server.LogReadResult; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; @@ -25,7 +26,11 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; @@ -46,6 +51,8 @@ import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; @@ -54,6 +61,7 @@ import org.mockito.Mockito; import java.util.ArrayList; +import java.util.function.Consumer; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -61,10 +69,14 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import scala.Option; import scala.Tuple2; +import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; @@ -73,6 +85,7 @@ import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -1155,6 +1168,361 @@ public void testOnCompleteExecutionOnTimeout() { assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); } + @Test + public void testRemoteStorageFetchTryCompleteReturnsFalse() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp1 and remote storage read result for tp2. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. 
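+ // asyncRead returns a bare mocked Future here and never invokes the result callback, so remoteFetchResult + // stays incomplete and tryComplete must leave the operation pending in the purgatory.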
+ RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + // Remote fetch object gets created for delayed share fetch object. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for local log read topic partitions tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompleteThrowsException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp0 and remote storage read result for tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object. 
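+ // asyncRead is stubbed to throw, so the exception is recorded in remoteStorageFetchException and tryComplete + // forceCompletes; onComplete then takes the erroneous-remote-fetch path and fails the affected partition.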
+ RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown")); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + BiConsumer exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + // tryComplete returns true and goes to forceComplete once the exception occurs. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertFalse(future.isCompletedExceptionally()); + assertEquals(Set.of(tp1), future.join().keySet()); + // Exception occurred and was handled. + Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + Mockito.verify(delayedShareFetch, times(1)).onComplete(); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete. 
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable. + Future remoteFetchTask = mock(Future.class); + doAnswer(invocation -> { + when(remoteFetchTask.isCancelled()).thenReturn(true); + return false; + }).when(remoteFetchTask).cancel(false); + + when(remoteFetchTask.cancel(false)).thenReturn(true); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled()); + // Partition locks should be released for all 3 topic partitions + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out. + assertEquals(Set.of(tp0, tp1), future.join().keySet()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. 
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.empty(), + Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY, + false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class)))), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. 
+ Consumer callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), @@ -1182,6 +1550,38 @@ private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set pa return partitionMaxBytesStrategy; } + private Seq> buildLocalAndRemoteFetchResult( + Set localLogReadTopicIdPartitions, + Set remoteReadTopicIdPartitions) { + List> logReadResults = new ArrayList<>(); + localLogReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY), + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY, + false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class))), + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + return CollectionConverters.asScala(logReadResults).toSeq(); + } + @SuppressWarnings("unchecked") private static BiConsumer mockExceptionHandler() { return mock(BiConsumer.class); From 7cadffd233a0d366eba96f95f1033c968c7269bf Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sat, 12 Apr 2025 15:27:46 +0530 Subject: [PATCH 03/14] Added more unit tests --- .../kafka/server/share/DelayedShareFetch.java | 13 +- .../server/share/DelayedShareFetchTest.java | 190 +++++++++++++++++- 2 files changed, 186 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 98ad6f7e42daf..d0df6d3dc662a 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -188,14 +188,14 @@ public void onComplete() { } else if (remoteFetchOpt.isPresent()) { completeRemoteStorageShareFetchRequest(); } else { - completeShareFetchRequest(); + completeLocalLogShareFetchRequest(); } } finally { lock.unlock(); } } - private void completeShareFetchRequest() { + private void completeLocalLogShareFetchRequest() { LinkedHashMap topicPartitionData; // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. if (partitionsAcquired.isEmpty()) { @@ -631,7 +631,7 @@ private Optional processRemoteFetchOrException( LinkedHashMap remoteStorageFetchInfoMap, LinkedHashMap replicaManagerReadResponse ) { - Map fetchOffsetMetadataMap = new LinkedHashMap<>(); + LinkedHashMap fetchOffsetMetadataMap = new LinkedHashMap<>(); remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put( topicIdPartition, replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata @@ -640,8 +640,9 @@ private Optional processRemoteFetchOrException( Future remoteFetchTask; CompletableFuture remoteFetchResult = new CompletableFuture<>(); // TODO: This is a limitation in remote storage fetch that there will be fetch only for a single topic partition. - TopicIdPartition topicIdPartition = remoteStorageFetchInfoMap.keySet().stream().toList().get(0); - RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.values().stream().toList().get(0); + Map.Entry firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next(); + TopicIdPartition topicIdPartition = firstRemoteStorageFetchInfo.getKey(); + RemoteStorageFetchInfo remoteStorageFetchInfo = firstRemoteStorageFetchInfo.getValue(); try { remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( remoteStorageFetchInfo, @@ -851,7 +852,7 @@ public record RemoteFetch( Future remoteFetchTask, CompletableFuture remoteFetchResult, RemoteStorageFetchInfo remoteFetchInfo, - Map fetchOffsetMetadataMap + LinkedHashMap fetchOffsetMetadataMap ) { @Override public String toString() { diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 753aa23203787..83079a94dc108 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -61,7 +61,6 @@ import org.mockito.Mockito; import java.util.ArrayList; -import java.util.function.Consumer; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -72,6 +71,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import scala.Option; @@ -108,6 +108,8 @@ public class DelayedShareFetchTest { private static final FetchParams FETCH_PARAMS = new FetchParams( FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true); + private static final FetchDataInfo REMOTE_FETCH_INFO = new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), + MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class))); private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats(); private Timer mockTimer; @@ -1485,8 +1487,7 @@ public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( - Optional.of(new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY, - false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class)))), + Optional.of(REMOTE_FETCH_INFO), Optional.empty() // Remote fetch result is returned successfully without error. ); RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); @@ -1523,6 +1524,179 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull delayedShareFetch.lock().unlock(); } + @Test + public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + + // Except tp3, all the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp3.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + sharePartitions.put(tp3, sp3); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset does not match with the cached entry for sp0, sp1 and sp2. Hence, a replica manager fetch will happen for all of them in tryComplete. 
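+ // Returning Optional.empty() below models that missing cached entry, so tryComplete has to call replicaManager.readFromLog to populate the offset metadata.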
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call (from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call (from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet()); + // Verify the locks are released for both local log and remote storage read topic partitions tp0, tp1 and tp2.
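+ // A single times(1) verification with all three partitions matches the implementation, which batches the release into one releasePartitionLocks call from onComplete.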
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 and sp1 are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for both. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + LinkedHashSet remoteStorageFetchPartitions = new LinkedHashSet<>(); + remoteStorageFetchPartitions.add(tp0); + remoteStorageFetchPartitions.add(tp1); + + // Mocking remote storage read result for tp0 and tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. 
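+ // Returning an already-completed future means the remote read is done by the time tryComplete checks it, letting the request complete without waiting for the completion callback path.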
+ Consumer callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response only contains the first remote storage fetch topic partition - tp0. + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), @@ -1567,8 +1741,7 @@ private Seq> buildLocalAndRemoteFetchRes Option.empty() )))); remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY, - false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class))), + REMOTE_FETCH_INFO, Option.empty(), -1L, -1L, @@ -1594,7 +1767,7 @@ static class DelayedShareFetchBuilder { private LinkedHashMap sharePartitions = mock(LinkedHashMap.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); private Time time = new MockTime(); - private Optional remoteFetch = Optional.empty(); + private final Optional remoteFetch = Optional.empty(); private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { @@ -1632,11 +1805,6 @@ private DelayedShareFetchBuilder withTime(Time time) { return this; } - private DelayedShareFetchBuilder withRemoteFetch(DelayedShareFetch.RemoteFetch remoteFetch) { - this.remoteFetch = Optional.of(remoteFetch); - return this; - } - public static DelayedShareFetchBuilder builder() { return new DelayedShareFetchBuilder(); } From af9e6cac7ab4b883a7e1c20762f23e17e251441a Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sat, 12 Apr 2025 18:43:25 +0530 Subject: [PATCH 04/14] Code refactor --- 
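Note: this commit pulls the selection of the single remote-fetch partition into a helper that also releases every other remote-fetch candidate. A minimal sketch of the access pattern it relies on (generics restored here for readability, since they are stripped elsewhere in this text):

    // Takes the first insertion-ordered entry without materializing intermediate
    // lists; key and value are guaranteed to come from the same entry. All other
    // candidates are dropped from partitionsAcquired and their fetch locks released.
    Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> first =
        remoteStorageFetchInfoMap.entrySet().iterator().next();
    TopicIdPartition topicIdPartition = first.getKey();
    RemoteStorageFetchInfo remoteStorageFetchInfo = first.getValue();

One iterator step replaces the two stream().toList().get(0) lookups that patch 03 had already started to clean up.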
.../kafka/server/share/DelayedShareFetch.java | 45 ++++++++++++++----- .../server/share/DelayedShareFetchTest.java | 5 ++- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index d0df6d3dc662a..4f03070528ac0 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -631,6 +631,13 @@ private Optional processRemoteFetchOrException( LinkedHashMap remoteStorageFetchInfoMap, LinkedHashMap replicaManagerReadResponse ) { + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partition in a single share fetch request + TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap); + RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition); + LinkedHashMap fetchOffsetMetadataMap = new LinkedHashMap<>(); remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put( topicIdPartition, @@ -639,16 +646,12 @@ private Optional processRemoteFetchOrException( Future remoteFetchTask; CompletableFuture remoteFetchResult = new CompletableFuture<>(); - // TODO: This is a limitation in remote storage fetch that there will be fetch only for a single topic partition. - Map.Entry firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next(); - TopicIdPartition topicIdPartition = firstRemoteStorageFetchInfo.getKey(); - RemoteStorageFetchInfo remoteStorageFetchInfo = firstRemoteStorageFetchInfo.getValue(); try { remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( remoteStorageFetchInfo, result -> { remoteFetchResult.complete(result); - replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); + replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition())); } ); } catch (RejectedExecutionException e) { @@ -658,10 +661,28 @@ private Optional processRemoteFetchOrException( } catch (Exception e) { return Optional.of(e); } - remoteFetchOpt = Optional.of(new RemoteFetch(topicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); return Optional.empty(); } + /** + * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the + * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them. + * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information. 
+ * @return the first topic partition for which we need to perform remote storage fetch + */ + private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap remoteStorageFetchInfoMap) { + Map.Entry firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next(); + TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey(); + remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> { + if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) { + partitionsAcquired.remove(topicIdPartition); + releasePartitionLocks(Set.of(topicIdPartition)); + } + }); + return remoteFetchTopicIdPartition; + } + /** * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent(). * The operation can be completed if: @@ -741,26 +762,26 @@ private void completeRemoteStorageShareFetchRequest() { try { List shareFetchPartitionData = new ArrayList<>(); int readableBytes = 0; - if (remoteFetchOpt.get().remoteFetchResult.isDone()) { + if (remoteFetchOpt.get().remoteFetchResult().isDone()) { RemoteFetch remoteFetch = remoteFetchOpt.get(); if (remoteFetch.remoteFetchResult().get().error.isPresent()) { Throwable error = remoteFetch.remoteFetchResult().get().error.get(); // If there is any error for the remote fetch topic partition, we populate the error accordingly. shareFetchPartitionData.add( new ShareFetchPartitionData( - remoteFetch.topicIdPartition, - partitionsAcquired.get(remoteFetch.topicIdPartition), + remoteFetch.topicIdPartition(), + partitionsAcquired.get(remoteFetch.topicIdPartition()), ReplicaManager.createLogReadResult(error).toFetchPartitionData(false) ) ); } else { FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get(); - TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition; + TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition(); LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition); shareFetchPartitionData.add( new ShareFetchPartitionData( topicIdPartition, - partitionsAcquired.get(remoteFetch.topicIdPartition), + partitionsAcquired.get(remoteFetch.topicIdPartition()), new FetchPartitionData( logReadResult.error(), logReadResult.highWatermark(), @@ -785,7 +806,7 @@ private void completeRemoteStorageShareFetchRequest() { // Get the local log read based topic partitions. LinkedHashMap nonRemoteFetchSharePartitions = new LinkedHashMap<>(); sharePartitions.forEach((topicIdPartition, sharePartition) -> { - if (!partitionsAcquired.containsKey(topicIdPartition)) { + if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) { nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); } }); diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 83079a94dc108..a02925c1224b9 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -1687,8 +1687,9 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() { assertTrue(delayedShareFetch.isCompleted()); // Pending remote fetch object gets created for delayed share fetch. assertNotNull(delayedShareFetch.remoteFetch()); - // Verify the locks are released for tp0 and tp1. 
- Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + // Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete). + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); assertTrue(shareFetch.isCompleted()); // Share fetch response only contains the first remote storage fetch topic partition - tp0. assertEquals(Set.of(tp0), future.join().keySet()); From e39508c5b3423857643fb825feb3f5b20baecff3 Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sat, 12 Apr 2025 21:56:42 +0530 Subject: [PATCH 05/14] Trigger build From 5ae1203436a26c7d3bd95ee6a9fd14fbd0689dd1 Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Tue, 15 Apr 2025 12:48:58 +0530 Subject: [PATCH 06/14] Addressed Jun's round 1 review comments - part 1 --- .../kafka/server/share/DelayedShareFetch.java | 57 +++++++++++-------- .../server/share/DelayedShareFetchTest.java | 3 +- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 4f03070528ac0..4db206bb4f56b 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -131,7 +131,19 @@ public DelayedShareFetch( ); } - // The direct usage of this constructor is only from tests. + /** + * This function constructs an instance of delayed share fetch operation for completing share fetch + * requests instantaneously or with delay. The direct usage of this constructor is only from tests. + * + * @param shareFetch The share fetch parameters of the share fetch request. + * @param replicaManager The replica manager instance used to read from log/complete the request. + * @param exceptionHandler The handler to complete share fetch requests with exception. + * @param sharePartitions The share partitions referenced in the share fetch request. + * @param partitionMaxBytesStrategy The strategy to identify the max bytes for topic partitions in the share fetch request. + * @param shareGroupMetrics The share group metrics to record the metrics. + * @param time The system time. + * @param remoteFetchOpt Optional containing an in-flight remote fetch object or an empty optional. 
+ */ DelayedShareFetch( ShareFetch shareFetch, ReplicaManager replicaManager, @@ -162,9 +174,6 @@ public DelayedShareFetch( @Override public void onExpiration() { - if (remoteFetchOpt.isPresent()) { - cancelRemoteFetchTask(); - } expiredRequestMeter.mark(); } @@ -224,10 +233,10 @@ private void completeLocalLogShareFetchRequest() { log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); - processAcquiredTopicPartitions(topicPartitionData); + processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData); } - private void processAcquiredTopicPartitions(LinkedHashMap topicPartitionData) { + private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap topicPartitionData) { try { LinkedHashMap responseData; if (localPartitionsAlreadyFetched.isEmpty()) @@ -269,8 +278,6 @@ private void processAcquiredTopicPartitions(LinkedHashMap maybePrepareRemo if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()); partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); - localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult); } }); return remoteStorageFetchMetadataMap; @@ -605,15 +611,13 @@ private boolean maybeProcessRemoteFetch( LinkedHashMap remoteStorageFetchInfoMap, LinkedHashMap replicaManagerReadResponse ) throws Exception { - // topic partitions for which fetching would be happening from local log and not remote storage. - Set localFetchTopicPartitions = new LinkedHashSet<>(); topicPartitionData.keySet().forEach(topicIdPartition -> { + // topic partitions for which fetching would be happening from local log and not remote storage. if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) { - localFetchTopicPartitions.add(topicIdPartition); + // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. + releasePartitionLocks(Set.of(topicIdPartition)); } }); - // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. - releasePartitionLocks(localFetchTopicPartitions); Optional exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse); if (exceptionOpt.isPresent()) { remoteStorageFetchException = exceptionOpt; @@ -626,6 +630,7 @@ private boolean maybeProcessRemoteFetch( /** * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional. * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map + * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions */ private Optional processRemoteFetchOrException( LinkedHashMap remoteStorageFetchInfoMap, @@ -635,7 +640,7 @@ private Optional processRemoteFetchOrException( // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, // we are following the same logic. 
However, this problem should be addressed as part of KAFKA-19133 which should help us perform // fetch for multiple remote fetch topic partition in a single share fetch request - TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap); + TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition(remoteStorageFetchInfoMap); RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition); LinkedHashMap fetchOffsetMetadataMap = new LinkedHashMap<>(); @@ -643,6 +648,7 @@ private Optional processRemoteFetchOrException( topicIdPartition, replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata )); + LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition); Future remoteFetchTask; CompletableFuture remoteFetchResult = new CompletableFuture<>(); @@ -661,7 +667,7 @@ private Optional processRemoteFetchOrException( } catch (Exception e) { return Optional.of(e); } - remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); return Optional.empty(); } @@ -671,7 +677,7 @@ private Optional processRemoteFetchOrException( * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information. * @return the first topic partition for which we need to perform remote storage fetch */ - private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap remoteStorageFetchInfoMap) { + private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap remoteStorageFetchInfoMap) { Map.Entry firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next(); TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey(); remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> { @@ -764,8 +770,9 @@ private void completeRemoteStorageShareFetchRequest() { int readableBytes = 0; if (remoteFetchOpt.get().remoteFetchResult().isDone()) { RemoteFetch remoteFetch = remoteFetchOpt.get(); - if (remoteFetch.remoteFetchResult().get().error.isPresent()) { - Throwable error = remoteFetch.remoteFetchResult().get().error.get(); + RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get(); + if (remoteLogReadResult.error.isPresent()) { + Throwable error = remoteLogReadResult.error.get(); // If there is any error for the remote fetch topic partition, we populate the error accordingly. shareFetchPartitionData.add( new ShareFetchPartitionData( @@ -775,9 +782,9 @@ private void completeRemoteStorageShareFetchRequest() { ) ); } else { - FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get(); + FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get(); TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition(); - LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition); + LogReadResult logReadResult = remoteFetch.logReadResult(); shareFetchPartitionData.add( new ShareFetchPartitionData( topicIdPartition, @@ -833,9 +840,9 @@ private void completeRemoteStorageShareFetchRequest() { } // Update metric to record acquired to requested partitions. 
- double requestTopicToAcquired = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); - if (requestTopicToAcquired > 0) - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + double acquiredRatio = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); + if (acquiredRatio > 0) + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100)); Map remoteFetchResponse = ShareFetchUtils.processFetchResponse( shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); @@ -870,6 +877,7 @@ private void cancelRemoteFetchTask() { public record RemoteFetch( TopicIdPartition topicIdPartition, + LogReadResult logReadResult, Future remoteFetchTask, CompletableFuture remoteFetchResult, RemoteStorageFetchInfo remoteFetchInfo, @@ -879,6 +887,7 @@ public record RemoteFetch( public String toString() { return "RemoteFetch(" + "topicIdPartition=" + topicIdPartition + + ", logReadResult=" + logReadResult + ", remoteFetchTask=" + remoteFetchTask + ", remoteFetchResult=" + remoteFetchResult + ", remoteFetchInfo=" + remoteFetchInfo + diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index a02925c1224b9..6c980da1be762 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -1228,7 +1228,8 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() { // Remote fetch object gets created for delayed share fetch object. assertNotNull(delayedShareFetch.remoteFetch()); // Verify the locks are released for local log read topic partitions tp0 and tp1. - Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); } From 2dc8ee0486b96647a04c710bd3859fdedce20830 Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Tue, 15 Apr 2025 14:23:51 +0530 Subject: [PATCH 07/14] Addressed Jun's round 1 review comments - part 2 --- .../kafka/server/share/DelayedShareFetch.java | 121 ++++++++---------- .../server/share/DelayedShareFetchTest.java | 6 +- 2 files changed, 56 insertions(+), 71 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 4db206bb4f56b..13ac124b912b3 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -291,11 +291,11 @@ public boolean tryComplete() { // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. LinkedHashMap replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); - // Map to store the remote fetch metadata corresponding to the topic partitions for which we need to perform remote fetch. - LinkedHashMap remoteStorageFetchInfoMap = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); + // Store the remote fetch info and the topic partition for which we need to perform remote fetch. 
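+ // At most one partition per request is selected for remote fetch (see the KAFKA-19133 note below), hence an Optional rather than a map.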
+ Optional topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); - if (!remoteStorageFetchInfoMap.isEmpty()) { - return maybeProcessRemoteFetch(topicPartitionData, remoteStorageFetchInfoMap, replicaManagerReadResponse); + if (topicPartitionRemoteFetchInfoOpt.isPresent()) { + return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get(), replicaManagerReadResponse); } maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { @@ -592,33 +592,40 @@ Meter expiredRequestMeter() { return expiredRequestMeter; } - private LinkedHashMap maybePrepareRemoteStorageFetchInfo( + private Optional maybePrepareRemoteStorageFetchInfo( LinkedHashMap topicPartitionData, LinkedHashMap replicaManagerReadResponse ) { - LinkedHashMap remoteStorageFetchMetadataMap = new LinkedHashMap<>(); - replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> { + Optional remoteStorageFetchMetadataMap = Optional.empty(); + for (Map.Entry entry : replicaManagerReadResponse.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogReadResult logReadResult = entry.getValue(); if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { - remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()); + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partition in a single share fetch request + remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get())); partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + break; } - }); + } return remoteStorageFetchMetadataMap; } private boolean maybeProcessRemoteFetch( LinkedHashMap topicPartitionData, - LinkedHashMap remoteStorageFetchInfoMap, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, LinkedHashMap replicaManagerReadResponse ) throws Exception { topicPartitionData.keySet().forEach(topicIdPartition -> { - // topic partitions for which fetching would be happening from local log and not remote storage. - if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) { + // topic partitions for which fetch would not be happening in this share fetch request. + if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. 
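// Note: locks are released here one Set.of(topicIdPartition) at a time; a later commit in this series collects them and releases them once via releasePartitionLocksAndAddToActionQueue.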
releasePartitionLocks(Set.of(topicIdPartition)); } }); - Optional exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse); + Optional exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse); if (exceptionOpt.isPresent()) { remoteStorageFetchException = exceptionOpt; throw exceptionOpt.get(); @@ -629,25 +636,15 @@ private boolean maybeProcessRemoteFetch( /** * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional. - * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map + * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information. * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions */ private Optional processRemoteFetchOrException( - LinkedHashMap remoteStorageFetchInfoMap, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, LinkedHashMap replicaManagerReadResponse ) { - // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for - // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, - // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform - // fetch for multiple remote fetch topic partition in a single share fetch request - TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition(remoteStorageFetchInfoMap); - RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition); - - LinkedHashMap fetchOffsetMetadataMap = new LinkedHashMap<>(); - remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put( - topicIdPartition, - replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata - )); + TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition(); + RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo(); LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition); Future remoteFetchTask; @@ -667,28 +664,10 @@ private Optional processRemoteFetchOrException( } catch (Exception e) { return Optional.of(e); } - remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap)); + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); return Optional.empty(); } - /** - * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the - * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them. - * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information. 
- * @return the first topic partition for which we need to perform remote storage fetch - */ - private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap remoteStorageFetchInfoMap) { - Map.Entry firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next(); - TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey(); - remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> { - if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) { - partitionsAcquired.remove(topicIdPartition); - releasePartitionLocks(Set.of(topicIdPartition)); - } - }); - return remoteFetchTopicIdPartition; - } - /** * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent(). * The operation can be completed if: @@ -701,25 +680,18 @@ private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) { - TopicIdPartition topicIdPartition = entry.getKey(); - LogOffsetMetadata fetchOffsetMetadata = entry.getValue(); - try { - if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) { - replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); - } - } catch (KafkaStorageException e) { // Case a - log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); - canComplete = true; - } catch (UnknownTopicOrPartitionException e) { // Case b - log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); - canComplete = true; - } catch (NotLeaderOrFollowerException e) { // Case c - log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); - canComplete = true; - } - if (canComplete) - break; + TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition(); + try { + replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + } catch (KafkaStorageException e) { // Case a + log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (UnknownTopicOrPartitionException e) { // Case b + log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (NotLeaderOrFollowerException e) { // Case c + log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; } if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d @@ -813,7 +785,7 @@ private void completeRemoteStorageShareFetchRequest() { // Get the local log read based topic partitions. 
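// i.e. every share partition in the request that is not already held for the in-flight remote fetch.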
LinkedHashMap nonRemoteFetchSharePartitions = new LinkedHashMap<>(); sharePartitions.forEach((topicIdPartition, sharePartition) -> { - if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) { + if (!partitionsAcquired.containsKey(topicIdPartition)) { nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); } }); @@ -880,8 +852,7 @@ public record RemoteFetch( LogReadResult logReadResult, Future remoteFetchTask, CompletableFuture remoteFetchResult, - RemoteStorageFetchInfo remoteFetchInfo, - LinkedHashMap fetchOffsetMetadataMap + RemoteStorageFetchInfo remoteFetchInfo ) { @Override public String toString() { @@ -891,7 +862,19 @@ public String toString() { ", remoteFetchTask=" + remoteFetchTask + ", remoteFetchResult=" + remoteFetchResult + ", remoteFetchInfo=" + remoteFetchInfo + - ", fetchOffsetMetadataMap=" + fetchOffsetMetadataMap + + ")"; + } + } + + public record TopicPartitionRemoteFetchInfo( + TopicIdPartition topicIdPartition, + RemoteStorageFetchInfo remoteStorageFetchInfo + ) { + @Override + public String toString() { + return "TopicPartitionRemoteFetchInfo(" + + "topicIdPartition=" + topicIdPartition + + ", remoteStorageFetchInfo=" + remoteStorageFetchInfo + ")"; } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 6c980da1be762..26c8b76b2c922 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -1688,9 +1688,11 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() { assertTrue(delayedShareFetch.isCompleted()); // Pending remote fetch object gets created for delayed share fetch. assertNotNull(delayedShareFetch.remoteFetch()); - // Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete). - Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + // Verify the locks are released separately for tp1 (from tryComplete). Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + // From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from + // tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); assertTrue(shareFetch.isCompleted()); // Share fetch response only contains the first remote storage fetch topic partition - tp0. 
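// tp1 is re-read from onComplete but still reports delayedRemoteStorageFetch, so it is filtered out when the response is built.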
assertEquals(Set.of(tp0), future.join().keySet()); From ccf3e06ec185c596eabd0f500f8b24797a183dfe Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Wed, 16 Apr 2025 11:41:26 +0530 Subject: [PATCH 08/14] Addressed Jun's round 2 review comments --- .../kafka/server/share/DelayedShareFetch.java | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 13ac124b912b3..b897e9fcdc81f 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -60,6 +60,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -295,7 +296,7 @@ public boolean tryComplete() { Optional topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); if (topicPartitionRemoteFetchInfoOpt.isPresent()) { - return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get(), replicaManagerReadResponse); + return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get()); } maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { @@ -596,7 +597,7 @@ private Optional maybePrepareRemoteStorageFetchIn LinkedHashMap topicPartitionData, LinkedHashMap replicaManagerReadResponse ) { - Optional remoteStorageFetchMetadataMap = Optional.empty(); + Optional topicPartitionRemoteFetchInfoOpt = Optional.empty(); for (Map.Entry entry : replicaManagerReadResponse.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); LogReadResult logReadResult = entry.getValue(); @@ -605,27 +606,26 @@ private Optional maybePrepareRemoteStorageFetchIn // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, // we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform // fetch for multiple remote fetch topic partition in a single share fetch request - remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get())); + topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult)); partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); break; } } - return remoteStorageFetchMetadataMap; + return topicPartitionRemoteFetchInfoOpt; } private boolean maybeProcessRemoteFetch( LinkedHashMap topicPartitionData, - TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, - LinkedHashMap replicaManagerReadResponse + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ) throws Exception { topicPartitionData.keySet().forEach(topicIdPartition -> { // topic partitions for which fetch would not be happening in this share fetch request. 
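// i.e. every acquired partition other than the single one chosen for remote fetch.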
if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch. - releasePartitionLocks(Set.of(topicIdPartition)); + releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition)); } }); - Optional exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse); + Optional exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo); if (exceptionOpt.isPresent()) { remoteStorageFetchException = exceptionOpt; throw exceptionOpt.get(); @@ -637,15 +637,12 @@ private boolean maybeProcessRemoteFetch( /** * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional. * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information. - * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions */ private Optional processRemoteFetchOrException( - TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo, - LinkedHashMap replicaManagerReadResponse + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo ) { TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition(); - RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo(); - LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition); + RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get(); Future remoteFetchTask; CompletableFuture remoteFetchResult = new CompletableFuture<>(); @@ -664,7 +661,7 @@ private Optional processRemoteFetchOrException( } catch (Exception e) { return Optional.of(e); } - remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); return Optional.empty(); } @@ -733,10 +730,10 @@ private void releasePartitionLocksAndAddToActionQueue(Set topi /** * This function completes a share fetch request for which we have identified remoteFetch during tryComplete() - * Note - This function should only be called when we know that there is remote fetch in-flight/completed. + * Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired. 
*/ private void completeRemoteStorageShareFetchRequest() { - LinkedHashMap nonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); + LinkedHashMap acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); try { List shareFetchPartitionData = new ArrayList<>(); int readableBytes = 0; @@ -789,20 +786,20 @@ private void completeRemoteStorageShareFetchRequest() { nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); } }); - nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions); - if (!nonRemoteFetchTopicPartitionData.isEmpty()) { + acquiredNonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions); + if (!acquiredNonRemoteFetchTopicPartitionData.isEmpty()) { log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}", - nonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + acquiredNonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); LinkedHashMap responseData = readFromLog( - nonRemoteFetchTopicPartitionData, - partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, nonRemoteFetchTopicPartitionData.keySet(), nonRemoteFetchTopicPartitionData.size())); + acquiredNonRemoteFetchTopicPartitionData, + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, acquiredNonRemoteFetchTopicPartitionData.keySet(), acquiredNonRemoteFetchTopicPartitionData.size())); for (Map.Entry entry : responseData.entrySet()) { if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) { shareFetchPartitionData.add( new ShareFetchPartitionData( entry.getKey(), - nonRemoteFetchTopicPartitionData.get(entry.getKey()), + acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()), entry.getValue().toFetchPartitionData(false) ) ); @@ -812,7 +809,7 @@ private void completeRemoteStorageShareFetchRequest() { } // Update metric to record acquired to requested partitions. 
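// e.g. 3 acquired partitions out of 4 requested records (int) (0.75 * 100) = 75 for the group.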
- double acquiredRatio = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); + double acquiredRatio = (double) (partitionsAcquired.size() + acquiredNonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); if (acquiredRatio > 0) shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100)); @@ -820,24 +817,32 @@ private void completeRemoteStorageShareFetchRequest() { shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); shareFetch.maybeComplete(remoteFetchResponse); log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse); - } catch (RuntimeException e) { - throw e; + } catch (InterruptedException | ExecutionException e) { + log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); } catch (Exception e) { - log.error("Error processing delayed share fetch request", e); - Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); - topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet()); - handleFetchException(shareFetch, topicIdPartitions, e); + log.error("Unexpected error in processing delayed share fetch request", e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); } finally { Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); - topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet()); releasePartitionLocksAndAddToActionQueue(topicIdPartitions); } } + private void handleExceptionInCompletingRemoteStorageShareFetchRequest( + Set acquiredNonRemoteFetchTopicPartitions, + Exception e + ) { + Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitions); + handleFetchException(shareFetch, topicIdPartitions, e); + } + /** * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is * already running as it may force closing opened/cached resources as transaction index. - * Note - This function should only be called when we know that there is a remote fetch in-flight/completed. + * Note - This function should only be called when we know that there is a remote fetch in-flight/expired. 
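+ * (Future.cancel is invoked with mayInterruptIfRunning=false, so a read that has already started is left to run to completion.)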
@@ -868,13 +873,13 @@ public String toString() {
 
     public record TopicPartitionRemoteFetchInfo(
         TopicIdPartition topicIdPartition,
-        RemoteStorageFetchInfo remoteStorageFetchInfo
+        LogReadResult logReadResult
     ) {
         @Override
         public String toString() {
             return "TopicPartitionRemoteFetchInfo(" +
                 "topicIdPartition=" + topicIdPartition +
-                ", remoteStorageFetchInfo=" + remoteStorageFetchInfo +
+                ", logReadResult=" + logReadResult +
                 ")";
         }
     }
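
Both onExpiration() and cancelRemoteFetchTask() cancel the remote read with mayInterruptIfRunning=false. For a java.util.concurrent future, cancel(false) marks an unfinished task as cancelled but never interrupts a body that is already executing, so an in-flight remote read runs to completion rather than force-closing opened/cached resources such as the transaction index; cancel(false) returns false only when the task had already completed, which is the case the log.debug branch reports. A standalone sketch of those semantics (not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class CancelWithoutInterruptDemo {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<?> remoteRead = pool.submit(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200); // stands in for an in-flight remote read
                    System.out.println("remote read ran to completion, nothing was force-closed");
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // only reachable with cancel(true)
                }
            });
            TimeUnit.MILLISECONDS.sleep(50);              // let the task start first
            boolean cancelled = remoteRead.cancel(false); // false => never interrupt
            System.out.println("cancelled=" + cancelled + ", isDone=" + remoteRead.isDone());
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
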
From dfc0366e88b02c3db58fb59ce42624f3301d67a3 Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Thu, 17 Apr 2025 10:57:45 +0530
Subject: [PATCH 09/14] Addressed Jun's round 3 review comments

---
 .../kafka/server/share/DelayedShareFetch.java | 14 ++++++++-----
 .../server/share/DelayedShareFetchTest.java   | 20 ++++----------------
 2 files changed, 13 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index b897e9fcdc81f..2c84972db4175 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -618,16 +618,18 @@ private boolean maybeProcessRemoteFetch(
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
         TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
     ) throws Exception {
+        Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
         topicPartitionData.keySet().forEach(topicIdPartition -> {
             // topic partitions for which fetch would not be happening in this share fetch request.
             if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
-                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
-                releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));
+                nonRemoteFetchTopicPartitions.add(topicIdPartition);
             }
         });
+        // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
+        // them to the delayed actions queue.
+        releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
         Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
         if (exceptionOpt.isPresent()) {
-            remoteStorageFetchException = exceptionOpt;
             throw exceptionOpt.get();
         }
         // Check if remote fetch can be completed.
@@ -657,8 +659,10 @@ private Optional processRemoteFetchOrException(
         } catch (RejectedExecutionException e) {
             // Return the error if any in scheduling the remote fetch task.
             log.warn("Unable to fetch data from remote storage", e);
+            remoteStorageFetchException = Optional.of(e);
             return Optional.of(e);
         } catch (Exception e) {
+            remoteStorageFetchException = Optional.of(e);
             return Optional.of(e);
         }
         remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
@@ -730,7 +734,7 @@ private void releasePartitionLocksAndAddToActionQueue(Set topi
 
     /**
      * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
-     * Note - This function should only be called when we know that there is remote fetch in-flight/completed/expired.
+     * Note - This function should only be called when we know that there is a remote fetch.
      */
     private void completeRemoteStorageShareFetchRequest() {
         LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
@@ -842,7 +846,7 @@ private void handleExceptionInCompletingRemoteStorageShareFetchRequest(
     /**
      * Cancel the remote storage read task, if it has not been executed yet, and avoid interrupting the task if it is
      * already running, as it may force closing opened/cached resources such as the transaction index.
-     * Note - This function should only be called when we know that there is a remote fetch in-flight/expired.
+     * Note - This function should only be called when we know that there is a remote fetch.
      */
     private void cancelRemoteFetchTask() {
         boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);

diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 26c8b76b2c922..bd38910d5025a 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -1228,8 +1228,7 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
         // Remote fetch object gets created for delayed share fetch object.
         assertNotNull(delayedShareFetch.remoteFetch());
         // Verify the locks are released for local log read topic partitions tp0 and tp1.
-        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
-        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
         assertTrue(delayedShareFetch.lock().tryLock());
         delayedShareFetch.lock().unlock();
     }
@@ -1459,24 +1458,19 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
     public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
         ReplicaManager replicaManager = mock(ReplicaManager.class);
         TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
-        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
 
         SharePartition sp0 = mock(SharePartition.class);
-        SharePartition sp1 = mock(SharePartition.class);
 
         // sp0 is acquirable, sp1 is not acquirable.
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
-        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
         when(sp0.canAcquireRecords()).thenReturn(true);
-        when(sp1.canAcquireRecords()).thenReturn(false);
 
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
         sharePartitions.put(tp0, sp0);
-        sharePartitions.put(tp1, sp1);
 
         CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
         ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
-            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
         when(sp0.nextFetchOffset()).thenReturn(10L);
@@ -1504,7 +1498,7 @@
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
-            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
             .build());
 
         when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
@@ -1531,18 +1525,15 @@ public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
         TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
         TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
         TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
-        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
 
         SharePartition sp0 = mock(SharePartition.class);
         SharePartition sp1 = mock(SharePartition.class);
         SharePartition sp2 = mock(SharePartition.class);
-        SharePartition sp3 = mock(SharePartition.class);
 
         // Except tp3, all the topic partitions are acquirable.
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
         when(sp2.maybeAcquireFetchLock()).thenReturn(true);
-        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(true);
         when(sp2.canAcquireRecords()).thenReturn(true);
@@ -1551,11 +1542,10 @@
         sharePartitions.put(tp0, sp0);
         sharePartitions.put(tp1, sp1);
         sharePartitions.put(tp2, sp2);
-        sharePartitions.put(tp3, sp3);
 
         CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
         ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
-            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
         when(sp0.nextFetchOffset()).thenReturn(10L);
@@ -1679,8 +1669,6 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
         when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
             createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
-            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
 
         assertFalse(delayedShareFetch.isCompleted());
         assertTrue(delayedShareFetch.tryComplete());
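
testRemoteStorageFetchOnlyHappensForFirstTopicPartition pins down the selection rule: when several acquired partitions need remote storage, only the first one in acquisition order becomes the remote fetch, and the rest are released to be retried later. A hedged sketch of that rule; the names below are illustrative, not the actual helpers in DelayedShareFetch:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    final class FirstRemoteFetchSelection {
        // Illustrative stand-in for the per-partition local read outcome.
        record ReadResult(boolean needsRemoteFetch) { }

        // Pick the first partition, in insertion (acquisition) order, whose local read
        // says the data now lives in remote storage.
        static <P> Optional<P> firstRemoteFetchPartition(LinkedHashMap<P, ReadResult> readResults) {
            for (Map.Entry<P, ReadResult> entry : readResults.entrySet()) {
                if (entry.getValue().needsRemoteFetch()) {
                    return Optional.of(entry.getKey()); // LinkedHashMap preserves order
                }
            }
            return Optional.empty();
        }
    }
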
From 5b4e20397eb6c6faab40c851cea3cf90eec31a11 Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Thu, 17 Apr 2025 15:21:58 +0530
Subject: [PATCH 10/14] Minor unit test code comments improvement

---
 .../test/java/kafka/server/share/DelayedShareFetchTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index bd38910d5025a..d38faf1ba27b1 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -1461,7 +1461,7 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull
 
         SharePartition sp0 = mock(SharePartition.class);
 
-        // sp0 is acquirable, sp1 is not acquirable.
+        // sp0 is acquirable.
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp0.canAcquireRecords()).thenReturn(true);
 
@@ -1530,7 +1530,7 @@ public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
         SharePartition sp1 = mock(SharePartition.class);
         SharePartition sp2 = mock(SharePartition.class);
 
-        // Except tp3, all the topic partitions are acquirable.
+        // All the topic partitions are acquirable.
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
         when(sp2.maybeAcquireFetchLock()).thenReturn(true);

From 2b074d9614a9db6000cdaa44d5356f1209ebe82c Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Thu, 17 Apr 2025 15:34:56 +0530
Subject: [PATCH 11/14] Trigger build

From 7c46f4c362ad12e4a4a0ebc36dea3526cb97802a Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Thu, 17 Apr 2025 15:53:20 +0530
Subject: [PATCH 12/14] Merged changes from trunk to fix pipeline failures

---
 .../src/test/java/kafka/server/share/DelayedShareFetchTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index d38faf1ba27b1..43ece70ca0ee6 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -17,7 +17,6 @@
 package kafka.server.share;
 
 import kafka.cluster.Partition;
-import kafka.log.remote.RemoteLogManager;
 import kafka.server.LogReadResult;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
@@ -34,6 +33,7 @@
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.share.SharePartitionKey;

From f94f7ca216470a40bf80386b4441692e08456623 Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Fri, 18 Apr 2025 01:08:36 +0530
Subject: [PATCH 13/14] Addressed Jun's round 3 review comments - part 2

---
 .../java/kafka/server/share/DelayedShareFetch.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 2c84972db4175..f85453b1e0b4c 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -331,8 +331,17 @@ public boolean tryComplete() {
             releasePartitionLocks(topicPartitionData.keySet());
             partitionsAcquired.clear();
             localPartitionsAlreadyFetched.clear();
+            return forceComplete();
+        } else {
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
+            // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        }
-        return forceComplete();
     }
 }
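
The else branch added in PATCH 13 leans on the DelayedOperation contract that forceComplete() succeeds for exactly one caller: the winner runs onComplete(), every loser gets false back and must undo only its own lock acquisition. A minimal model of that contract, an assumption-level sketch rather than Kafka's DelayedOperation:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class ForceCompleteRaceSketch {
        private final AtomicBoolean completed = new AtomicBoolean(false);

        public boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete(); // winner: build and send the response exactly once
                return true;
            }
            return false;     // loser: somebody else (e.g. the expiration thread) completed it
        }

        public boolean tryComplete() {
            // ... acquire share partition locks, read data ...
            boolean completedByMe = forceComplete();
            if (!completedByMe) {
                releasePartitionLocks(); // expiration won the race; undo our acquisition
            }
            return completedByMe;
        }

        private void onComplete() { /* assemble the share fetch response */ }

        private void releasePartitionLocks() { /* release the fetch locks this thread took */ }
    }
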
From 25788b32f2c983184db08aa82dc8a9c450d04dcb Mon Sep 17 00:00:00 2001
From: adixitconfluent
Date: Sat, 19 Apr 2025 15:05:51 +0530
Subject: [PATCH 14/14] Addressed Jun's round 4 review comments

---
 .../kafka/server/share/DelayedShareFetch.java | 27 +++++++------------
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index f85453b1e0b4c..9bcaa48587f78 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -62,7 +62,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiConsumer;
@@ -304,7 +303,7 @@ public boolean tryComplete() {
             localPartitionsAlreadyFetched = replicaManagerReadResponse;
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that means the request is already completed
-            // hence release the acquired locks.
+            // hence the acquired locks are already released.
             if (!completedByMe) {
                 releasePartitionLocks(partitionsAcquired.keySet());
             }
@@ -335,7 +334,7 @@ public boolean tryComplete() {
         } else {
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that means the request is already completed
-            // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
+            // hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
             // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
             if (!completedByMe) {
                 releasePartitionLocks(partitionsAcquired.keySet());
             }
@@ -626,7 +625,7 @@ private Optional maybePrepareRemoteStorageFetchIn
     private boolean maybeProcessRemoteFetch(
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
         TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
-    ) throws Exception {
+    ) {
         Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
         topicPartitionData.keySet().forEach(topicIdPartition -> {
             // topic partitions for which fetch would not be happening in this share fetch request.
@@ -637,19 +636,16 @@ private boolean maybeProcessRemoteFetch(
         // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
         // them to the delayed actions queue.
         releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
-        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
-        if (exceptionOpt.isPresent()) {
-            throw exceptionOpt.get();
-        }
+        processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
         // Check if remote fetch can be completed.
         return maybeCompletePendingRemoteFetch();
     }
 
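
The remainder of this hunk, just below, turns processRemoteFetchOrException into record-and-rethrow: a scheduling failure is stashed in remoteStorageFetchException for onComplete() to find, then rethrown. The throw e inside catch (Exception e) compiles without a throws clause because of Java's precise rethrow: when the catch parameter is effectively final, the compiler knows only the exceptions the try block can actually raise escape, and here those are unchecked. A standalone sketch with hypothetical names, not the patch's code:

    import java.util.Optional;
    import java.util.concurrent.RejectedExecutionException;

    public class PreciseRethrowSketch {
        private Optional<Exception> lastFailure = Optional.empty();

        void schedule(Runnable task) { // no `throws` clause required
            try {
                submit(task);
            } catch (Exception e) {
                lastFailure = Optional.of(e); // remember the failure for the completion path
                throw e; // precise rethrow: only RejectedExecutionException can reach here
            }
        }

        private void submit(Runnable task) {
            throw new RejectedExecutionException("remote fetch thread pool is saturated");
        }
    }
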
     /**
-     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
-     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * Throws an exception if a task for remote storage fetch could not be scheduled successfully; otherwise, updates remoteFetchOpt.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
      */
-    private Optional<Exception> processRemoteFetchOrException(
+    private void processRemoteFetchOrException(
         TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
     ) {
         TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
@@ -665,17 +661,12 @@ private Optional processRemoteFetchOrException(
                     replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
                 }
             );
-        } catch (RejectedExecutionException e) {
-            // Return the error if any in scheduling the remote fetch task.
-            log.warn("Unable to fetch data from remote storage", e);
-            remoteStorageFetchException = Optional.of(e);
-            return Optional.of(e);
         } catch (Exception e) {
+            // Throw the error if any in scheduling the remote fetch task.
             remoteStorageFetchException = Optional.of(e);
-            return Optional.of(e);
+            throw e;
         }
         remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
-        return Optional.empty();
     }
 
     /**