Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/data-management/automatic-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ The MSQ task engine is available as a compaction engine if you configure auto-co

You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other.

#### Scaling task count for minor compactions

[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. To avoid spawning a full-sized MSQ task topology for that lighter workload, opt in by setting `spec.taskContext.minorCompactionTaskPercent` to a value between 1 and 100; values below 100 reduce the task count, while 100 (the default) leaves it unchanged. The percent is applied to `maxNumTasks` to derive the task count for minor compaction tasks; the result is rounded to the nearest whole number and is never lower than 2 (the MSQ minimum of one controller and one worker).

| Parameter | Description | Default value |
|---|---|---|
| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. By default, minor compactions use the same task count as full compactions; set this parameter to opt into scaling. A starting value in the range of 40 to 50 is reasonable for most workloads. | 100 (no scaling) |

For example, with `maxNumTasks` set to 5 and `minorCompactionTaskPercent` set to 40, a minor compaction task launches with `maxNumTasks` of 2 (40% of 5, rounded, then floored at 2), while a full compaction task on the same datasource still launches with `maxNumTasks` of 5.

#### MSQ task engine limitations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.apache.druid.testing.embedded.compact;

import com.google.common.collect.ImmutableList;
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand All @@ -29,6 +33,7 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
Expand All @@ -50,6 +55,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
Expand Down Expand Up @@ -113,8 +119,10 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -296,6 +304,86 @@ public void test_minorCompactionWithMSQ(MostFragmentedIntervalFirstPolicy policy
Assertions.assertEquals(4000, getTotalRowCount());
}

/**
 * Verifies that {@code minorCompactionTaskPercent} in the task context scales {@code maxNumTasks}
 * only for minor MSQ compaction tasks: with {@code maxNumTasks=3} and a percent of 50, the full
 * compaction task is expected to carry {@code maxNumTasks=3} and the following minor compaction
 * task is expected to carry {@code maxNumTasks=2}.
 */
@Test
public void test_minorCompactionWithMSQ_minorCompactionTaskPercentScalesWorkers()
{
final MostFragmentedIntervalFirstPolicy policy =
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null);
configureCompaction(CompactionEngine.MSQ, policy);

ingest1kRecords();
ingest1kRecords();

// Wait for the metadata cache to sync and the segments to load before counting them.
overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// maxNumTasks=3 (controller + 2 workers) for full; minorCompactionTaskPercent=50
// scales minor compactions to maxNumTasks=2 (controller + 1 worker).
final InlineSchemaDataSourceCompactionConfig dayConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
.withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
WikipediaStreamEventStreamGenerator.dimensions()
.stream()
.map(StringDimensionSchema::new)
.collect(Collectors.toUnmodifiableList())))
.withTaskContext(buildScalingTaskContext(3, 50))
.withIoConfig(new UserCompactionTaskIOConfig(true))
.withTuningConfig(
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false))
.build()
)
.build();

// First run: full compaction (no compacted segments yet).
runCompactionWithSpec(dayConfig);
waitForAllCompactionTasksToFinish();
pauseCompaction(dayConfig);

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));

// Capture the full task before the minor task arrives (its ID does not contain "-minor").
final TaskStatusPlus fullTask = findMostRecentCompactionTask(taskId -> !taskId.contains("-minor"));
Assertions.assertNotNull(fullTask, "Expected a full compaction task for datasource[" + dataSource + "]");
assertCompactionTaskMaxNumTasks(fullTask, 3);

// Ingest more so the next run sees a mix of compacted + uncompacted segments.
ingest1kRecords();
ingest1kRecords();

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

// Remember the last used-segment count reported so far (0 if none was reported) so that the
// wait below can detect when the second compaction run has published additional segments.
final long totalUsed = overlord.latchableEmitter().getMetricValues(
"segment/metadataCache/used/count",
Map.of(DruidMetrics.DATASOURCE, dataSource)
).stream().reduce((first, second) -> second).orElse(0).longValue();

// Second run: minor compaction (uncompacted bytes ratio is below the 80% threshold).
runCompactionWithSpec(dayConfig);
waitForAllCompactionTasksToFinish();

