Skip to content

Commit e3b61e0

Browse files
authored
Add strict and pooled Broker tier selector strategies (#19094)
Adds two new Broker TierSelectorStrategy implementations to provide finer control over how Brokers select Historical and Realtime servers for query execution. - strict – Only selects servers whose priorities match the configured list. Unlike other existing strategies, there is no fallback to servers with other priorities if the configured priorities are unavailable. This also addresses a current limitation with watched tiers: when multiple tiers are configured, Brokers can still retain visibility into the state of the cluster, while enforcing query isolation at the time of server selection rather than filtering servers at the time of building the Broker's server view. - pooled – Pools servers across the configured priorities and selects among them, allowing queries to utilize multiple priority tiers for improved availability. This is particularly useful for querying realtime servers where the number of task replicas per tier may be limited for cost reasons. Both strategies require the configured set of priorities to be non-empty. Similar to queries routed to tiers that are not part of the watched tiers, these strategies may result in queries returning no data if the configured tiers are unavailable.
1 parent bfcc0e5 commit e3b61e0

10 files changed

Lines changed: 1057 additions & 1 deletion

File tree

server/src/main/java/org/apache/druid/client/selector/AbstractTierSelectorStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
*/
3535
public abstract class AbstractTierSelectorStrategy implements TierSelectorStrategy
3636
{
37-
private final ServerSelectorStrategy serverSelectorStrategy;
37+
protected final ServerSelectorStrategy serverSelectorStrategy;
3838

3939
public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
4040
{
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.client.selector;
21+
22+
import com.fasterxml.jackson.annotation.JacksonInject;
23+
import com.fasterxml.jackson.annotation.JsonCreator;
24+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
25+
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
26+
import org.apache.druid.client.QueryableDruidServer;
27+
import org.apache.druid.java.util.emitter.EmittingLogger;
28+
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
29+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
30+
import org.apache.druid.query.Query;
31+
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
32+
import org.apache.druid.timeline.DataSegment;
33+
34+
import javax.annotation.Nullable;
35+
import java.util.Comparator;
36+
import java.util.LinkedHashSet;
37+
import java.util.List;
38+
import java.util.Set;
39+
40+
/**
41+
* A {@link TierSelectorStrategy} that pools servers with the configured set of priorities from {@link PooledTierSelectorStrategyConfig#getPriorities()}
42+
* and delegates server selection to the configured {@link ServerSelectorStrategy}.
43+
* <p>
44+
* Unlike other {@link TierSelectorStrategy} like {@link CustomTierSelectorStrategy}
45+
* which has a preference for priority order, this strategy treats all configured priorities equally
46+
* by combining their servers into a single selection pool and delegates to {@link ServerSelectorStrategy} to do
47+
* the server selection. If no servers match the configured priorities in the pool, an empty server list is returned,
48+
* which may cause queries to return partial or no data.
49+
* <p>
50+
* Example configuration:
51+
* <li> <code> druid.broker.select.tier=pooled </code> </li>
52+
* <li> <code> druid.broker.select.tier.pooled.priorities=[2,1] </code> </li>
53+
* <p>
54+
* With this configuration, servers with priority 2 and 1 are pooled together and
55+
* selection is delegated to the {@link ServerSelectorStrategy}. Servers with other
56+
* priorities are ignored.
57+
*/
58+
public class PooledTierSelectorStrategy extends AbstractTierSelectorStrategy
59+
{
60+
private static final EmittingLogger log = new EmittingLogger(PooledTierSelectorStrategy.class);
61+
public static final String TYPE = "pooled";
62+
63+
private final PooledTierSelectorStrategyConfig config;
64+
private final ServiceEmitter emitter;
65+
private final Set<Integer> configuredPriorities;
66+
67+
@JsonCreator
68+
public PooledTierSelectorStrategy(
69+
@JacksonInject final ServerSelectorStrategy serverSelectorStrategy,
70+
@JacksonInject final PooledTierSelectorStrategyConfig config,
71+
@JacksonInject final ServiceEmitter emitter
72+
)
73+
{
74+
super(serverSelectorStrategy);
75+
this.config = config;
76+
this.emitter = emitter;
77+
this.configuredPriorities = config.getPriorities();
78+
}
79+
80+
@Override
81+
public <T> List<QueryableDruidServer> pick(
82+
@Nullable final Query<T> query,
83+
final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
84+
final DataSegment segment,
85+
final int numServersToPick
86+
)
87+
{
88+
final Set<QueryableDruidServer> candidateServerPool = new LinkedHashSet<>();
89+
90+
for (Int2ObjectMap.Entry<Set<QueryableDruidServer>> entry : prioritizedServers.int2ObjectEntrySet()) {
91+
final int priority = entry.getIntKey();
92+
final Set<QueryableDruidServer> servers = entry.getValue();
93+
94+
if (configuredPriorities.contains(priority)) {
95+
candidateServerPool.addAll(servers);
96+
} else {
97+
log.debug(
98+
"Server priority[%d] not in the configured list of priorities[%s] so ignore servers[%s] for query[%s]",
99+
priority, config.getPriorities(), servers, query
100+
);
101+
}
102+
}
103+
104+
if (candidateServerPool.isEmpty()) {
105+
if (query == null || query instanceof SegmentMetadataQuery) {
106+
// Debug logging to reduce logging spam as these are typically system-generated segment metadata queries
107+
log.debug(
108+
"No server found for query[%s] from server priorities[%s]. Configured priorities[%s].",
109+
query, prioritizedServers.keySet(), config.getPriorities()
110+
);
111+
} else {
112+
log.warn(
113+
"No servers found for query[%s] matching configured priorities[%s]. Available priorities[%s].",
114+
query, config.getPriorities(), prioritizedServers.keySet()
115+
);
116+
emitter.emit(
117+
ServiceMetricEvent.builder()
118+
.setMetric("tierSelector/noServer", 1)
119+
.setDimension("dataSource", String.valueOf(query.getDataSource()))
120+
.setDimension("tierSelectorType", TYPE)
121+
.setDimension("queryType", query.getType())
122+
.setDimension("queryPriority", String.valueOf(query.context().getPriority()))
123+
.setDimensionIfNotNull("queryId", query.getId())
124+
);
125+
}
126+
return List.of();
127+
}
128+
129+
final List<QueryableDruidServer> selectedServers = serverSelectorStrategy.pick(query, candidateServerPool, segment, numServersToPick);
130+
log.debug("Selected servers[%s] for query[%s] from given servers[%s] and candidateServerPool[%s]", selectedServers, query, prioritizedServers, candidateServerPool);
131+
return selectedServers;
132+
}
133+
134+
/**
135+
* @return the natural order of priorities since priority order doesn't matter for this strategy as the configured set of
136+
* priorities in the pool are treated equally and delegated to {@link #serverSelectorStrategy}.
137+
*/
138+
@Override
139+
public Comparator<Integer> getComparator()
140+
{
141+
return Comparator.naturalOrder();
142+
}
143+
144+
public PooledTierSelectorStrategyConfig getConfig()
145+
{
146+
return config;
147+
}
148+
149+
@Override
150+
public String toString()
151+
{
152+
return "PooledTierSelectorStrategy{" +
153+
"config=" + config +
154+
'}';
155+
}
156+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.client.selector;
21+
22+
import com.fasterxml.jackson.annotation.JsonProperty;
23+
import org.apache.druid.error.DruidException;
24+
import org.apache.druid.utils.CollectionUtils;
25+
26+
import java.util.Set;
27+
28+
/**
29+
* Configuration for {@link PooledTierSelectorStrategy}.
30+
* <p>
31+
* Requires a non-empty set of {@code priorities}. The order of priorities don't matter.
32+
*/
33+
public class PooledTierSelectorStrategyConfig
34+
{
35+
@JsonProperty
36+
private final Set<Integer> priorities;
37+
38+
public Set<Integer> getPriorities()
39+
{
40+
return priorities;
41+
}
42+
43+
public PooledTierSelectorStrategyConfig(@JsonProperty("priorities") final Set<Integer> priorities)
44+
{
45+
if (CollectionUtils.isNullOrEmpty(priorities)) {
46+
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
47+
.ofCategory(DruidException.Category.INVALID_INPUT)
48+
.build(
49+
"priorities must be non-empty when using pooled tier selector on the Broker. Found priorities[%s].",
50+
priorities
51+
);
52+
}
53+
this.priorities = priorities;
54+
}
55+
56+
@Override
57+
public String toString()
58+
{
59+
return "PooledTierSelectorStrategyConfig{" +
60+
"priorities=" + priorities +
61+
'}';
62+
}
63+
}

0 commit comments

Comments
 (0)