Skip to content

Commit e1ff387

Browse files
authored
KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (apache#20045)
This PR enables reading remote storage for multiple partitions in one fetchRequest. The main changes are: 1. In `DelayedRemoteFetch`, we accept multiple remoteFetchTasks and other metadata now. 2. In `DelayedRemoteFetch`, we'll wait until all remoteFetch done, either succeeded or failed. 3. In `ReplicaManager#fetchMessage`, we'll create one `DelayedRemoteFetch` and pass multiple remoteFetch metadata to it, and watch all of them. 4. Added tests Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, Satish Duggana <satishd@apache.org>
1 parent 29cf97b commit e1ff387

13 files changed

Lines changed: 496 additions & 130 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,7 @@ public class ConsumerConfig extends AbstractConfig {
197197
"this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
198198
"The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
199199
"<code>max.message.bytes</code> (topic config). A fetch request consists of many partitions, and there is another setting that controls how much " +
200-
"data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that there is a current limitation when " +
201-
"performing remote reads from tiered storage (KIP-405) - only one partition out of the fetch request is fetched from the remote store (KAFKA-14915). " +
202-
"Note also that the consumer performs multiple fetches in parallel.";
200+
"data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that the consumer performs multiple fetches in parallel.";
203201
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
204202

205203
/**
@@ -224,9 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
224222
"partition of the fetch is larger than this limit, the " +
225223
"batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
226224
"accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
227-
"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
228-
"Consider increasing <code>max.partition.fetch.bytes</code> especially in the cases of remote storage reads (KIP-405), because currently only " +
229-
"one partition per fetch request is served from the remote store (KAFKA-14915).";
225+
"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
230226
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
231227

232228
/** <code>send.buffer.bytes</code> */

core/src/main/scala/kafka/server/DelayedRemoteFetch.scala

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
2828
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
2929
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
3030

31+
import java.util
3132
import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
3233
import java.util.{Optional, OptionalInt, OptionalLong}
3334
import scala.collection._
@@ -36,9 +37,9 @@ import scala.collection._
3637
* A remote fetch operation that can be created by the replica manager and watched
3738
* in the remote fetch operation purgatory
3839
*/
39-
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
40-
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
41-
remoteFetchInfo: RemoteStorageFetchInfo,
40+
class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
41+
remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
42+
remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
4243
remoteFetchMaxWaitMs: Long,
4344
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
4445
fetchParams: FetchParams,
@@ -56,7 +57,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
5657
*
5758
* Case a: This broker is no longer the leader of the partition it tries to fetch
5859
* Case b: This broker does not know the partition it tries to fetch
59-
* Case c: The remote storage read request completed (succeeded or failed)
60+
* Case c: All the remote storage read request completed (succeeded or failed)
6061
* Case d: The partition is in an offline log directory on this broker
6162
*
6263
* Upon completion, should return whatever data is available for each valid partition
@@ -81,7 +82,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
8182
return forceComplete()
8283
}
8384
}
84-
if (remoteFetchResult.isDone) // Case c
85+
// Case c
86+
if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
8587
forceComplete()
8688
else
8789
false
@@ -90,8 +92,13 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
9092
override def onExpiration(): Unit = {
9193
// cancel the remote storage read task, if it has not been executed yet and
9294
// avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
93-
val cancelled = remoteFetchTask.cancel(false)
94-
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
95+
remoteFetchTasks.forEach { (topicIdPartition, task) =>
96+
if (task != null && !task.isDone) {
97+
if (!task.cancel(false)) {
98+
debug(s"Remote fetch task for remoteFetchInfo: ${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
99+
}
100+
}
101+
}
95102

96103
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
97104
}
@@ -101,7 +108,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
101108
*/
102109
override def onComplete(): Unit = {
103110
val fetchPartitionData = localReadResults.map { case (tp, result) =>
104-
if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
111+
val remoteFetchResult = remoteFetchResults.get(tp)
112+
if (remoteFetchInfos.containsKey(tp)
105113
&& remoteFetchResult.isDone
106114
&& result.error == Errors.NONE
107115
&& result.info.delayedRemoteStorageFetch.isPresent) {

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,15 +1579,18 @@ class ReplicaManager(val config: KafkaConfig,
15791579
}
15801580

15811581
/**
1582-
* Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
1583-
* else returns [[None]].
1582+
* Initiates an asynchronous remote storage fetch operation for the given remote fetch information.
1583+
*
1584+
* This method schedules a remote fetch task with the remote log manager and sets up the necessary
1585+
* completion handling for the operation. The remote fetch result will be used to populate the
1586+
* delayed remote fetch purgatory when completed.
1587+
*
1588+
* @param remoteFetchInfo The remote storage fetch information
1589+
*
1590+
* @return A tuple containing the remote fetch task and the remote fetch result
15841591
*/
1585-
private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
1586-
params: FetchParams,
1587-
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
1588-
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
1589-
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
1590-
val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
1592+
private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo): (Future[Void], CompletableFuture[RemoteLogReadResult]) = {
1593+
val key = new TopicPartitionOperationKey(remoteFetchInfo.topicIdPartition)
15911594
val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
15921595
var remoteFetchTask: Future[Void] = null
15931596
try {
@@ -1597,31 +1600,39 @@ class ReplicaManager(val config: KafkaConfig,
15971600
})
15981601
} catch {
15991602
case e: RejectedExecutionException =>
1600-
// Return the error if any in scheduling the remote fetch task
1601-
warn("Unable to fetch data from remote storage", e)
1602-
return Some(createLogReadResult(e))
1603+
warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
1604+
// Store the error in RemoteLogReadResult if any in scheduling the remote fetch task.
1605+
// It will be sent back to the client in DelayedRemoteFetch along with other successful remote fetch results.
1606+
remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
16031607
}
16041608

1609+
(remoteFetchTask, remoteFetchResult)
1610+
}
1611+
1612+
/**
1613+
* Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
1614+
*/
1615+
private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
1616+
params: FetchParams,
1617+
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
1618+
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
1619+
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
1620+
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
1621+
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
1622+
1623+
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
1624+
val (task, result) = processRemoteFetch(remoteFetchInfo)
1625+
remoteFetchTasks.put(topicIdPartition, task)
1626+
remoteFetchResults.put(topicIdPartition, result)
1627+
}
1628+
16051629
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
1606-
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
1607-
fetchPartitionStatus, params, logReadResults, this, responseCallback)
1608-
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
1609-
None
1610-
}
1611-
1612-
private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
1613-
remoteFetchTopicPartition: TopicPartition,
1614-
error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
1615-
logReadResults.map { case (tp, result) =>
1616-
val fetchPartitionData = {
1617-
if (tp.topicPartition().equals(remoteFetchTopicPartition))
1618-
error
1619-
else
1620-
result
1621-
}.toFetchPartitionData(false)
1630+
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
1631+
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
16221632

1623-
tp -> fetchPartitionData
1624-
}
1633+
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
1634+
val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
1635+
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
16251636
}
16261637