// Block until the used-segment count for this datasource exceeds the value captured above.
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("segment/metadataCache/used/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));

Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// Minor compaction task IDs contain "-minor"; that task must carry the scaled maxNumTasks.
final TaskStatusPlus minorTask = findMostRecentCompactionTask(taskId -> taskId.contains("-minor"));
Assertions.assertNotNull(minorTask, "Expected a minor compaction task for datasource[" + dataSource + "]");
assertCompactionTaskMaxNumTasks(minorTask, 2);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
Expand Down Expand Up @@ -1208,6 +1296,50 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}

/**
 * Builds a mutable compaction task context that enables concurrent locks and carries the given
 * {@code maxNumTasks} and {@code minorCompactionTaskPercent} values.
 *
 * @param maxNumTasks                value stored under {@link ClientMSQContext#CTX_MAX_NUM_TASKS}
 * @param minorCompactionTaskPercent value stored under
 *                                   {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}
 * @return a new {@link HashMap} holding the three entries
 */
private static Map<String, Object> buildScalingTaskContext(int maxNumTasks, int minorCompactionTaskPercent)
{
  // Copy into a HashMap so callers still receive a mutable map.
  return new HashMap<>(
      Map.<String, Object>of(
          "useConcurrentLocks", true,
          ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks,
          ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, minorCompactionTaskPercent
      )
  );
}

/**
 * Fetches up to 100 completed tasks of {@code dataSource} from the leader Overlord and returns
 * the last task, in the order the Overlord returns them, that has type {@code compact} and whose
 * id satisfies the given filter.
 * <p>
 * NOTE(review): "most recent" holds only if the Overlord lists completed tasks oldest-first;
 * confirm the ordering if more than one task can match the filter.
 *
 * @param taskIdFilter predicate applied to each task id
 * @return the last matching task, or null if none is found
 */
@Nullable
private TaskStatusPlus findMostRecentCompactionTask(Predicate<String> taskIdFilter)
{
  final List<TaskStatusPlus> tasks;
  // Drain the iterator inside try-with-resources so that it is always closed.
  try (CloseableIterator<TaskStatusPlus> taskIterator = cluster.callApi().onLeaderOverlord(
      o -> o.taskStatuses("complete", dataSource, 100)
  )) {
    tasks = ImmutableList.copyOf(taskIterator);
  }
  catch (java.io.IOException e) {
    throw new java.io.UncheckedIOException(e);
  }

  TaskStatusPlus match = null;
  for (TaskStatusPlus task : tasks) {
    // Keep overwriting so that the last matching task wins.
    if ("compact".equals(task.getType()) && taskIdFilter.test(task.getId())) {
      match = task;
    }
  }
  return match;
}

/**
 * Asserts that the submitted compaction task carries the expected {@code maxNumTasks} in its context.
 * Fails with a descriptive assertion error, rather than a NullPointerException, when the context
 * has no {@code maxNumTasks} entry at all.
 *
 * @param task                task whose payload is fetched from the leader Overlord
 * @param expectedMaxNumTasks value expected under {@link ClientMSQContext#CTX_MAX_NUM_TASKS}
 */
private void assertCompactionTaskMaxNumTasks(TaskStatusPlus task, int expectedMaxNumTasks)
{
  final TaskPayloadResponse payload = cluster.callApi().onLeaderOverlord(o -> o.taskPayload(task.getId()));
  final ClientCompactionTaskQuery query = (ClientCompactionTaskQuery) payload.getPayload();
  final Object actualMaxNumTasks = query.getContext().get(ClientMSQContext.CTX_MAX_NUM_TASKS);
  Assertions.assertNotNull(
      actualMaxNumTasks,
      "maxNumTasks is missing from compaction task[" + task.getId() + "] context"
  );
  Assertions.assertEquals(
      expectedMaxNumTasks,
      ((Number) actualMaxNumTasks).intValue(),
      "maxNumTasks in compaction task[" + task.getId() + "] context"
  );
}

