Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/data-management/automatic-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ The MSQ task engine is available as a compaction engine if you configure auto-co

You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other.

#### Scaling task count for minor compactions

[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. To avoid spawning a full-sized MSQ task topology for that lighter workload, set `spec.taskContext.minorCompactionTaskPercent` to a value between 1 and 100. Druid multiplies `maxNumTasks` by this percentage and rounds to the nearest integer to derive the task count for minor compaction tasks. The result is never lower than 2, which is the MSQ minimum of one controller and one worker.

| Parameter | Description | Default value |
|---|---|---|
| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. By default, minor compactions use the same task count as full compactions; set this parameter to opt into scaling. A starting value in the range of 40 to 50 is reasonable for most workloads. | 100 (no scaling) |

For example, with `maxNumTasks` set to 5 and `minorCompactionTaskPercent` set to 40, a minor compaction task launches with `maxNumTasks` of 2 (40% of 5 is 2, which also satisfies the minimum of 2), while a full compaction task on the same datasource still launches with `maxNumTasks` of 5.

#### MSQ task engine limitations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ public class ClientMSQContext
* Limit to ensure that an MSQ compaction task doesn't take up all task slots in a cluster.
*/
public static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 5;

/**
 * Compaction-only context key holding a percent (1-100) that is used to scale
 * {@link #CTX_MAX_NUM_TASKS} for MSQ minor compactions. The scaled value is never
 * lower than {@link #DEFAULT_MAX_NUM_TASKS}. Has no effect on full compactions or
 * on the native compaction engine.
 */
public static final String CTX_MINOR_COMPACTION_TASK_PERCENT = "minorCompactionTaskPercent";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
Expand Down Expand Up @@ -294,6 +297,17 @@ private int submitCompactionTasks(
return numSubmittedTasks;
}

/**
 * Default percent applied to {@code maxNumTasks} for MSQ minor compactions
 * when the supervisor's {@code taskContext} does not specify
 * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. The default of
 * 100 disables scaling, so task-count scaling is opt-in, just like the minor
 * compaction feature itself (which is gated by a non-zero
 * {@code minUncompactedBytesPercentForFullCompaction} or
 * {@code minUncompactedRowsPercentForFullCompaction} on the policy).
 */
public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 100;

/**
* Creates a {@link ClientCompactionTaskQuery} which can be submitted to an
* {@link OverlordClient} to start a compaction task.
Expand Down Expand Up @@ -448,6 +462,52 @@ public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
return autoCompactionSnapshotPerDataSource.get();
}

/**
* Reduces {@code maxNumTasks} on the task context for MSQ minor compactions
* by the percent specified under
* {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT} in the same
* context, defaulting to {@link #DEFAULT_MINOR_COMPACTION_TASK_PERCENT} when
* absent. The scaled value is floored at {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS}
* (the MSQ minimum of 1 controller + 1 worker). No-op when the engine is
* native, the mode is full, or the percent is 100.
*/
private static void maybeScaleMaxNumTasksForMinorCompaction(
Map<String, Object> context,
CompactionMode compactionMode,
ClientCompactionRunnerInfo compactionRunner
)
{
if (compactionMode != CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
|| !CompactionEngine.MSQ.equals(compactionRunner.getType())) {
return;
}

final int percent = QueryContext.of(context).getInt(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Validate minorCompactionTaskPercent when accepting the compaction config

The new context key is only parsed and range-checked while creating a minor MSQ compaction task. Supervisor config validation still accepts values like 0, 200, or a non-numeric string, so the API can persist an invalid supervisor and it will later fail job creation repeatedly once a minor compaction candidate appears. Please add validation alongside the existing MSQ maxNumTasks validation paths, including CascadingReindexingTemplate if applicable, so bad configs are rejected before scheduling.

ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
DEFAULT_MINOR_COMPACTION_TASK_PERCENT
);
if (percent < 1 || percent > 100) {
throw InvalidInput.exception(
"'%s'[%d] must be between 1 and 100",
ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
percent
);
}
if (percent == 100) {
return;
}

final int originalMaxNumTasks = QueryContext.of(context).getInt(
ClientMSQContext.CTX_MAX_NUM_TASKS,
ClientMSQContext.DEFAULT_MAX_NUM_TASKS
);
final int scaledMaxNumTasks = Math.max(
ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
(int) Math.round(originalMaxNumTasks * percent / 100.0)
);
context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, scaledMaxNumTasks);
}