16271638
/**
@@ -1639,8 +1650,8 @@ class ReplicaManager(val config: KafkaConfig,
16391650
var bytesReadable: Long = 0
16401651
var errorReadingData = false
16411652

1642-
// The 1st topic-partition that has to be read from remote storage
1643-
var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
1653+
// topic-partitions that have to be read from remote storage
1654+
val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
16441655

16451656
var hasDivergingEpoch = false
16461657
var hasPreferredReadReplica = false
@@ -1651,8 +1662,8 @@ class ReplicaManager(val config: KafkaConfig,
16511662
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
16521663
if (logReadResult.error != Errors.NONE)
16531664
errorReadingData = true
1654-
if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
1655-
remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
1665+
if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
1666+
remoteFetchInfos.put(topicIdPartition, logReadResult.info.delayedRemoteStorageFetch.get())
16561667
}
16571668
if (logReadResult.divergingEpoch.isPresent)
16581669
hasDivergingEpoch = true
@@ -1669,7 +1680,7 @@ class ReplicaManager(val config: KafkaConfig,
16691680
// 4) some error happens while reading data
16701681
// 5) we found a diverging epoch
16711682
// 6) has a preferred read replica
1672-
if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
1683+
if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
16731684
hasDivergingEpoch || hasPreferredReadReplica)) {
16741685
val fetchPartitionData = logReadResults.map { case (tp, result) =>
16751686
val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
@@ -1686,15 +1697,8 @@ class ReplicaManager(val config: KafkaConfig,
16861697
})
16871698
}
16881699

1689-
if (remoteFetchInfo.isPresent) {
1690-
val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
1691-
if (maybeLogReadResultWithError.isDefined) {
1692-
// If there is an error in scheduling the remote fetch task, return what we currently have
1693-
// (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
1694-
// that we couldn't read from remote storage
1695-
val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
1696-
responseCallback(partitionToFetchPartitionData)
1697-
}
1700+
if (!remoteFetchInfos.isEmpty) {
1701+
processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
16981702
} else {
16991703
// If there is not enough data to respond and there is no remote data, we will let the fetch request
17001704
// wait for new data.
@@ -1902,9 +1906,9 @@ class ReplicaManager(val config: KafkaConfig,
19021906
)
19031907
} else {
19041908
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
1905-
// For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
1909+
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
19061910
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
1907-
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
1911+
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
19081912
fetchInfo, params.isolation)))
19091913
}
19101914

0 commit comments

Comments
 (0)