2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
@@ -368,6 +368,8 @@
<suppress checks="MethodLength" files="(RemoteLogManager|RemoteLogManagerConfig).java"/>
<suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>

<!-- storage test -->
<suppress checks="ImportControl" files="CleanerIntegrationTest.java"/>
<!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
files="(ReplicaFetcherThreadBenchmark).java"/>
78 changes: 76 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,14 +29,15 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, SegmentOverflowException, TransactionIndex, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.{ArgumentMatchers, MockedConstruction, Mockito}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
import org.mockito.invocation.InvocationOnMock

import java.io.{File, RandomAccessFile}
import java.nio._
@@ -2218,6 +2219,79 @@ class LogCleanerTest extends Logging {
}
}

@Test
def testSegmentWithCompactionDataOverflow(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)

val logProps = new Properties()
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val log = makeLog(config = config)
val dir = log.parentDir

log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, key = "test".getBytes), 0)

val segments = log.logSegments.asScala.toList
val segmentToClean = segments.head
val topicPartition = log.topicPartition()

val mockTxnIndex = Mockito.mock(classOf[TransactionIndex])
Mockito.when(mockTxnIndex.file()).thenReturn(new File(dir, "mock.txnindex"))

val appendCallCount = new java.util.concurrent.atomic.AtomicInteger(0)

val mockLogSegmentCtor = Mockito.mockConstruction(
classOf[LogSegment],
new MockedConstruction.MockInitializer[LogSegment] {
override def prepare(mock: LogSegment, context: MockedConstruction.Context): Unit = {
Mockito.when(mock.txnIndex()).thenReturn(mockTxnIndex)
Mockito.when(mock.baseOffset()).thenReturn(segmentToClean.baseOffset())

Mockito.when(mock.readNextOffset()).thenReturn(segmentToClean.baseOffset() + 1)
Mockito.doNothing().when(mock).onBecomeInactiveSegment()
Mockito.doNothing().when(mock).flush()
Mockito.doNothing().when(mock).setLastModified(ArgumentMatchers.anyLong())

Mockito.doAnswer((invocation: InvocationOnMock) => {
if (appendCallCount.incrementAndGet() == 1) {
// first time, it should throw SegmentOverflowException
throw new SegmentOverflowException(mock)
} else {
// second time, it should work fine
null
}
}).when(mock).append(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(classOf[MemoryRecords])
)
}
}
)

try {
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, util.List.of(segmentToClean),
cleaner.offsetMap, 0L,
new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata(),
-1, log.logEndOffset)
)

assertTrue(cleaner.segmentOverflowPartitions().containsKey(topicPartition))
val segmentRatio = cleaner.segmentOverflowPartitions().get(topicPartition)
assertEquals(0.9, segmentRatio)

val cleanable = new LogToClean(log, 0L, log.activeSegment.baseOffset, true)
cleaner.doClean(cleanable, time.milliseconds())

assertFalse(cleaner.segmentOverflowPartitions().containsKey(topicPartition))

} finally {
mockLogSegmentCtor.close()
log.close()
}
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset), Int.MaxValue).lastOffset
@@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -60,6 +61,11 @@ public class Cleaner {
private final Time time;
private final Consumer<TopicPartition> checkDone;

/**
* Topic partitions with a segment overflow history, mapped to the segment size ratio to apply on their next cleaning round
*/
private final Map<TopicPartition, Double> segmentOverflowPartitions = new HashMap<>();

/**
* Buffer used for read i/o
*/
@@ -169,9 +175,17 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();

double sizeRatio = segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0);
if (sizeRatio != 1.0) {
logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
log.topicPartition(), sizeRatio * 100);
}

int effectiveMaxSize = (int) (log.config().segmentSize() * sizeRatio);

List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
log.logSegments(0, endOffset),
log.config().segmentSize(),
effectiveMaxSize,
log.config().maxIndexSize,
cleanable.firstUncleanableOffset()
);
@@ -180,6 +194,12 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
}

if (segmentOverflowPartitions.remove(log.topicPartition()) != null) {
logger.info("Successfully cleaned log {} with degraded size (ratio: {}%). " +
"Cleared overflow marker. Next cleaning will use normal size.",
log.name(), sizeRatio * 100);
}

// record buffer utilization
stats.bufferUtilization = offsetMap.utilization();

@@ -254,6 +274,19 @@ public void cleanSegments(UnifiedLog log,
stats,
currentTime
);
} catch (SegmentOverflowException e) {
var previousRatio = segmentOverflowPartitions.put(log.topicPartition(),
segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0) * 0.9);
if (previousRatio == null) {
logger.warn("Segment overflow detected for partition {}: {}. " +
"Marked for degradation to 90% size in next cleaning round.",
log.topicPartition(), e.getMessage());
} else {
logger.warn("Repeated segment overflow for partition {}: {}. " +
"Further degrading to {}% size in next cleaning round.",
log.topicPartition(), e.getMessage(), previousRatio * 0.9 * 100);
}
throw new LogCleaningAbortedException();
} catch (LogSegmentOffsetOverflowException e) {
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
// scratch once the split is complete.
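Taken together, the hunks above implement a geometric back-off: each overflow multiplies the partition's ratio by 0.9 in cleanSegments, the next doClean round scales segment.bytes by that ratio when grouping segments, and the marker is cleared once a round succeeds. Below is a minimal, runnable sketch of just that bookkeeping; the class and method names are hypothetical and not part of this patch.

```java
// Illustrative only: models the ratio bookkeeping of the patch above.
// Class and method names here are hypothetical, not Kafka code.
import java.util.HashMap;
import java.util.Map;

public class OverflowRatioSketch {
    private final Map<String, Double> ratios = new HashMap<>();

    // On a segment overflow for this partition: degrade by another 10%.
    void onOverflow(String partition) {
        ratios.merge(partition, 0.9, (prev, factor) -> prev * factor);
    }

    // After a successful cleaning round: clear the overflow marker.
    void onSuccessfulClean(String partition) {
        ratios.remove(partition);
    }

    // Effective max segment size used when grouping segments for the next round.
    int effectiveMaxSize(String partition, int configuredSegmentBytes) {
        return (int) (configuredSegmentBytes * ratios.getOrDefault(partition, 1.0));
    }

    public static void main(String[] args) {
        OverflowRatioSketch sketch = new OverflowRatioSketch();
        int segmentBytes = 1 << 30;                 // assume segment.bytes = 1 GiB
        sketch.onOverflow("tp-0");                  // first overflow  -> ratio 0.9
        sketch.onOverflow("tp-0");                  // second overflow -> ratio 0.81
        System.out.println(sketch.effectiveMaxSize("tp-0", segmentBytes)); // 869730877
        sketch.onSuccessfulClean("tp-0");           // marker cleared
        System.out.println(sketch.effectiveMaxSize("tp-0", segmentBytes)); // 1073741824
    }
}
```

With segment.bytes of 1 GiB, two consecutive overflows shrink the effective grouping size to about 0.81 GiB, and a single successful round restores the configured value.
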
@@ -400,11 +433,16 @@ public boolean shouldRetainRecord(RecordBatch batch, Record record) {
if (outputBuffer.position() > 0) {
outputBuffer.flip();
MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset(), retained);
throttler.maybeThrottle(outputBuffer.limit());
try {
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset(), retained);
Member commented:
Could you wrap only dest.append in the try-catch block, to avoid catching unrelated errors? (A sketch of the suggested structure follows this hunk.)

} catch (IllegalArgumentException e) {
// this indicates that we have an offset overflow in the destination segment
throw new SegmentOverflowException(dest);
}
}
throttler.maybeThrottle(outputBuffer.limit());

// if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
// `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.
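For reference, a fragment sketching the structure the review comment asks for: only the dest.append call sits inside the try, so an IllegalArgumentException raised by anything else cannot be misreported as a segment overflow. This is an illustration of the suggestion, not the committed change; dest, result, retained, outputBuffer, and throttler are the locals of the hunk above, so the fragment is not standalone-runnable.

```java
// Sketch of the review suggestion: guard only dest.append; locals come from the hunk above.
try {
    // it's OK not to hold the Log's lock in this case, because this segment is only accessed
    // by other threads after `Log.replaceSegments` (which acquires the lock) is called
    dest.append(result.maxOffset(), retained);
} catch (IllegalArgumentException e) {
    // translate only append's offset-overflow rejection into the cleaner's overflow signal
    throw new SegmentOverflowException(dest);
}
throttler.maybeThrottle(outputBuffer.limit());
```
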
@@ -763,4 +801,9 @@ private boolean buildOffsetMapForSegment(TopicPartition topicPartition,

return false;
}

// only for testing
public Map<TopicPartition, Double> segmentOverflowPartitions() {
return Map.copyOf(segmentOverflowPartitions);
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.KafkaException;

/**
* Exception thrown when segment size would overflow during compaction
*/
public class SegmentOverflowException extends KafkaException {

public SegmentOverflowException(LogSegment segment) {
super("Segment size would overflow during compaction for segment " + segment);
}
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CleanerIntegrationTest {

@SuppressWarnings("unchecked")
@ClusterTest(types = Type.CO_KRAFT)
public void testCleanerSegmentCompactionOverflow(ClusterInstance cluster) throws Exception {
String topic = "compaction-overflow-test";
try (var admin = cluster.admin()) {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
newTopic.configs(Map.of(
"cleanup.policy", "compact",
"compression.type", "lz4",
"segment.bytes", String.valueOf(Integer.MAX_VALUE - 1),
"min.cleanable.dirty.ratio", "0.01"
));
admin.createTopics(List.of(newTopic)).all().get();
cluster.waitTopicCreation(topic, 1);

var data = new byte[10240];
var random = new Random();
random.nextBytes(data);
var producers = IntStream.range(0, 5).mapToObj(__ -> CompletableFuture.runAsync(() -> {
try (var producer = cluster.producer(Map.of(
ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, "17",
ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"))) {
for (int i = 0; i < 60_000; i++) {
byte[] key = Uuid.randomUuid().toString().getBytes();
if (Math.random() < 0.1)
producer.send(new ProducerRecord<>(topic, key, null));
else
producer.send(new ProducerRecord<>(topic, key, data));
}
}
})).toList();
producers.forEach(CompletableFuture::join);

var ids = admin.describeCluster().nodes().get().stream().map(Node::id).toList();
var size = admin.describeLogDirs(ids).allDescriptions().get().entrySet()
.stream()
.flatMap(e -> e.getValue().values()
.stream()
.flatMap(v -> v.replicaInfos().entrySet().stream()))
.filter(v -> v.getKey().topic().equals(topic))
.mapToLong(v -> v.getValue().size()).sum();
assertTrue(Integer.MAX_VALUE < size, "log size should exceed Integer.MAX_VALUE to trigger overflow");
}
var metrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
metrics.forEach((name, metric) -> {
if (name.getName().contains("uncleanable-partitions-count")) {
Gauge<Integer> value = (Gauge<Integer>) metric;
assertEquals(0, value.value(), "there should be no uncleanable partitions due to segment overflow");
}
});
}
}