private static ClientCompactionTaskQuery compactSegments(
CompactionCandidate entry,
Eligibility eligibility,
Expand Down Expand Up @@ -480,6 +540,8 @@ private static ClientCompactionTaskQuery compactSegments(
context.put(COMPACTION_POLICY_RESULT, eligibility.getReason());
}

maybeScaleMaxNumTasksForMinorCompaction(context, compactionMode, compactionRunner);

String taskIdPrefix = compactionMode == CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
? TASK_ID_PREFIX + "-minor"
: TASK_ID_PREFIX;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Tests for the {@code minorCompactionTaskPercent} task-context key applied by
* {@link CompactSegments#createCompactionTask}.
*/
public class CompactSegmentsMinorTaskScalingTest
{
private static final String DATA_SOURCE = "wiki";

@Test
public void testMinor_msq_defaultDoesNotScale()
{
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
contextWithMaxNumTasks(10)
);
Assert.assertEquals(10, getMaxNumTasks(task));
}

@Test
public void testMinor_msq_explicitPercentInTaskContext()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(10);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 25);
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
);
Assert.assertEquals(3, getMaxNumTasks(task));
}

@Test
public void testMinor_msq_floorsAtTwo()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(3);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 40);
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
);
Assert.assertEquals(2, getMaxNumTasks(task));
}

@Test
public void testMinor_msq_percentOneHundredIsNoOp()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(5);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 100);
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
);
Assert.assertEquals(5, getMaxNumTasks(task));
}

@Test
public void testMinor_msq_outOfRangePercentBelowOneThrows()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(10);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 0);
final DruidException thrown = Assert.assertThrows(
DruidException.class,
() -> buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
)
);
Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory());
}

@Test
public void testFull_msq_doesNotScale()
{
final ClientCompactionTaskQuery task = buildTask(
Eligibility.FULL,
CompactionEngine.MSQ,
contextWithMaxNumTasks(5)
);
Assert.assertEquals(5, getMaxNumTasks(task));
}

@Test
public void testMinor_native_doesNotScale()
{
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.NATIVE,
contextWithMaxNumTasks(5)
);
Assert.assertEquals(5, getMaxNumTasks(task));
}

@Test
public void testMinor_msq_outOfRangePercentThrows()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(10);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 200);
final DruidException thrown = Assert.assertThrows(
DruidException.class,
() -> buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
)
);
Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory());
}

@Test
public void testMinor_msq_stringPercentIsParsed()
{
final Map<String, Object> ctx = contextWithMaxNumTasks(10);
ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, "50");
final ClientCompactionTaskQuery task = buildTask(
Eligibility.minor("uncompacted ratio below threshold"),
CompactionEngine.MSQ,
ctx
);
Assert.assertEquals(5, getMaxNumTasks(task));
}

private static ClientCompactionTaskQuery buildTask(
Eligibility eligibility,
CompactionEngine engine,
Map<String, Object> taskContext
)
{
final DataSegment segment = new DataSegment(
DATA_SOURCE,
Intervals.of("2024-01-01/2024-01-02"),
"v1",
null,
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 1),
0,
100L
);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note test

Invoking
DataSegment.DataSegment
should be avoided because it has been deprecated.
Comment on lines +149 to +159
final List<DataSegment> segments = ImmutableList.of(segment);
final CompactionStatus status = CompactionStatus.pending(
CompactionStatistics.create(0L, 0L, 0L, 0L),
CompactionStatistics.create(100L, 0L, 1L, 1L),
segments,
"for test"
);
final CompactionCandidate candidate = CompactionCandidate.from(segments, null, status);
final DataSourceCompactionConfig config = InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(DATA_SOURCE)
.withTaskContext(taskContext)
.withEngine(engine)
.build();
return CompactSegments.createCompactionTask(
candidate,
eligibility,
config,
engine,
null,
true
);
}

private static Map<String, Object> contextWithMaxNumTasks(int maxNumTasks)
{
final Map<String, Object> context = new HashMap<>();
context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks);
return context;
}

private static int getMaxNumTasks(ClientCompactionTaskQuery task)
{
return (int) task.getContext().get(ClientMSQContext.CTX_MAX_NUM_TASKS);
}
}
1 change: 1 addition & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ WorkerRpcFailed
TIMED_OUT
# MSQ context parameters
maxNumTasks
minorCompactionTaskPercent
taskAssignment
finalizeAggregations
indexSpec
Expand Down
Loading