Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
Expand Down Expand Up @@ -907,28 +908,90 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
}
}

/**
* Check if segment has already expired based on remote storage's retention time.
*/
private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, long retentionMs) throws IOException {
if (retentionMs <= 0) {
return false;
}
return time.milliseconds() - segment.largestTimestamp() > retentionMs;
}

/**
* Check if segment has already expired based on remote storage‘s retention size.
*/
private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, long retentionBytes, long logSize, long accumulatedSkippedSize) {
if (retentionBytes <= 0) {
return false;
}
return (logSize - retentionBytes - accumulatedSkippedSize) > segment.size();
}

/**
* Segments which match the following criteria are eligible for copying to remote storage:
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
* committed/acked messages
*
* Additionally, if a segment has already expired based on remote storage's retention configuration,
* it will be skipped from upload and logStartOffset will be updated to allow local deletion.
* However, we can only skip and advance logStartOffset for a CONTIGUOUS sequence of expired segments
* from the beginning. Once we encounter a non-expired segment, we must stop skipping to ensure
* data consistency (similar to local segment deletion logic).
*
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be copied
* @param lastStableOffset The last stable offset of the log
* @return candidate log segments to be copied to remote storage
*/
List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) throws IOException {
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
if (!segments.isEmpty()) {
for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() <= lastStableOffset) {
if (segments.isEmpty()) {
return candidateLogSegments;
}
long retentionMs = log.config() != null ? log.config().retentionMs : -1;
long retentionSize = log.config() != null ? log.config().retentionSize : -1;
// Compute log.size() once when retention is size-based; skip when not needed to avoid wasted work.
long logSize = retentionSize > 0 ? log.size() : -1;
long accumulatedSkippedSize = 0;
// Flag to track if we can still skip expired segments.
// We can only skip contiguous expired segments from the beginning.
// Once we encounter a non-expired segment, we must stop skipping.
boolean canSkipExpiredSegments = true;

for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() > lastStableOffset) {
continue;
}

boolean isExpired = isSegmentExpiredByTimeForRemoteStorage(previousSeg, retentionMs) ||
isSegmentExpiredBySizeForRemoteStorage(previousSeg, retentionSize, logSize, accumulatedSkippedSize);

if (isExpired) {
if (canSkipExpiredSegments) {
// Can skip and advance logStartOffset for contiguous expired segments at the beginning
long newLogStartOffset = currentSeg.baseOffset();
log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention);
logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.",
previousSeg, newLogStartOffset);
accumulatedSkippedSize += previousSeg.size();
} else {
// Expired segment after a non-expired segment - cannot skip.
// Add to candidates to maintain order. Will be handled after earlier segments are uploaded.
logger.debug("Segment {} is expired but must be processed because earlier non-expired segments are pending upload.",
previousSeg);
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
}
continue;
}
// Discard the last active segment

// Found a non-expired segment - stop allowing skip-and-advance for subsequent expired segments
canSkipExpiredSegments = false;
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
}
return candidateLogSegments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public enum LogStartOffsetIncrementReason {
LeaderOffsetIncremented("leader offset increment"),
SegmentDeletion("segment deletion"),
SegmentExpiredByRemoteRetention("segment has already expired based on remote storage retention policy"),
ClientRecordDeletion("client delete records request"),
SnapshotGenerated("snapshot generated");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,7 @@ public void testRemoteSegmentWithinLeaderEpochsForOverlappingSegments() {
}

@Test
public void testCandidateLogSegmentsSkipsActiveSegment() {
public void testCandidateLogSegmentsSkipsActiveSegment() throws IOException {
UnifiedLog log = mock(UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
Expand All @@ -2136,7 +2136,7 @@ public void testCandidateLogSegmentsSkipsActiveSegment() {
}

@Test
public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() throws IOException {
UnifiedLog log = mock(UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
Expand Down