private void runIngestionAtGranularity(
String granularity,
String inlineDataCsv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ public class ClientMSQContext
* Limit to ensure that an MSQ compaction task doesn't take up all task slots in a cluster.
*/
public static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 5;

/**
 * Compaction-only context key whose value is a percent (1-100) used to scale
 * {@link #CTX_MAX_NUM_TASKS} for MSQ minor compactions. The scaled value is never lower than
 * {@link #DEFAULT_MAX_NUM_TASKS}. Has no effect on full compactions or on the native
 * compaction engine.
 */
public static final String CTX_MINOR_COMPACTION_TASK_PERCENT = "minorCompactionTaskPercent";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
Expand Down Expand Up @@ -294,6 +297,17 @@ private int submitCompactionTasks(
return numSubmittedTasks;
}

/**
 * Default percent applied to {@code maxNumTasks} for MSQ minor compactions
 * when the supervisor's {@code taskContext} does not specify
 * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. The default of
 * 100 disables scaling, i.e. {@code maxNumTasks} is left unchanged. This matches
 * the opt-in shape of the minor compaction feature itself (which is gated by
 * non-zero {@code minUncompactedBytesPercentForFullCompaction} or
 * {@code minUncompactedRowsPercentForFullCompaction} on the policy).
 */
public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 100;

/**
* Creates a {@link ClientCompactionTaskQuery} which can be submitted to an
* {@link OverlordClient} to start a compaction task.
Expand Down Expand Up @@ -448,6 +462,52 @@ public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
return autoCompactionSnapshotPerDataSource.get();
}

/**
 * Reduces {@code maxNumTasks} on the task context for MSQ minor compactions
 * by the percent specified under
 * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT} in the same
 * context, defaulting to {@link #DEFAULT_MINOR_COMPACTION_TASK_PERCENT} when
 * absent. The scaled value is rounded to the nearest integer and is never lower
 * than {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS}
 * (the MSQ minimum of 1 controller + 1 worker). No-op when the engine is
 * not MSQ, the mode is not {@code UNCOMPACTED_SEGMENTS_ONLY}, or the percent is 100.
 * <p>
 * The percent is range-checked here, i.e. only when a minor MSQ compaction task is being
 * created; a value outside 1-100 results in the exception built by {@code InvalidInput.exception}.
 *
 * @param context          task context; {@code maxNumTasks} is overwritten in place when scaling applies
 * @param compactionMode   mode of the compaction being submitted
 * @param compactionRunner runner info used to determine the compaction engine
 */
private static void maybeScaleMaxNumTasksForMinorCompaction(
Map<String, Object> context,
CompactionMode compactionMode,
ClientCompactionRunnerInfo compactionRunner
)
{
// Scaling applies only to minor (uncompacted-segments-only) compactions run by the MSQ engine.
if (compactionMode != CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
|| !CompactionEngine.MSQ.equals(compactionRunner.getType())) {
return;
}

final int percent = QueryContext.of(context).getInt(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Validate minorCompactionTaskPercent when accepting the compaction config

The new context key is only parsed and range-checked while creating a minor MSQ compaction task. Supervisor config validation still accepts values like 0, 200, or a non-numeric string, so the API can persist an invalid supervisor and it will later fail job creation repeatedly once a minor compaction candidate appears. Please add validation alongside the existing MSQ maxNumTasks validation paths, including CascadingReindexingTemplate if applicable, so bad configs are rejected before scheduling.

ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
DEFAULT_MINOR_COMPACTION_TASK_PERCENT
);
if (percent < 1 || percent > 100) {
throw InvalidInput.exception(
"'%s'[%d] must be between 1 and 100",
ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
percent
);
}
// 100 percent means no scaling, so the context is left untouched.
if (percent == 100) {
return;
}

final int originalMaxNumTasks = QueryContext.of(context).getInt(
ClientMSQContext.CTX_MAX_NUM_TASKS,
ClientMSQContext.DEFAULT_MAX_NUM_TASKS
);
// Round to the nearest whole task count, then clamp to the minimum allowed value.
final int scaledMaxNumTasks = Math.max(
ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
(int) Math.round(originalMaxNumTasks * percent / 100.0)
);
context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, scaledMaxNumTasks);
}

private static ClientCompactionTaskQuery compactSegments(
CompactionCandidate entry,
Eligibility eligibility,
Expand Down Expand Up @@ -480,6 +540,8 @@ private static ClientCompactionTaskQuery compactSegments(
context.put(COMPACTION_POLICY_RESULT, eligibility.getReason());
}

maybeScaleMaxNumTasksForMinorCompaction(context, compactionMode, compactionRunner);

String taskIdPrefix = compactionMode == CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
? TASK_ID_PREFIX + "-minor"
: TASK_ID_PREFIX;
Expand Down
Loading
Loading