Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
21f121c
temp version
m1a2st Jan 31, 2026
23c7bf0
update the test
m1a2st Jan 31, 2026
9ad125d
spotless apply
m1a2st Jan 31, 2026
9b95afe
addressed by comments
m1a2st Feb 1, 2026
d92d2d6
Merge branch 'trunk' into KAFKA-20036
m1a2st Feb 10, 2026
ff86cd8
addressed by comments
m1a2st Feb 10, 2026
ce1433f
revert space
m1a2st Feb 11, 2026
f97d765
fix import
m1a2st Feb 11, 2026
cce93a8
Merge branch 'trunk' into KAFKA-20036
m1a2st Feb 11, 2026
a17562e
addressed by comment
m1a2st Feb 15, 2026
eab4c09
test new approach
m1a2st Feb 15, 2026
8608992
fix test
m1a2st Feb 15, 2026
9244e71
fix test
m1a2st Feb 16, 2026
e80eab0
revert test
m1a2st Feb 16, 2026
08d7f68
remove unused exception
m1a2st Feb 17, 2026
0106e70
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 3, 2026
b72cd43
refactor the code
m1a2st Mar 3, 2026
286c4d0
update the code
m1a2st Mar 3, 2026
1462eb4
temp fix
m1a2st Mar 11, 2026
845172f
add unit test
m1a2st Mar 11, 2026
242be0a
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 11, 2026
3505373
combined the logic
m1a2st Mar 11, 2026
1a9077a
cleanup the code
m1a2st Mar 12, 2026
4c83c89
cleanup the code
m1a2st Mar 12, 2026
739230b
fix other comments
m1a2st Mar 19, 2026
f60a153
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 19, 2026
ee138cf
addressed by comment
m1a2st Mar 20, 2026
3b5db07
addressed by comment
m1a2st Mar 22, 2026
f2212c3
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 22, 2026
51097f2
addressed by comment
m1a2st Mar 23, 2026
1a6ce93
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 23, 2026
44b3598
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 24, 2026
2dd6025
Merge branch 'trunk' into KAFKA-20036
m1a2st Mar 28, 2026
3504536
addressed by comment
m1a2st Mar 28, 2026
e4dc30c
addressed by comment
m1a2st Mar 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@
<suppress checks="MethodLength" files="(RemoteLogManager|RemoteLogManagerConfig).java"/>
<suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>

<!-- storage test -->
<suppress checks="ImportControl" files="CleanerIntegrationTest.java"/>
<!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
files="(ReplicaFetcherThreadBenchmark).java"/>
Expand Down
78 changes: 76 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, SegmentOverflowException, TransactionIndex, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.{ArgumentMatchers, MockedConstruction, Mockito}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
import org.mockito.invocation.InvocationOnMock

import java.io.{File, RandomAccessFile}
import java.nio._
Expand Down Expand Up @@ -2218,6 +2219,79 @@ class LogCleanerTest extends Logging {
}
}

@Test
def testSegmentWithCompactionDataOverflow(): Unit = {
// Verifies the cleaner's overflow-degradation cycle: a SegmentOverflowException
// during cleanSegments aborts the round and records a 0.9 size ratio for the
// partition; a subsequent successful doClean clears that marker.
val cleaner = makeCleaner(Int.MaxValue)

// Small segment size so a single appended record fills the only segment.
val logProps = new Properties()
logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val log = makeLog(config = config)
val dir = log.parentDir

log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, key = "test".getBytes), 0)

val segments = log.logSegments.asScala.toList
val segmentToClean = segments.head
val topicPartition = log.topicPartition()

// The cleaned segment created inside cleanSegments is construction-mocked below;
// it needs a transaction index backed by a real File for cleanup paths.
val mockTxnIndex = Mockito.mock(classOf[TransactionIndex])
Mockito.when(mockTxnIndex.file()).thenReturn(new File(dir, "mock.txnindex"))

// Counts append() calls across ALL mocked LogSegment instances (shared counter),
// so only the very first append anywhere throws.
val appendCallCount = new java.util.concurrent.atomic.AtomicInteger(0)

