Skip to content

Adjust merge and indexing throttling settings #125594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;

import static org.elasticsearch.common.util.concurrent.EsExecutors.allocatedProcessors;
import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive;
import static org.elasticsearch.threadpool.ThreadPool.twiceAllocatedProcessors;

/**
* The merge scheduler (<code>ConcurrentMergeScheduler</code>) controls the execution of
* merge operations once they are needed (according to the merge policy). Merges
Expand All @@ -27,7 +31,7 @@
* <li> <code>index.merge.scheduler.max_thread_count</code>:
*
* The maximum number of threads that may be merging at once. Defaults to
* <code>Math.max(1, Math.min(4, {@link EsExecutors#allocatedProcessors(Settings)} / 2))</code>
* <code>Math.max(1, Math.min(5, {@link EsExecutors#allocatedProcessors(Settings)} / 2))</code>
* which works well for a good solid-state-disk (SSD). If your index is on
* spinning platter drives instead, decrease this to 1.
*
Expand All @@ -45,14 +49,14 @@ public final class MergeSchedulerConfig {

public static final Setting<Integer> MAX_THREAD_COUNT_SETTING = new Setting<>(
"index.merge.scheduler.max_thread_count",
(s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))),
(s) -> Integer.toString(halfAllocatedProcessorsMaxFive(allocatedProcessors(s))),
(s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"),
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Integer> MAX_MERGE_COUNT_SETTING = new Setting<>(
"index.merge.scheduler.max_merge_count",
(s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + 5),
(s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + twiceAllocatedProcessors(allocatedProcessors(s))),
(s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"),
Property.Dynamic,
Property.IndexScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ public class ThreadPoolMergeExecutorService {
/**
* Floor for IO write rate limit of individual merge tasks (we will never go any lower than this)
*/
static final ByteSizeValue MIN_IO_RATE = ByteSizeValue.ofMb(5L);
static final ByteSizeValue MIN_IO_RATE = ByteSizeValue.ofMb(10L);
/**
* Ceiling for IO write rate limit of individual merge tasks (we will never go any higher than this)
*/
static final ByteSizeValue MAX_IO_RATE = ByteSizeValue.ofMb(10240L);
/**
* Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true
*/
static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(20L);
static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(40L);
/**
* Total number of submitted merge tasks that support IO auto throttling and that have not yet been run (or aborted).
* This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
);
private final ShardId shardId;
private final MergeSchedulerConfig config;
private final Logger logger;
protected final Logger logger;
private final MergeTracking mergeTracking;
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessors;
import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA;

public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
Expand Down Expand Up @@ -145,7 +146,13 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) {
result.put(
ThreadPool.Names.MERGE,
new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, allocatedProcessors, TimeValue.timeValueMinutes(5), true)
new ScalingExecutorBuilder(
ThreadPool.Names.MERGE,
1,
halfAllocatedProcessors(allocatedProcessors),
TimeValue.timeValueMinutes(5),
true
)
);
}
result.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,15 +655,15 @@ static int halfAllocatedProcessors(final int allocatedProcessors) {
return (allocatedProcessors + 1) / 2;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
public static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy(halfAllocatedProcessors(allocatedProcessors), 1, 5);
}

static int halfAllocatedProcessorsMaxTen(final int allocatedProcessors) {
return boundedBy(halfAllocatedProcessors(allocatedProcessors), 1, 10);
}

static int twiceAllocatedProcessors(final int allocatedProcessors) {
public static int twiceAllocatedProcessors(final int allocatedProcessors) {
return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE);
}

Expand Down