@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.metadata.HeapMemoryCompactionStateManager;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
@@ -135,7 +136,8 @@ public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
policy,
compactionConfigs,
dataSources,
Collections.emptyMap()
Collections.emptyMap(),
new HeapMemoryCompactionStateManager()
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
1 change: 1 addition & 0 deletions docs/api-reference/automatic-compaction-api.md
@@ -889,6 +889,7 @@ This includes the following fields:
|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
|`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
|`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
|`legacyPersistLastCompactionStateInSegments`|Whether to persist the full compaction state in segment metadata. When `true` (the default), the compaction state is stored in both the segment metadata and the compaction states table; this is the historical Druid behavior. When `false`, only a fingerprint reference is stored in the segment metadata, reducing storage overhead in the segments table; the actual compaction state is stored in the compaction states table and can be looked up by that fingerprint. This configuration will eventually be removed, at which point all compaction will use the fingerprint method only; it exists so that operators can opt into this behavior early. **WARNING: If you set this to `false` and then compact data, rolling back to a Druid version that pre-dates compaction state fingerprinting (earlier than Druid 36) will result in missing compaction states and trigger compaction on segments that may already be compacted.**|`true`|
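
For reference, the embedded-cluster tests in this PR toggle this flag through the last argument of `ClusterCompactionConfig`. A minimal sketch, assuming the six-argument constructor and the argument order suggested by the accessors in the test diffs below:

```java
// Sketch only: disable legacy persistence so newly compacted segments carry
// just a fingerprint reference. Arguments (per the test diffs below):
// compactionTaskSlotRatio, maxCompactionTaskSlots, compactionPolicy,
// useSupervisors, engine, legacyPersistLastCompactionStateInSegments.
ClusterCompactionConfig config =
    new ClusterCompactionConfig(1.0, 100, null, true, null, false);
```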

#### Compaction policy `newestSegmentFirst`

3 changes: 3 additions & 0 deletions docs/configuration/index.md
@@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the
|`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`|
|`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`|
|`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`|
|`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state fingerprints.|`druid_compactionStates`|
|`druid.metadata.storage.tables.tasks`|Used by the indexing service to store tasks.|`druid_tasks`|
|`druid.metadata.storage.tables.taskLog`|Used by the indexing service to store task logs.|`druid_tasklogs`|
|`druid.metadata.storage.tables.taskLock`|Used by the indexing service to store task locks.|`druid_tasklocks`|
@@ -810,6 +811,8 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.manager.rules.pollDuration`|The duration between polls the Coordinator does for updates to the set of active rules. Generally defines the amount of lag time it can take for the Coordinator to notice rules.|`PT1M`|
|`druid.manager.rules.defaultRule`|The default rule for the cluster|`_default`|
|`druid.manager.rules.alertThreshold`|The duration after a failed poll upon which an alert should be emitted.|`PT10M`|
|`druid.manager.compactionState.cacheSize`|The maximum number of compaction state fingerprints to cache in memory on the coordinator and overlord. Compaction state fingerprints are used to track the compaction configuration applied to segments. Consider increasing this value if you have a large number of datasources with compaction configurations.|`100`|
|`druid.manager.compactionState.prewarmSize`|The number of most recently used compaction state fingerprints to load into cache on Coordinator startup. This pre-warms the cache to improve performance immediately after startup.|`100`|
Contributor comment on lines +814 to +815:

Both Coordinator and Overlord (with segment metadata caching enabled) already keep all used segments in memory, including the respective (interned) `CompactionState` objects. I don't think the number of distinct `CompactionState` objects that we keep in memory will increase after this patch.

Do we still need to worry about the cache size of these objects? Does a cache miss trigger a fetch from the metadata store?
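
For illustration, a minimal sketch of the kind of bounded LRU cache these two settings describe, assuming plain `LinkedHashMap` access-order eviction; the actual manager in this PR is `HeapMemoryCompactionStateManager`, whose internals are not shown in this diff:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: evicts the least recently used compaction state once the
// configured size is exceeded, mirroring druid.manager.compactionState.cacheSize.
public class FingerprintLruCache<V>
{
  private final Map<String, V> cache;

  public FingerprintLruCache(int cacheSize)
  {
    this.cache = Collections.synchronizedMap(
        new LinkedHashMap<>(16, 0.75f, true)
        {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, V> eldest)
          {
            return size() > cacheSize;
          }
        }
    );
  }

  public V get(String fingerprint)
  {
    return cache.get(fingerprint);
  }

  public void put(String fingerprint, V state)
  {
    cache.put(fingerprint, state);
  }
}
```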


#### Dynamic configuration

@@ -1508,7 +1508,7 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1552,7 +1552,7 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1854,7 +1854,7 @@ private void forceTriggerAutoCompaction(
).collect(Collectors.toList())
);
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null, true)
);

// Wait for scheduler to pick up the compaction job
@@ -1864,7 +1864,7 @@

// Disable all compaction
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null, true)
);
} else {
forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
@@ -1956,7 +1956,8 @@ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCom
maxCompactionTaskSlots,
oldConfig.getCompactionPolicy(),
oldConfig.isUseSupervisors(),
oldConfig.getEngine()
oldConfig.getEngine(),
oldConfig.isLegacyPersistLastCompactionStateInSegments()
)
);

@@ -41,6 +41,7 @@
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -50,6 +51,7 @@
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.timeline.CompactionState;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Period;
@@ -111,14 +113,14 @@ public EmbeddedDruidCluster createCluster()
private void configureCompaction(CompactionEngine compactionEngine)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, true))
);
Assertions.assertTrue(updateResponse.isSuccess());
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig(CompactionEngine compactionEngine)
public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(CompactionEngine compactionEngine)
{
configureCompaction(compactionEngine);

@@ -132,7 +134,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

// Create a compaction config with MONTH granularity
InlineSchemaDataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig monthGranConfig =
Contributor comment, suggested change:

-    InlineSchemaDataSourceCompactionConfig monthGranConfig =
+    InlineSchemaDataSourceCompactionConfig monthGranularityConfig =

InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
@@ -165,11 +167,170 @@
)
.build();

runCompactionWithSpec(compactionConfig);
runCompactionWithSpec(monthGranConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.MONTH));

verifyCompactedSegmentsHaveFingerprints(monthGranConfig);

InlineSchemaDataSourceCompactionConfig yearGranConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

overlord.latchableEmitter().flush(); // flush events so that waiting for events works correctly on the next round of compaction
runCompactionWithSpec(yearGranConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(0, getNumSegmentsWith(Granularities.MONTH));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.YEAR));

verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
throws InterruptedException
{
// Configure cluster with persistLastCompactionState=false
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(
new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, false)
)
);
Assertions.assertTrue(updateResponse.isSuccess());

// Ingest data at DAY granularity
runIngestionAtGranularity(
"DAY",
"2025-06-01T00:00:00.000Z,shirt,105\n"
+ "2025-06-02T00:00:00.000Z,trousers,210"
);
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// Create compaction config to compact to MONTH granularity
InlineSchemaDataSourceCompactionConfig monthConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

runCompactionWithSpec(monthConfig);
waitForAllCompactionTasksToFinish();

verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
}

private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
{
overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
Assertions.assertNull(
segment.getLastCompactionState(),
"Segment " + segment.getId() + " should have null lastCompactionState"
);
Assertions.assertNotNull(
segment.getCompactionStateFingerprint(),
"Segment " + segment.getId() + " should have non-null compactionStateFingerprint"
);
});
}

private void verifyCompactedSegmentsHaveFingerprints(DataSourceCompactionConfig compactionConfig)
{
String expectedFingerprint = CompactionState.generateCompactionStateFingerprint(
CompactSegments.createCompactionStateFromConfig(compactionConfig),
dataSource
);

overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
String fingerprint = segment.getCompactionStateFingerprint();
Assertions.assertNotNull(
fingerprint,
"Segment " + segment.getId() + " should have a compaction state fingerprint"
);
Assertions.assertFalse(
fingerprint.isEmpty(),
"Segment " + segment.getId() + " fingerprint should not be empty"
);
// SHA-256 fingerprints should be 64 hex characters
Assertions.assertEquals(
64,
fingerprint.length(),
"Segment " + segment.getId() + " fingerprint should be 64 characters (SHA-256)"
);
Contributor comment on lines +314 to +327:

These 3 assertions can be omitted; just the last one should suffice.

Assertions.assertEquals(
expectedFingerprint,
fingerprint,
"Segment " + segment.getId() + " fingerprint should match expected fingerprint"
);
});
}
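
The 64-character check above implies that the fingerprint is a SHA-256 digest rendered as lowercase hex. As a point of reference, a minimal sketch of such a digest in plain Java; the actual inputs and serialization used by `CompactionState.generateCompactionStateFingerprint` are not visible in this diff:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: produces a 64-character lowercase hex string from SHA-256.
public final class Sha256Hex
{
  public static String of(String input) throws NoSuchAlgorithmException
  {
    final MessageDigest digest = MessageDigest.getInstance("SHA-256");
    final StringBuilder hex = new StringBuilder(64);
    for (byte b : digest.digest(input.getBytes(StandardCharsets.UTF_8))) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
```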

private void runCompactionWithSpec(DataSourceCompactionConfig config)
@@ -212,7 +212,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -323,7 +323,7 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -164,7 +164,7 @@ private IndexTask createIndexTaskForInlineData(String taskId)
private void enableCompactionSupervisor()
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null, null))
);
Assertions.assertTrue(updateResponse.isSuccess());
}
@@ -99,6 +99,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
null,
null,
null,
null,
null
);
}
@@ -641,6 +641,21 @@ public static boolean isGuaranteedRollup(
return tuningConfig.isForceGuaranteedRollup();
}

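/**
 * Returns a function that annotates every segment in a set with the given
 * compaction state fingerprint, or the identity function if the fingerprint is null.
 */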
public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFingerprintToSegments(
String compactionStateFingerprint
)
{
if (compactionStateFingerprint != null) {
return segments -> segments.stream()
.map(
segment -> segment.withCompactionStateFingerprint(compactionStateFingerprint)
)
.collect(Collectors.toSet());
} else {
return Function.identity();
}
}

public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
boolean storeCompactionState,
TaskToolbox toolbox,
@@ -902,11 +902,19 @@ private TaskStatus generateAndPublishSegments(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

final String compactionStateFingerprint = getContextValue(
Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
null
);

final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
).andThen(
addCompactionStateFingerprintToSegments(compactionStateFingerprint)
);

Set<DataSegment> tombStones = Collections.emptySet();
@@ -68,4 +68,13 @@ public class Tasks
static {
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
}

/**
* Context k:v pair that holds the fingerprint of the compaction state to be stored with the segment
*/
public static final String COMPACTION_STATE_FINGERPRINT_KEY = "compactionStateFingerprint";

static {
Verify.verify(COMPACTION_STATE_FINGERPRINT_KEY.equals(CompactSegments.COMPACTION_STATE_FINGERPRINT_KEY));
}
}
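
For illustration, a hedged sketch of the writing side of this context key: the reading side is `getContextValue(Tasks.COMPACTION_STATE_FINGERPRINT_KEY, null)` in `IndexTask` above, and a caller would presumably attach the fingerprint to the task context along these lines (the helper class below is hypothetical, not part of this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: attaches a compaction state fingerprint to a task
// context map so that IndexTask can read it back when publishing segments.
public final class CompactionTaskContexts
{
  private CompactionTaskContexts() {}

  public static Map<String, Object> withCompactionStateFingerprint(
      Map<String, Object> baseContext,
      String fingerprint
  )
  {
    final Map<String, Object> context = new HashMap<>(baseContext);
    context.put(Tasks.COMPACTION_STATE_FINGERPRINT_KEY, fingerprint);
    return context;
  }
}
```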