// Intercept every `new LogSegment(...)` for the duration of the test. Each
// constructed instance is stubbed to mirror segmentToClean's offsets and to
// no-op the lifecycle calls the cleaner makes on the replacement segment.
val mockLogSegmentCtor = Mockito.mockConstruction(
classOf[LogSegment],
new MockedConstruction.MockInitializer[LogSegment] {
override def prepare(mock: LogSegment, context: MockedConstruction.Context): Unit = {
Mockito.when(mock.txnIndex()).thenReturn(mockTxnIndex)
Mockito.when(mock.baseOffset()).thenReturn(segmentToClean.baseOffset())

Mockito.when(mock.readNextOffset()).thenReturn(segmentToClean.baseOffset() + 1)
Mockito.doNothing().when(mock).onBecomeInactiveSegment()
Mockito.doNothing().when(mock).flush()
Mockito.doNothing().when(mock).setLastModified(ArgumentMatchers.anyLong())

Mockito.doAnswer((invocation: InvocationOnMock) => {
if (appendCallCount.incrementAndGet() == 1) {
// first time, it should throw SegmentOverflowException
throw new SegmentOverflowException(mock)
} else {
// second time, it should work fine
null
}
}).when(mock).append(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(classOf[MemoryRecords])
)
}
}
)

try {
// First cleaning round: the injected overflow is translated by the cleaner
// into LogCleaningAbortedException (the overflow itself is not propagated).
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, util.List.of(segmentToClean),
cleaner.offsetMap, 0L,
new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata(),
-1, log.logEndOffset)
)

// The aborted round must have marked the partition for degraded (90%) sizing.
assertTrue(cleaner.segmentOverflowPartitions().containsKey(topicPartition))
val segmentRatio = cleaner.segmentOverflowPartitions().get(topicPartition)
assertEquals(0.9, segmentRatio)

// Second round succeeds (append no longer throws) and must clear the marker.
val cleanable = new LogToClean(log, 0L, log.activeSegment.baseOffset, true)
cleaner.doClean(cleanable, time.milliseconds())

assertFalse(cleaner.segmentOverflowPartitions().containsKey(topicPartition))

} finally {
// Close the construction mock before the log so later LogSegment
// constructions (e.g. during log.close()) are not intercepted.
mockLogSegmentCtor.close()
log.close()
}
}

private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset), Int.MaxValue).lastOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -60,6 +61,11 @@ public class Cleaner {
private final Time time;
private final Consumer<TopicPartition> checkDone;

/**
* The topic partitions that have segment overflow history mapped to their segment size ratio
*/
private final Map<TopicPartition, Double> segmentOverflowPartitions = new HashMap<>();

/**
* Buffer used for read i/o
*/
Expand Down Expand Up @@ -169,9 +175,18 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();

double sizeRatio = 1.0;
if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
sizeRatio = segmentOverflowPartitions.get(log.topicPartition());
logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
log.topicPartition(), sizeRatio * 100);
}

int effectiveMaxSize = (int) (log.config().segmentSize() * sizeRatio);

List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
log.logSegments(0, endOffset),
log.config().segmentSize(),
effectiveMaxSize,
log.config().maxIndexSize,
cleanable.firstUncleanableOffset()
);
Expand All @@ -180,6 +195,13 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
}

if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
segmentOverflowPartitions.remove(log.topicPartition());
logger.info("Successfully cleaned log {} with degraded size (ratio: {}%). " +
"Cleared overflow marker. Next cleaning will use normal size.",
log.name(), sizeRatio * 100);
}

// record buffer utilization
stats.bufferUtilization = offsetMap.utilization();

Expand Down Expand Up @@ -254,6 +276,20 @@ public void cleanSegments(UnifiedLog log,
stats,
currentTime
);
} catch (SegmentOverflowException e) {
if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
Double segmentRatio = segmentOverflowPartitions.get(log.topicPartition());
segmentOverflowPartitions.put(log.topicPartition(), segmentRatio * 0.9);
logger.warn("Repeated segment overflow for partition {}: {}. " +
"Further degrading to {}% size in next cleaning round.",
log.topicPartition(), e.getMessage(), segmentRatio * 100);
} else {
segmentOverflowPartitions.put(log.topicPartition(), 0.9);
logger.warn("Segment overflow detected for partition {}: {}. " +
"Marked for degradation to 90% size in next cleaning round.",
log.topicPartition(), e.getMessage());
}
throw new LogCleaningAbortedException();
} catch (LogSegmentOffsetOverflowException e) {
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
// scratch once the split is complete.
Expand Down Expand Up @@ -400,10 +436,15 @@ public boolean shouldRetainRecord(RecordBatch batch, Record record) {
if (outputBuffer.position() > 0) {
outputBuffer.flip();
MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset(), retained);
throttler.maybeThrottle(outputBuffer.limit());
try {
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset(), retained);
throttler.maybeThrottle(outputBuffer.limit());
} catch (IllegalArgumentException e) {
// this indicates that we have an offset overflow in the destination segment
throw new SegmentOverflowException(dest);
}
}

// if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
Expand Down Expand Up @@ -763,4 +804,9 @@ private boolean buildOffsetMapForSegment(TopicPartition topicPartition,

return false;
}

