
KAFKA-20036 Handle LogCleaner segment overflow caused by compression level changes #21379

Open

m1a2st wants to merge 32 commits into apache:trunk from m1a2st:KAFKA-20036

Conversation

@m1a2st
Collaborator

@m1a2st m1a2st commented Jan 31, 2026

We add a new map to record which topic partitions have experienced
overflow. When an overflow occurs, the next time the group is
processed, we reduce the segment size by a factor of 0.9 to prevent the
overflow from happening again. If the partition still overflows, we
continue to multiply the ratio by 0.9 on subsequent attempts until the
partition is successfully cleaned.
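The retry logic described above can be sketched as follows. This is a minimal, self-contained illustration of the per-partition shrink-ratio bookkeeping; the class and method names are hypothetical, not the PR's actual identifiers:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track a shrink ratio per topic-partition and multiply
// it by 0.9 on each overflow, so the next cleaning pass uses a smaller
// effective segment size.
class SegmentSizeShrinker {
    private static final double SHRINK_FACTOR = 0.9;
    private final Map<String, Double> overflowRatios = new HashMap<>();

    // Effective max segment size for the next cleaning pass of this partition.
    int effectiveMaxSize(String topicPartition, int configuredMaxSize) {
        double ratio = overflowRatios.getOrDefault(topicPartition, 1.0);
        return (int) (configuredMaxSize * ratio);
    }

    // Called when cleaning this partition overflowed; shrink further next time.
    void recordOverflow(String topicPartition) {
        overflowRatios.merge(topicPartition, SHRINK_FACTOR, (old, f) -> old * f);
    }

    // Called after a successful clean; stop shrinking.
    void clear(String topicPartition) {
        overflowRatios.remove(topicPartition);
    }
}
```

Two consecutive overflows would reduce a 1000-byte limit to 810 bytes (1000 × 0.9 × 0.9) before the next attempt.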

@github-actions github-actions bot added the triage (PRs from the community), core (Kafka Broker), and storage (Pull requests that target the storage module) labels Jan 31, 2026
Member

@chia7712 chia7712 left a comment


@m1a2st thanks for this fix

@chia7712 chia7712 requested review from jolshan and junrao January 31, 2026 19:38
@github-actions github-actions bot removed the triage (PRs from the community) label Feb 1, 2026
Contributor

@junrao junrao left a comment


@m1a2st : Thanks for the PR. Left a comment.

List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
log.logSegments(0, endOffset),
log.config().segmentSize(),
effectiveMaxSize,
Contributor


Hmm, does this approach work in general? When grouping segments, we need to include at least one segment in the group. It's possible that the cleaning of a single segment can cause it to exceed 2GB.

Member


That is a good point. We could follow the approach used for handling offset overflow: split the segment and then restart the cleanup. The trade-off is that the first half of the segment will be cleaned in isolation, so there might be little to nothing to clean up :)

Contributor


That approach could work, but one has to guess the size to split the segments into. Have you considered the alternative of creating multiple cleaned segments? log.replaceSegments() already supports replacing multiple segments. If cleanInto() hits a file overflow exception, we could close the current cleaned segment, create a new one and continue the cleaning.

Member


yes, we can roll the new segment when either the "size check" or "overflow check" is triggered.

// 1. Size Check: current size + retained size > config limit
// 2. Overflow Check: max offset - base offset > Integer.MAX_VALUE
boolean willExceedSize = (long) dest.size() + retained.sizeInBytes() > log.config().segmentSize();
boolean willOverflow = result.maxOffset() - dest.baseOffset() > Integer.MAX_VALUE;

if (willExceedSize || willOverflow) {
    logger.info("Rolling new segment. Condition met: size_exceeded={}, overflow={}. (Segment size: {}, Batch size: {}, BaseOffset: {}, MaxOffset: {})",
            willExceedSize, willOverflow, dest.size(), retained.sizeInBytes(), dest.baseOffset(), result.maxOffset());

    dest = rollNewSegment(log, dest, cleanedSegments, transactionMetadata, retained);
}

However, I have another concern regarding "temporary disk usage". If we remove the initial segment grouping entirely, it might require a significant amount of disk space to hold all the cleaned segments simultaneously before the replacement happens.

I believe the grouping logic should be retained, but simplified to serve as a batch size threshold. This way, we can control the cleaning scope to avoid occupying too much disk space, while still allowing the inner logic to split segments dynamically if needed.

Collaborator Author


I think we should retain groupSegmentsBySize() to control temporary disk usage, and handle overflow dynamically within cleanInto() by creating multiple cleaned segments as needed.

This approach allows us to avoid disk space issues while still handling segment overflow gracefully. The peak disk usage remains bounded by the group size rather than the total log size.

Contributor


Yes, we will still want to group the segments.

Contributor

@junrao junrao left a comment


@m1a2st : Thanks for the updated PR. A couple of more comments.

* @param currentTime The time at which the clean was initiated
* @param log The log instance for creating new segments if overflow occurs
*
* @return The current active destination segment (may be different from the input dest if overflow occurred)
Contributor


This API seems awkward. An alternative is to instead pass a starting position in sourceRecords to cleanInto(). Initially, we can pass in 0 as the position; if cleanInto() hits a size limit, it throws an exception carrying the current position in sourceRecords. The caller catches this exception, creates a new destination segment, and calls cleanInto() again with the position from the exception and the new destination segment.

Collaborator Author


Thanks for the feedback! I agree this is a cleaner design. I've updated cleanInto() to accept a starting position and throw an exception containing the current position on overflow. The caller then creates a new destination segment and resumes cleaning from where it left off.
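A minimal, self-contained sketch of this resume-from-position flow (all names are hypothetical; the real code operates on Kafka log segments, which are modeled here as arrays of batch sizes and lists of written batches):

```java
import java.util.ArrayList;
import java.util.List;

class SegmentCleanerSketch {
    // Hypothetical exception carrying the position at which cleaning stopped.
    static class SizeLimitException extends Exception {
        final int position; // index of the first batch NOT yet written
        SizeLimitException(int position) { this.position = position; }
    }

    // Write batches starting at `position` into a destination with
    // `maxSegmentSize` capacity; throw with the resume position on overflow.
    // An empty destination always accepts its first batch so cleaning can
    // make progress even on an oversized batch.
    static void cleanInto(int[] batchSizes, int position, int maxSegmentSize,
                          List<Integer> dest) throws SizeLimitException {
        int size = 0;
        for (int i = position; i < batchSizes.length; i++) {
            if (size > 0 && size + batchSizes[i] > maxSegmentSize)
                throw new SizeLimitException(i);
            dest.add(batchSizes[i]);
            size += batchSizes[i];
        }
    }

    // Caller loop: on overflow, start a new destination segment and resume
    // from the position reported by the exception.
    static List<List<Integer>> cleanAll(int[] batchSizes, int maxSegmentSize) {
        List<List<Integer>> segments = new ArrayList<>();
        int position = 0;
        while (position < batchSizes.length) {
            List<Integer> dest = new ArrayList<>();
            try {
                cleanInto(batchSizes, position, maxSegmentSize, dest);
                position = batchSizes.length; // all batches written
            } catch (SizeLimitException e) {
                position = e.position; // resume here with a fresh segment
            }
            segments.add(dest);
        }
        return segments;
    }
}
```

For example, cleaning four 5-byte batches with a 12-byte limit yields two destination segments of two batches each.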

MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);

// Check for TWO types of overflow BEFORE appending:
// 1. Offset overflow: offset range exceeds Integer.MAX_VALUE
Contributor


The grouping of the segments takes offset overflow into consideration. So, it seems that we can't hit this?

Contributor

@junrao junrao left a comment


@m1a2st : Thanks for the updated PR. A few more comments.


// Complete current cleaned segment
currentCleaned.onBecomeInactiveSegment();
currentCleaned.flush();
Contributor


Should we call setLastModified on currentCleaned?

@m1a2st
Collaborator Author

m1a2st commented Mar 12, 2026

Thanks for the review @junrao, I have addressed all the comments.

Contributor

@junrao junrao left a comment


@m1a2st : Thanks for the updated PR. A few more comments.

// If cleanInto completes without exception, we're done with this segment
cleaningComplete = true;

} catch (SegmentSizeOverflowException e) {
Contributor


Where do we catch LogSegmentOffsetOverflowException now?

Member


Using an exception for control flow feels a bit unnatural here. Could we consider returning a result object instead to indicate where the cleaning stopped?

Member


Let me attach an example to strengthen my comment.

private record Overflow(int position) {}

// inside cleanInto(), when either overflow condition is hit:
if (sizeOverflow || offsetOverflow) {
    // log.xxx
    return Optional.of(new Overflow(position - result.bytesRead()));
}

...

return Optional.empty();
while (!cleaningComplete) {
    Optional<Overflow> overflowOpt = cleanInto(
            log.topicPartition(),
            currentSegment.log(),
            currentCleaned,
            position);

    if (overflowOpt.isPresent()) {
        Overflow overflow = overflowOpt.get();
        logger.info("Completing cleaned segment {} due to overflow, creating new segment", currentCleaned.baseOffset());

        currentCleaned.onBecomeInactiveSegment();
        currentCleaned.flush();
        currentCleaned.setLastModified(currentSegment.lastModified());
        cleanedSegments.add(currentCleaned);

        Iterator<FileChannelRecordBatch> nextBatches = currentSegment.log().batchesFrom(overflow.position()).iterator();
        long nextBaseOffset = nextBatches.hasNext() ? nextBatches.next().baseOffset() : currentCleaned.readNextOffset();
        currentCleaned = UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), nextBaseOffset);
        transactionMetadata.setCleanedIndex(Optional.of(currentCleaned.txnIndex()));

        position = overflow.position();
    } else {
        cleaningComplete = true;
    }
}

cleanedSegments.add(currentCleaned);

// Create new cleaned segment with base offset = next offset of completed segment
long nextBaseOffset = currentCleaned.readNextOffset();
Contributor


If we use the logic here to handle LogSegmentOffsetOverflowException, it's better to set nextBaseOffset as the base offset of the next batch to be cleaned. Since compaction can leave a hole, currentCleaned.readNextOffset() may not always be the base offset of the next batch to be cleaned.

-// remove the index entry
-if (segment.baseOffset() != sortedNewSegments.get(0).baseOffset()) {
+// remove the index entry; skip removal for base offsets that a new segment is replacing in-place
+if (!newSegmentBaseOffsets.contains(segment.baseOffset())) {
Contributor


Could we merge the code in line 1051 here?

Member


+1 to merge them

Member

@chia7712 chia7712 left a comment


@m1a2st I noticed that the integration test we had earlier was removed. Was this intentional?

// recompression during cleaning can cause the cleaned segment to exceed that size.
// Similarly, combining multiple source segments into one cleaned segment can cause
// the offset range to exceed Integer.MAX_VALUE.
boolean sizeOverflow = retained.sizeInBytes() > maxCleanedSegmentSize - dest.size();
Member


Should we allow the write for the first batch of an empty segment?

Collaborator Author


We should allow this.
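As a sanity check, the roll condition with this first-batch carve-out might look like the following. This is a hypothetical predicate over plain byte counts; the actual check in the PR operates on segment and batch objects:

```java
// Hypothetical roll predicate: an empty destination segment always accepts
// its first batch, even if that batch alone exceeds the configured max size;
// otherwise cleaning could never make progress on an oversized batch.
class RollCheck {
    static boolean shouldRoll(long destSizeBytes, long batchSizeBytes, long maxSegmentBytes) {
        return destSizeBytes > 0 && destSizeBytes + batchSizeBytes > maxSegmentBytes;
    }
}
```

With a 1000-byte limit, a 5000-byte batch is still written into an empty segment (no roll), but triggers a roll once the segment already holds data.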

@m1a2st
Collaborator Author

m1a2st commented Mar 22, 2026

I noticed that the integration test we had earlier was removed. Was this intentional?

Since the integration test is expensive and generating 2GB of data is not ideal, I removed it.

@chia7712
Member

-rw-r--r--. 1 astraea astraea        378 Mar 22 18:06 00000000000002538495.snapshot
-rw-r--r--. 1 astraea astraea      51360 Mar 22 18:06 00000000000002538495.timeindex
-rw-r--r--. 1 astraea astraea      51372 Mar 22 18:06 00000000000002538495.timeindex.deleted
-rw-r--r--. 1 astraea astraea        448 Mar 22 18:06 00000000000002769043.index
-rw-r--r--. 1 astraea astraea     589184 Mar 22 18:06 00000000000002769043.log
-rw-r--r--. 1 astraea astraea         48 Mar 22 18:06 00000000000002769043.timeindex
-rw-r--r--. 1 astraea astraea    1661440 Mar 22 18:06 00000000000002769100.index
-rw-r--r--. 1 astraea astraea 2147474716 Mar 22 18:06 00000000000002769100.log
-rw-r--r--. 1 astraea astraea        378 Mar 22 18:06 00000000000002769100.snapshot
-rw-r--r--. 1 astraea astraea      49248 Mar 22 18:06 00000000000002769100.timeindex
-rw-r--r--. 1 astraea astraea    1661456 Mar 22 18:06 00000000000002999960.index
-rw-r--r--. 1 astraea astraea 2147473581 Mar 22 18:06 00000000000002999960.log

Exercised this patch via a producer flow with positive results. Instead of throwing an exception, the cleaner now yields undersized log files. Typically, these files will be naturally merged in future compaction passes as the dataset accrues duplicate keys.

- * Clean a group of segments into a single replacement segment.
+ * Clean a group of segments into one or more replacement segments.
*
* <p>If cleaning would cause the destination segment's size or offset range to exceed the configured limit
Member


     * <p>If cleaning causes the destination segment's size or offset range to exceed the configured limit
     * (e.g., due to recompression or combining multiple source segments), the current cleaned segment is
     * finalized and a new one is started.

m1a2st added 3 commits March 23, 2026 20:08
# Conflicts:
#	storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java