Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ public List<CompactionJob> createCompactionJobs(
return Collections.emptyList();
}

for (IntervalPartitioningInfo intervalInfo : searchIntervals) {
for (int i = 0; i < searchIntervals.size(); i++) {
IntervalPartitioningInfo intervalInfo = searchIntervals.get(i);
Interval reindexingInterval = intervalInfo.getInterval();

if (!reindexingInterval.overlaps(adjustedTimelineInterval)) {
Expand All @@ -363,14 +364,24 @@ public List<CompactionJob> createCompactionJobs(
continue;
}

// Skip intervals that extend past the skip offset boundary (not just data boundary)
// This preserves granularity alignment and ensures intervals exist in synthetic timeline
// Only apply this when a skip offset is actually configured
// Skip offsets, if configured, can result in needing to truncate a search interval. If the truncation makes the interval invalid, skip it.
if ((skipOffsetFromNow != null || skipOffsetFromLatest != null) &&
intervalEndsAfter(reindexingInterval, adjustedTimelineInterval.getEnd())) {
LOG.debug("Search interval[%s] extends past skip offset boundary[%s], skipping to preserve alignment",
reindexingInterval, adjustedTimelineInterval.getEnd());
continue;

DateTime alignedEnd = intervalInfo.getGranularity().bucketStart(adjustedTimelineInterval.getEnd());
if (!alignedEnd.isAfter(reindexingInterval.getStart())) {
LOG.debug("Search interval[%s] is entirely within skip offset, skipping", reindexingInterval);
continue;
}
reindexingInterval = new Interval(reindexingInterval.getStart(), alignedEnd);
// Replace the entry in searchIntervals so the downstream synthetic-timeline lookup
// in ReindexingConfigBuilder matches the truncated interval.
intervalInfo = new IntervalPartitioningInfo(
reindexingInterval,
intervalInfo.getSourceRule(),
intervalInfo.isRuleSynthetic()
);
searchIntervals.set(i, intervalInfo);
}

InlineSchemaDataSourceCompactionConfig.Builder builder = createBaseBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void test_createCompactionJobs_withSkipOffsetFromLatest_skipAllOfTime()
}

@Test
public void test_createCompactionJobs_withSkipOffsetFromLatest_skipsIntervalsExtendingPastOffset()
public void test_createCompactionJobs_withSkipOffsetFromLatest_truncatesIntervalsExtendingPastSkipOffset()
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90), referenceTime.minusDays(10));
Expand All @@ -399,9 +399,11 @@ public void test_createCompactionJobs_withSkipOffsetFromLatest_skipsIntervalsExt
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();

Assertions.assertEquals(1, processedIntervals.size());
Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN, processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(0).getEnd());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(1).getStart());
Assertions.assertEquals(referenceTime.minusDays(15), processedIntervals.get(1).getEnd());

EasyMock.verify(mockProvider, mockParams, mockSource);
}
Expand All @@ -411,7 +413,7 @@ public void test_createCompactionJobs_withSkipOffsetFromLatest_eliminatesInterva
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90), referenceTime.minusDays(10));
ReindexingRuleProvider mockProvider = createMockProvider(List.of(Period.days(7), Period.days(30)));
ReindexingRuleProvider mockProvider = createMockProvider(List.of(Period.days(3), Period.days(7), Period.days(30)));
CompactionJobParams mockParams = createMockParams(referenceTime, timeline);
DruidInputSource mockSource = createMockSource();

Expand All @@ -422,9 +424,11 @@ public void test_createCompactionJobs_withSkipOffsetFromLatest_eliminatesInterva
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();

Assertions.assertEquals(1, processedIntervals.size());
Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN, processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(0).getEnd());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(1).getStart());
Assertions.assertEquals(referenceTime.minusDays(25), processedIntervals.get(1).getEnd());

EasyMock.verify(mockProvider, mockParams, mockSource);
}
Expand All @@ -451,7 +455,7 @@ public void test_createCompactionJobs_withSkipOffsetFromNow_skipAllOfTime()
}

@Test
public void test_createCompactionJobs_withSkipOffsetFromNow_skipsIntervalsExtendingPastOffset()
public void test_createCompactionJobs_withSkipOffsetFromNow_truncatesIntervalThatExtendsPastSkipOffset()
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90), referenceTime.minusDays(10));
Expand All @@ -466,9 +470,11 @@ public void test_createCompactionJobs_withSkipOffsetFromNow_skipsIntervalsExtend
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();

Assertions.assertEquals(1, processedIntervals.size());
Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN, processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(0).getEnd());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(1).getStart());
Assertions.assertEquals(referenceTime.minusDays(20), processedIntervals.get(1).getEnd());

EasyMock.verify(mockProvider, mockParams, mockSource);
}
Expand All @@ -478,7 +484,7 @@ public void test_createCompactionJobs_withSkipOffsetFromNow_eliminatesInterval()
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90), referenceTime.minusDays(10));
ReindexingRuleProvider mockProvider = createMockProvider(List.of(Period.days(7), Period.days(30)));
ReindexingRuleProvider mockProvider = createMockProvider(List.of(Period.days(3), Period.days(7), Period.days(30)));
CompactionJobParams mockParams = createMockParams(referenceTime, timeline);
DruidInputSource mockSource = createMockSource();

Expand All @@ -489,9 +495,11 @@ public void test_createCompactionJobs_withSkipOffsetFromNow_eliminatesInterval()
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();

Assertions.assertEquals(1, processedIntervals.size());
Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN, processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(0).getEnd());
Assertions.assertEquals(referenceTime.minusDays(30), processedIntervals.get(1).getStart());
Assertions.assertEquals(referenceTime.minusDays(20), processedIntervals.get(1).getEnd());

EasyMock.verify(mockProvider, mockParams, mockSource);
}
Expand Down
Loading