
Commit 759fbbb

KAFKA-14484: Move UnifiedLog to storage module (#19030)
Rewrite UnifiedLog in Java.

Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent ca834d1 commit 759fbbb

83 files changed: +3811 -3583 lines changed

Diff for: checkstyle/suppressions.xml

+5-3
@@ -355,11 +355,13 @@
 
     <!-- storage -->
     <suppress checks="CyclomaticComplexity"
-              files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
+              files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager|UnifiedLog).java"/>
     <suppress checks="NPathComplexity"
-              files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
+              files="(LocalLog|LogLoader|LogValidator|RemoteLogManager|RemoteIndexCache|UnifiedLog).java"/>
     <suppress checks="ParameterNumber"
-              files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig).java"/>
+              files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig|UnifiedLog).java"/>
+    <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
+              files="(UnifiedLog).java"/>
 
     <!-- benchmarks -->
     <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"

Diff for: clients/src/main/java/org/apache/kafka/common/utils/LogContext.java

+8-1
@@ -43,7 +43,14 @@ public LogContext() {
     }
 
     public Logger logger(Class<?> clazz) {
-        Logger logger = LoggerFactory.getLogger(clazz);
+        return logger(LoggerFactory.getLogger(clazz));
+    }
+
+    public Logger logger(String clazz) {
+        return logger(LoggerFactory.getLogger(clazz));
+    }
+
+    private Logger logger(Logger logger) {
         if (logger instanceof LocationAwareLogger) {
             return new LocationAwareKafkaLogger(logPrefix, (LocationAwareLogger) logger);
         } else {
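
Note on the LogContext change: the existing Class-based overload now delegates to a shared private helper, and a new name-based overload is added alongside it. A minimal usage sketch follows (the prefix string and class names are illustrative, not taken from the commit):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    public class LogContextExample {
        public static void main(String[] args) {
            LogContext logContext = new LogContext("[example-prefix] ");
            // Existing overload: resolve the logger from a Class object.
            Logger byClass = logContext.logger(LogContextExample.class);
            // New overload: resolve the logger from a String name, e.g. a
            // fully qualified class name computed at runtime.
            Logger byName = logContext.logger("org.apache.kafka.storage.internals.log.UnifiedLog");
            byClass.info("resolved via Class");
            byName.info("resolved via String name");
        }
    }

Both loggers carry the LogContext prefix; the String overload is useful when a caller has only a logger name rather than a loaded Class.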

Diff for: core/src/main/java/kafka/log/remote/RemoteLogManager.java

+5-8
@@ -17,7 +17,6 @@
 package kafka.log.remote;
 
 import kafka.cluster.Partition;
-import kafka.log.UnifiedLog;
 import kafka.server.DelayedRemoteListOffsets;
 
 import org.apache.kafka.common.Endpoint;
@@ -88,6 +87,7 @@
 import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
 import org.apache.kafka.storage.internals.log.TransactionIndex;
 import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import com.yammer.metrics.core.Timer;
@@ -138,12 +138,9 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.jdk.javaapi.CollectionConverters;
-
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS;
@@ -668,7 +665,7 @@ public AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncO
                                        long timestamp,
                                        long startingOffset,
                                        LeaderEpochFileCache leaderEpochCache,
-                                       Supplier<Optional<FileRecords.TimestampAndOffset>> searchLocalLog) {
+                                       TimestampAndOffsetSupplier searchLocalLog) {
        CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
        Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
                new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
@@ -733,7 +730,7 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicParti
                && isRemoteSegmentWithinLeaderEpochs(rlsMetadata, unifiedLog.logEndOffset(), epochWithOffsets)
                && rlsMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
            // cache to avoid race conditions
-           List<LogSegment> segmentsCopy = new ArrayList<>(unifiedLog.logSegments());
+           List<LogSegment> segmentsCopy = unifiedLog.logSegments();
            if (segmentsCopy.isEmpty() || rlsMetadata.startOffset() < segmentsCopy.get(0).baseOffset()) {
                // search in remote-log
                return lookupTimestamp(rlsMetadata, timestamp, startingOffset);
@@ -904,7 +901,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
     */
    List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
        List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
-       List<LogSegment> segments = CollectionConverters.asJava(log.logSegments(fromOffset, Long.MAX_VALUE).toSeq());
+       List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
        if (!segments.isEmpty()) {
            for (int idx = 1; idx < segments.size(); idx++) {
                LogSegment previousSeg = segments.get(idx - 1);
@@ -1776,7 +1773,7 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
 
        Consumer<List<AbortedTxn>> accumulator =
                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
-                       .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+                       .map(AbortedTxn::asAbortedTransaction).toList());
 
        long startTimeNs = time.nanoseconds();
        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
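
One small modernization in this file: collect(Collectors.toList()) becomes Stream.toList(), available since Java 16. The two are not always interchangeable, because Stream.toList() is specified to return an unmodifiable list; that is safe here since the collected list is only passed to addAll. A quick self-contained illustration of the difference:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToListExample {
        public static void main(String[] args) {
            // Collectors.toList() makes no mutability guarantee, but in
            // practice returns a mutable ArrayList.
            List<String> collected = Stream.of("a", "b").collect(Collectors.toList());
            collected.add("c");

            // Stream.toList() (Java 16+) always returns an unmodifiable list.
            List<String> unmodifiable = Stream.of("a", "b").toList();
            // unmodifiable.add("c"); // would throw UnsupportedOperationException

            System.out.println(collected + " " + unmodifiable);
        }
    }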

Diff for: core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java

+8-5
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 
 import org.slf4j.Logger;
@@ -27,7 +28,6 @@
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 public class RemoteLogOffsetReader implements Callable<Void> {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
@@ -36,15 +36,15 @@ public class RemoteLogOffsetReader implements Callable<Void> {
     private final long timestamp;
     private final long startingOffset;
     private final LeaderEpochFileCache leaderEpochCache;
-    private final Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog;
+    private final AsyncOffsetReader.TimestampAndOffsetSupplier searchInLocalLog;
     private final Consumer<OffsetResultHolder.FileRecordsOrError> callback;
 
     public RemoteLogOffsetReader(RemoteLogManager rlm,
                                  TopicPartition tp,
                                  long timestamp,
                                  long startingOffset,
                                  LeaderEpochFileCache leaderEpochCache,
-                                 Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog,
+                                 AsyncOffsetReader.TimestampAndOffsetSupplier searchInLocalLog,
                                  Consumer<OffsetResultHolder.FileRecordsOrError> callback) {
        this.rlm = rlm;
        this.tp = tp;
@@ -60,8 +60,11 @@ public Void call() throws Exception {
        OffsetResultHolder.FileRecordsOrError result;
        try {
            // If it is not found in remote storage, then search in the local storage starting with local log start offset.
-           Optional<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
-                   rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache).or(searchInLocalLog);
+           Optional<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
+                   rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache);
+           if (timestampAndOffsetOpt.isEmpty()) {
+               timestampAndOffsetOpt = searchInLocalLog.get();
+           }
            result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), timestampAndOffsetOpt);
        } catch (Exception e) {
            // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
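
With the supplier parameter now typed as AsyncOffsetReader.TimestampAndOffsetSupplier rather than java.util.function.Supplier, the Optional.or(...) chaining no longer applies directly, so the remote-then-local fallback is spelled out with an explicit isEmpty() check. A self-contained sketch of the same pattern, using a stand-in functional interface and Long offsets instead of the real Kafka types:

    import java.util.Optional;

    public class FallbackLookupExample {
        // Stand-in for AsyncOffsetReader.TimestampAndOffsetSupplier: a named
        // functional interface whose get() returns an Optional result.
        @FunctionalInterface
        interface ResultSupplier {
            Optional<Long> get();
        }

        static Optional<Long> findOffset(Optional<Long> remoteResult, ResultSupplier searchLocalLog) {
            // Equivalent to remoteResult.or(searchLocalLog::get), written explicitly:
            Optional<Long> result = remoteResult;
            if (result.isEmpty()) {
                result = searchLocalLog.get(); // fall back to the local-log search
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(findOffset(Optional.empty(), () -> Optional.of(42L))); // Optional[42]
            System.out.println(findOffset(Optional.of(7L), () -> Optional.of(42L)));  // Optional[7]
        }
    }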

Diff for: core/src/main/java/kafka/server/TierStateMachine.java

+4-3
@@ -18,7 +18,6 @@
 package kafka.server;
 
 import kafka.cluster.Partition;
-import kafka.log.UnifiedLog;
 import kafka.log.remote.RemoteLogManager;
 
 import org.apache.kafka.common.KafkaException;
@@ -36,6 +35,7 @@
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.LogFileUtils;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +53,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
@@ -186,7 +187,7 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
                                     Long leaderLogStartOffset,
                                     UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
 
-       if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) {
+       if (!unifiedLog.remoteLogEnabled()) {
            // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
            // is set as expected.
            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
@@ -240,7 +241,7 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
 
        // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
        Partition partition = replicaMgr.getPartitionOrException(topicPartition);
-       partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+       partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Optional.of(leaderLogStartOffset));
        // Increment start offsets
        unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
        unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
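
The tiered-storage guard collapses two checks into one call. Judging from the condition it replaces, UnifiedLog.remoteLogEnabled() requires both the broker-level flag and the per-topic config; the sketch below shows the presumed consolidation with illustrative field names (an inference from the old condition via De Morgan's law, not the actual UnifiedLog code):

    // Minimal stand-in:
    //   !remoteStorageSystemEnable() || !config().remoteStorageEnable()
    // is the negation of (both flags enabled), so the new accessor is
    // presumably their conjunction.
    class RemoteLogEnabledSketch {
        private final boolean remoteStorageSystemEnable; // broker-level flag
        private final boolean topicRemoteStorageEnable;  // per-topic log config

        RemoteLogEnabledSketch(boolean systemEnable, boolean topicEnable) {
            this.remoteStorageSystemEnable = systemEnable;
            this.topicRemoteStorageEnable = topicEnable;
        }

        boolean remoteLogEnabled() {
            return remoteStorageSystemEnable && topicRemoteStorageEnable;
        }
    }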

Diff for: core/src/main/scala/kafka/cluster/Partition.scala

+10-10
@@ -16,6 +16,7 @@
  */
 package kafka.cluster
 
+import java.lang.{Long => JLong}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
@@ -39,7 +40,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
 import org.apache.kafka.server.common.RequestLocal
-import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
@@ -49,7 +50,7 @@ import org.slf4j.event.Level
 
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOption
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
 import scala.jdk.javaapi.OptionConverters
 
 /**
@@ -494,7 +495,7 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[UnifiedLog] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId, targetLogDirectoryId)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId.toJava, targetLogDirectoryId)
       if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener)
       maybeLog = Some(log)
       updateHighWatermark(log)
@@ -593,7 +594,7 @@ class Partition(val topicPartition: TopicPartition,
    */
  def topicId: Option[Uuid] = {
    if (_topicId.isEmpty || _topicId.contains(Uuid.ZERO_UUID)) {
-     _topicId = this.log.orElse(logManager.getLog(topicPartition)).flatMap(_.topicId)
+     _topicId = this.log.orElse(logManager.getLog(topicPartition)).flatMap(_.topicId.toScala)
    }
    _topicId
  }
@@ -1170,7 +1171,7 @@ class Partition(val topicPartition: TopicPartition,
      }
    }
 
-   leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
+   leaderLog.maybeIncrementHighWatermark(newHighWatermark).toScala match {
      case Some(oldHighWatermark) =>
        debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
        true
@@ -1369,8 +1370,7 @@ class Partition(val topicPartition: TopicPartition,
        s"live replica(s) broker.id are : $inSyncReplicaIds")
    }
 
-   val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
-     requestLocal, verificationGuard)
+   val info = leaderLog.appendAsLeader(records, this.leaderEpoch, origin, requestLocal, verificationGuard)
 
    // we may need to increment high watermark since ISR could be down to 1
    (info, maybeIncrementLeaderHW(leaderLog))
@@ -1622,7 +1622,7 @@ class Partition(val topicPartition: TopicPartition,
      case Some(producers) =>
        producerState
          .setErrorCode(Errors.NONE.code)
-         .setActiveProducers(producers.asJava)
+         .setActiveProducers(producers)
      case None =>
        producerState
          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
@@ -1696,7 +1696,7 @@ class Partition(val topicPartition: TopicPartition,
    */
  def truncateFullyAndStartAt(newOffset: Long,
                              isFuture: Boolean,
-                             logStartOffsetOpt: Option[Long] = None): Unit = {
+                             logStartOffsetOpt: Optional[JLong] = Optional.empty): Unit = {
    // The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
    // is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
    inReadLock(leaderIsrUpdateLock) {
@@ -1724,7 +1724,7 @@ class Partition(val topicPartition: TopicPartition,
    val localLogOrError = getLocalLog(currentLeaderEpoch, fetchOnlyFromLeader)
    localLogOrError match {
      case Left(localLog) =>
-       localLog.endOffsetForEpoch(leaderEpoch) match {
+       localLog.endOffsetForEpoch(leaderEpoch).toScala match {
          case Some(epochAndOffset) => new EpochEndOffset()
            .setPartition(partitionId)
            .setErrorCode(Errors.NONE.code)

Diff for: core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala

+1-6
@@ -103,12 +103,7 @@ class CoordinatorLoaderImpl[T](
       var numRecords = 0L
       var numBytes = 0L
       while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
-        val fetchDataInfo = log.read(
-          startOffset = currentOffset,
-          maxLength = loadBufferSize,
-          isolation = FetchIsolation.LOG_END,
-          minOneMessage = true
-        )
+        val fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true)
 
         readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
 

Diff for: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

+1-4
@@ -601,10 +601,7 @@ class GroupMetadataManager(brokerId: Int,
     var readAtLeastOneRecord = true
 
     while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
-      val fetchDataInfo = log.read(currOffset,
-        maxLength = config.loadBufferSize,
-        isolation = FetchIsolation.LOG_END,
-        minOneMessage = true)
+      val fetchDataInfo = log.read(currOffset, config.loadBufferSize, FetchIsolation.LOG_END, true)
 
       readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
 

Diff for: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

+1-4
@@ -438,10 +438,7 @@
     while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get() && inReadLock(stateLock) {
       loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
         idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
-      val fetchDataInfo = log.read(currOffset,
-        maxLength = config.transactionLogLoadBufferSize,
-        isolation = FetchIsolation.LOG_END,
-        minOneMessage = true)
+      val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, FetchIsolation.LOG_END, true)
 
       readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
 
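All three loaders above (CoordinatorLoaderImpl, GroupMetadataManager, TransactionStateManager) drop Scala named arguments because UnifiedLog.read is now a Java method, which Scala can only call positionally. The shared loop shape, sketched against a stand-in interface (the types and the nextOffset accessor are simplified placeholders, not the real UnifiedLog/FetchDataInfo API):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class LoadLoopSketch {
        // Stand-in for the slice of the log API that the load loop needs.
        interface ReadableLog {
            Records read(long startOffset, int maxLength, boolean minOneMessage);
            long logEndOffset();
        }

        interface Records {
            int sizeInBytes();
            long nextOffset(); // illustrative: the offset to resume reading from
        }

        static void load(ReadableLog log, int loadBufferSize, AtomicBoolean shuttingDown) {
            long currOffset = 0L;
            boolean readAtLeastOneRecord = true;
            // Same loop shape as the coordinator loaders: keep reading until
            // the log end offset, an empty read, or shutdown.
            while (currOffset < log.logEndOffset() && readAtLeastOneRecord && !shuttingDown.get()) {
                Records records = log.read(currOffset, loadBufferSize, true);
                readAtLeastOneRecord = records.sizeInBytes() > 0;
                currOffset = records.nextOffset();
            }
        }
    }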
