Skip to content

Commit b763932

Browse files
authored
ESQL: Calculate concurrent node limit (#124901) (#127745)
Manual 8.19 backport of #124901
1 parent ad585d5 commit b763932

File tree

5 files changed

+374
-2
lines changed

5 files changed

+374
-2
lines changed

docs/changelog/124901.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124901
2+
summary: Calculate concurrent node limit
3+
area: ES|QL
4+
type: feature
5+
issues: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.planner;
9+
10+
import org.elasticsearch.core.Nullable;
11+
import org.elasticsearch.xpack.esql.core.expression.Literal;
12+
import org.elasticsearch.xpack.esql.core.util.Holder;
13+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
14+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
15+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
16+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
17+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
18+
import org.elasticsearch.xpack.esql.session.Configuration;
19+
20+
/**
21+
* Calculates the maximum number of nodes that should be queried concurrently for the given data node plan.
22+
* <p>
23+
* Used to avoid overloading the cluster with concurrent requests that may not be needed.
24+
* </p>
25+
*/
26+
public class PlanConcurrencyCalculator {
27+
public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();
28+
29+
private PlanConcurrencyCalculator() {}
30+
31+
/**
32+
* @return {@code null} if there should be no limit, otherwise, the maximum number of nodes that should be queried concurrently.
33+
*/
34+
@Nullable
35+
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
36+
// If available, pragma overrides any calculation
37+
if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) {
38+
return configuration.pragmas().maxConcurrentNodesPerCluster();
39+
}
40+
if (dataNodePlan == null) {
41+
return null;
42+
}
43+
44+
Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);
45+
46+
if (dataNodeLimit != null) {
47+
return limitToConcurrency(dataNodeLimit);
48+
}
49+
50+
return null;
51+
}
52+
53+
private Integer limitToConcurrency(int limit) {
54+
// For high limits, don't limit the concurrency
55+
if (limit > 1000) {
56+
return null;
57+
}
58+
59+
// At least 2 nodes, otherwise log2(limit). E.g.
60+
// Limit | Concurrency
61+
// 1 | 2
62+
// 10 | 3
63+
// 1000 | 9
64+
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
65+
}
66+
67+
@Nullable
68+
private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
69+
LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);
70+
71+
// State machine to find:
72+
// A relation
73+
Holder<Boolean> relationFound = new Holder<>(false);
74+
// ...followed by no other node that could break the calculation
75+
Holder<Boolean> forbiddenNodeFound = new Holder<>(false);
76+
// ...and finally, a limit
77+
Holder<Integer> limitValue = new Holder<>(null);
78+
79+
logicalPlan.forEachUp(node -> {
80+
// If a limit or a forbidden command was already found, ignore the rest
81+
if (limitValue.get() == null && forbiddenNodeFound.get() == false) {
82+
if (node instanceof EsRelation) {
83+
relationFound.set(true);
84+
} else if (relationFound.get()) {
85+
if (node instanceof Limit limit && limit.limit() instanceof Literal literalLimit) {
86+
limitValue.set((Integer) literalLimit.value());
87+
} else {
88+
forbiddenNodeFound.set(true);
89+
}
90+
}
91+
}
92+
});
93+
94+
return limitValue.get();
95+
}
96+
97+
private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
98+
Holder<LogicalPlan> foundPlan = new Holder<>();
99+
plan.forEachDown(node -> {
100+
if (node instanceof FragmentExec fragment) {
101+
foundPlan.set(fragment.fragment());
102+
}
103+
});
104+
return foundPlan.get();
105+
}
106+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4343
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4444
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
45+
import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
4546
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
4647
import org.elasticsearch.xpack.esql.session.Configuration;
4748

@@ -98,13 +99,15 @@ void startComputeOnDataNodes(
9899
Runnable runOnTaskFailure,
99100
ActionListener<ComputeResponse> outListener
100101
) {
102+
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
103+
101104
new DataNodeRequestSender(
102105
transportService,
103106
esqlExecutor,
104107
clusterAlias,
105108
parentTask,
106109
configuration.allowPartialResults(),
107-
configuration.pragmas().maxConcurrentNodesPerCluster()
110+
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
108111
) {
109112
@Override
110113
protected void sendRequest(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class QueryPragmas implements Writeable {
5656
public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
5757

5858
public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
59-
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
59+
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1);
6060
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
6161
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
6262

0 commit comments

Comments
 (0)