/**
 * Returns an immutable snapshot of the partitions currently marked with segment-overflow
 * history, mapped to their degraded segment-size ratio. {@link Map#copyOf(Map)} is used so
 * callers cannot mutate (and do not observe later mutations of) the internal map.
 * Visible only for testing.
 */
public Map<TopicPartition, Double> segmentOverflowPartitions() {
return Map.copyOf(segmentOverflowPartitions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.KafkaException;

/**
* Exception thrown when segment size would overflow during compaction
*/
public class SegmentOverflowException extends KafkaException {

    // KafkaException is Serializable; declare an explicit serialVersionUID so the
    // serialized form is stable across compiler versions.
    private static final long serialVersionUID = 1L;

    /**
     * The destination segment whose size would have overflowed.
     * NOTE(review): LogSegment is not Serializable, so serializing this exception
     * would fail; the field is kept as-is because callers read it directly, but
     * consider exposing it via an accessor and marking it transient.
     */
    public final LogSegment segment;

    /**
     * @param segment the segment that could not accept the appended records without
     *                overflowing its maximum size
     */
    public SegmentOverflowException(LogSegment segment) {
        super("Segment size would overflow during compaction for segment " + segment);
        this.segment = segment;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CleanerIntegrationTest {

    /**
     * End-to-end check that log compaction survives a partition whose total size exceeds
     * {@code Integer.MAX_VALUE} bytes: after producing past that boundary on a compacted
     * topic with the largest legal segment size, no partition may end up marked
     * uncleanable by the log cleaner.
     */
    @SuppressWarnings("unchecked")
    @ClusterTest(types = Type.CO_KRAFT)
    public void testCleanerSegmentCompactionOverflow(ClusterInstance cluster) throws Exception {
        String topic = "compaction-overflow-test";
        try (var admin = cluster.admin()) {
            // Compacted topic, one partition, max legal segment size, and an aggressive
            // dirty ratio so the cleaner kicks in as soon as possible.
            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
            newTopic.configs(Map.of(
                "cleanup.policy", "compact",
                "compression.type", "lz4",
                "segment.bytes", String.valueOf(Integer.MAX_VALUE - 1),
                "min.cleanable.dirty.ratio", "0.01"
            ));
            admin.createTopics(List.of(newTopic)).all().get();
            cluster.waitTopicCreation(topic, 1);

            var data = new byte[10240];
            var random = new Random();
            random.nextBytes(data);
            // Five concurrent producers, ~60k records each; roughly 10% tombstones give
            // the cleaner deletions to compact. java.util.Random is thread-safe, so the
            // shared instance is used (instead of Math.random()) for consistency.
            var producers = IntStream.range(0, 5).mapToObj(__ -> CompletableFuture.runAsync(() -> {
                try (var producer = cluster.producer(Map.of(
                    ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, "17",
                    ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"))) {
                    for (int i = 0; i < 60_000; i++) {
                        byte[] key = Uuid.randomUuid().toString().getBytes();
                        if (random.nextDouble() < 0.1)
                            producer.send(new ProducerRecord<>(topic, key, null));
                        else
                            producer.send(new ProducerRecord<>(topic, key, data));
                    }
                }
            })).toList();
            producers.forEach(CompletableFuture::join);

            // Precondition for the scenario: the on-disk size of the topic's replicas
            // must actually exceed Integer.MAX_VALUE, otherwise no overflow is exercised.
            var ids = admin.describeCluster().nodes().get().stream().map(Node::id).toList();
            var size = admin.describeLogDirs(ids).allDescriptions().get().entrySet()
                .stream()
                .flatMap(e -> e.getValue().values()
                    .stream()
                    .flatMap(v -> v.replicaInfos().entrySet().stream()))
                .filter(v -> v.getKey().topic().equals(topic))
                .mapToLong(v -> v.getValue().size()).sum();
            assertTrue(Integer.MAX_VALUE < size, "log size should exceed Integer.MAX_VALUE to trigger overflow");
        }
        // Verify the cleaner marked nothing uncleanable. Track whether the gauge was
        // actually seen so this check cannot pass vacuously when the metric is absent.
        var metrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
        var gaugeFound = new java.util.concurrent.atomic.AtomicBoolean(false);
        metrics.forEach((name, metric) -> {
            if (name.getName().contains("uncleanable-partitions-count")) {
                gaugeFound.set(true);
                Gauge<Integer> value = (Gauge<Integer>) metric;
                assertEquals(0, value.value(), "there should be no uncleanable partitions due to segment overflow");
            }
        });
        assertTrue(gaugeFound.get(), "expected the uncleanable-partitions-count gauge to be registered");
    }
}