Skip to content

Commit 95204a9

Browse files
committed
Allow coordinator to move segments cognizant of deploymentGroup
1 parent 7d9c627 commit 95204a9

19 files changed

Lines changed: 1210 additions & 70 deletions

server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ public class CoordinatorDynamicConfig
8484
*/
8585
private final Map<String, Set<String>> historicalTierAliases;
8686

87+
/**
88+
* Deployment groups the coordinator enforces per-group replication and handoff for.
89+
* When non-empty, the coordinator ensures {@code requiredReplicas} per listed group rather than
90+
* per tier, and the handoff endpoint requires at least one server in each listed group (that has
91+
* online servers) to serve the segment before declaring handoff complete.
92+
* An empty set disables this behavior and restores default tier-wide replication.
93+
*/
94+
private final Set<String> coordinatingVersions;
95+
8796
/**
8897
* Stale pending segments belonging to the data sources in this list are not killed by {@code
8998
* KillStalePendingSegments}. In other words, segments in these data sources are "protected".
@@ -135,7 +144,8 @@ public CoordinatorDynamicConfig(
135144
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
136145
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
137146
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
138-
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases
147+
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases,
148+
@JsonProperty("coordinatingVersions") @Nullable Set<String> coordinatingVersions
139149
)
140150
{
141151
this.markSegmentAsUnusedDelayMillis =
@@ -183,6 +193,7 @@ public CoordinatorDynamicConfig(
183193
this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
184194

185195
this.historicalTierAliases = Configs.valueOrDefault(historicalTierAliases, Map.of());
196+
this.coordinatingVersions = Configs.valueOrDefault(coordinatingVersions, Set.of());
186197
final Set<String> aliasKeys = this.historicalTierAliases.keySet();
187198
for (Set<String> mappedTiers : this.historicalTierAliases.values()) {
188199
if (!Sets.intersection(mappedTiers, aliasKeys).isEmpty()) {
@@ -364,6 +375,12 @@ public Map<String, Set<String>> getHistoricalTierAliases()
364375
return historicalTierAliases;
365376
}
366377

378+
@JsonProperty
379+
public Set<String> getCoordinatingVersions()
380+
{
381+
return coordinatingVersions;
382+
}
383+
367384
/**
368385
* List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
369386
* segments. This causes decreases the average time taken to load segments. However, this also means less resources
@@ -398,6 +415,7 @@ public String toString()
398415
", turboLoadingNodes=" + turboLoadingNodes +
399416
", cloneServers=" + cloneServers +
400417
", historicalTierAliases=" + historicalTierAliases +
418+
", coordinatingVersions=" + coordinatingVersions +
401419
'}';
402420
}
403421

@@ -435,7 +453,8 @@ public boolean equals(Object o)
435453
&& Objects.equals(turboLoadingNodes, that.turboLoadingNodes)
436454
&& Objects.equals(debugDimensions, that.debugDimensions)
437455
&& Objects.equals(cloneServers, that.cloneServers)
438-
&& Objects.equals(historicalTierAliases, that.historicalTierAliases);
456+
&& Objects.equals(historicalTierAliases, that.historicalTierAliases)
457+
&& Objects.equals(coordinatingVersions, that.coordinatingVersions);
439458
}
440459

441460
@Override
@@ -460,7 +479,8 @@ public int hashCode()
460479
debugDimensions,
461480
turboLoadingNodes,
462481
cloneServers,
463-
historicalTierAliases
482+
historicalTierAliases,
483+
coordinatingVersions
464484
);
465485
}
466486

@@ -518,6 +538,7 @@ public static class Builder
518538
private Set<String> turboLoadingNodes;
519539
private Map<String, String> cloneServers;
520540
private Map<String, Set<String>> historicalTierAliases;
541+
private Set<String> coordinatingVersions;
521542

522543
public Builder()
523544
{
@@ -543,7 +564,8 @@ public Builder(
543564
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
544565
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
545566
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
546-
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases
567+
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases,
568+
@JsonProperty("coordinatingVersions") @Nullable Set<String> coordinatingVersions
547569
)
548570
{
549571
this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -565,6 +587,7 @@ public Builder(
565587
this.turboLoadingNodes = turboLoadingNodes;
566588
this.cloneServers = cloneServers;
567589
this.historicalTierAliases = historicalTierAliases;
590+
this.coordinatingVersions = coordinatingVersions;
568591
}
569592

570593
public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -669,6 +692,12 @@ public Builder withHistoricalTierAliases(Map<String, Set<String>> historicalTier
669692
return this;
670693
}
671694

695+
public Builder withCoordinatingVersions(Set<String> coordinatingVersions)
696+
{
697+
this.coordinatingVersions = coordinatingVersions;
698+
return this;
699+
}
700+
672701
/**
673702
* Builds a CoordinatoryDynamicConfig using either the configured values, or
674703
* the default value if not configured.
@@ -697,7 +726,8 @@ public CoordinatorDynamicConfig build()
697726
debugDimensions,
698727
turboLoadingNodes,
699728
cloneServers,
700-
historicalTierAliases
729+
historicalTierAliases,
730+
coordinatingVersions
701731
);
702732
}
703733

@@ -730,7 +760,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
730760
valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
731761
valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
732762
valueOrDefault(cloneServers, defaults.getCloneServers()),
733-
valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases())
763+
valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases()),
764+
valueOrDefault(coordinatingVersions, defaults.getCoordinatingVersions())
734765
);
735766
}
736767
}

server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.NavigableSet;
3535
import java.util.Set;
36+
import java.util.TreeSet;
3637
import java.util.stream.Collectors;
3738

3839
/**
@@ -46,6 +47,11 @@ public class DruidCluster
4647
private final Set<ServerHolder> realtimes;
4748
private final Map<String, NavigableSet<ServerHolder>> historicals;
4849
private final Map<String, NavigableSet<ServerHolder>> managedHistoricals;
50+
/**
51+
* Managed historicals indexed by tier and then by deployment group. Only servers with a non-null
52+
* deploymentGroup appear here. Used to support per-group replication and handoff enforcement.
53+
*/
54+
private final Map<String, Map<String, NavigableSet<ServerHolder>>> managedHistoricalsByTierAndGroup;
4955
private final Set<ServerHolder> brokers;
5056
private final List<ServerHolder> allManagedServers;
5157

@@ -70,6 +76,7 @@ private DruidCluster(
7076
return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers);
7177
}
7278
);
79+
this.managedHistoricalsByTierAndGroup = initManagedHistoricalsByTierAndGroup();
7380
this.brokers = Collections.unmodifiableSet(brokers);
7481
this.allManagedServers = initAllManagedServers();
7582
}
@@ -111,6 +118,73 @@ public NavigableSet<ServerHolder> getManagedHistoricalsByTier(String tier)
111118
return managedHistoricals.get(tier);
112119
}
113120

121+
/**
122+
* Returns the distinct non-null deployment groups present among managed historicals in the given tier.
123+
*/
124+
public Set<String> getDeploymentGroupsForTier(String tier)
125+
{
126+
final Map<String, NavigableSet<ServerHolder>> groupMap = managedHistoricalsByTierAndGroup.get(tier);
127+
return groupMap == null ? Collections.emptySet() : groupMap.keySet();
128+
}
129+
130+
/**
131+
* Returns managed historicals in the given tier that belong to the given deployment group.
132+
* Returns an empty set if no servers for that (tier, group) pair exist.
133+
*/
134+
public NavigableSet<ServerHolder> getManagedHistoricalsByTierAndGroup(String tier, String group)
135+
{
136+
final Map<String, NavigableSet<ServerHolder>> groupMap = managedHistoricalsByTierAndGroup.get(tier);
137+
if (groupMap == null) {
138+
return Collections.emptyNavigableSet();
139+
}
140+
final NavigableSet<ServerHolder> servers = groupMap.get(group);
141+
return servers == null ? Collections.emptyNavigableSet() : servers;
142+
}
143+
144+
/**
145+
* Returns managed historicals in the given tier whose {@code deploymentGroup} is null or is not
146+
* present in {@code coordinatingVersions}. These are the "uncoordinated" servers whose replica
147+
* counts roll up to the tier-wide {@link ReplicaCountKey}; they must still be visited by drop /
148+
* cancellation passes when the rest of the tier is operating in per-group mode.
149+
*/
150+
public NavigableSet<ServerHolder> getUncoordinatedManagedHistoricalsByTier(
151+
String tier,
152+
Set<String> coordinatingVersions
153+
)
154+
{
155+
final NavigableSet<ServerHolder> all = managedHistoricals.get(tier);
156+
if (all == null || all.isEmpty()) {
157+
return Collections.emptyNavigableSet();
158+
}
159+
if (coordinatingVersions == null || coordinatingVersions.isEmpty()) {
160+
return all;
161+
}
162+
final NavigableSet<ServerHolder> filtered = new TreeSet<>(Comparator.naturalOrder());
163+
for (ServerHolder server : all) {
164+
final String group = server.getServer().getMetadata().getDeploymentGroup();
165+
if (group == null || !coordinatingVersions.contains(group)) {
166+
filtered.add(server);
167+
}
168+
}
169+
return filtered;
170+
}
171+
172+
private Map<String, Map<String, NavigableSet<ServerHolder>>> initManagedHistoricalsByTierAndGroup()
173+
{
174+
final Map<String, Map<String, NavigableSet<ServerHolder>>> result = new HashMap<>();
175+
managedHistoricals.forEach((tier, servers) -> {
176+
for (ServerHolder server : servers) {
177+
final String group = server.getServer().getMetadata().getDeploymentGroup();
178+
if (group != null) {
179+
result.computeIfAbsent(tier, t -> new HashMap<>())
180+
.computeIfAbsent(group, g -> new TreeSet<>(Comparator.naturalOrder()))
181+
.add(server);
182+
}
183+
}
184+
});
185+
return Collections.unmodifiableMap(result);
186+
}
187+
114188
public List<ServerHolder> getAllManagedServers()
115189
{
116190
return allManagedServers;

server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import org.apache.druid.server.coordinator.stats.Stats;
3232
import org.joda.time.Duration;
3333

34+
import java.util.HashMap;
35+
import java.util.HashSet;
36+
import java.util.Map;
3437
import java.util.Set;
3538

3639
/**
@@ -60,9 +63,20 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
6063
return params;
6164
}
6265

63-
params.getDruidCluster().getManagedHistoricals().forEach(
64-
(tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
65-
);
66+
final Set<String> coordinatingVersions = params.getCoordinatorDynamicConfig().getCoordinatingVersions();
67+
params.getDruidCluster().getManagedHistoricals().forEach((tier, servers) -> {
68+
if (coordinatingVersions.isEmpty()) {
69+
new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run();
70+
} else {
71+
// Partition tier servers by deployment group so segments never move across groups.
72+
// Servers with no deploymentGroup form their own partition keyed under the empty string.
73+
final Map<String, Set<ServerHolder>> serversByGroup = partitionByDeploymentGroup(servers);
74+
final int perGroupMax = Math.max(1, maxSegmentsToMove / serversByGroup.size());
75+
serversByGroup.forEach(
76+
(group, groupServers) -> new TierSegmentBalancer(tier, groupServers, perGroupMax, params).run()
77+
);
78+
}
79+
});
6680

6781
CoordinatorRunStats runStats = params.getCoordinatorStats();
6882
params.getBalancerStrategy()
@@ -123,4 +137,13 @@ private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster cluster
123137
return Pair.of(numHistoricals, numSegments);
124138
}
125139

140+
private static Map<String, Set<ServerHolder>> partitionByDeploymentGroup(Set<ServerHolder> servers)
141+
{
142+
final Map<String, Set<ServerHolder>> byGroup = new HashMap<>();
143+
for (ServerHolder server : servers) {
144+
final String group = server.getServer().getMetadata().getDeploymentGroup();
145+
byGroup.computeIfAbsent(group == null ? "" : group, g -> new HashSet<>()).add(server);
146+
}
147+
return byGroup;
148+
}
126149
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.coordinator.loading;
21+
22+
import javax.annotation.Nullable;
23+
import java.util.Set;
24+
25+
/**
26+
* Map key used by {@link SegmentReplicaCountMap}. A null {@code group} represents tier-wide
27+
* tracking; a non-null {@code group} represents a specific deployment group within the tier
28+
* (used when the coordinator is enforcing per-group replication via {@code coordinatingVersions}).
29+
*/
30+
public record ReplicaCountKey(String tier, @Nullable String group)
31+
{
32+
public static ReplicaCountKey forTier(String tier)
33+
{
34+
return new ReplicaCountKey(tier, null);
35+
}
36+
37+
/**
38+
* Returns a (tier, group) key when {@code group} is non-null and present in
39+
* {@code coordinatingVersions}; otherwise a plain tier-wide key.
40+
*/
41+
public static ReplicaCountKey from(String tier, @Nullable String group, Set<String> coordinatingVersions)
42+
{
43+
if (group != null && coordinatingVersions.contains(group)) {
44+
return new ReplicaCountKey(tier, group);
45+
}
46+
return new ReplicaCountKey(tier, null);
47+
}
48+
}

server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,25 @@
4848
public class RoundRobinServerSelector
4949
{
5050
private final Map<String, CircularServerList> tierToServers = new HashMap<>();
51+
private final Map<String, Map<String, CircularServerList>> tierGroupToServers = new HashMap<>();
5152

52-
public RoundRobinServerSelector(DruidCluster cluster)
53+
public RoundRobinServerSelector(DruidCluster cluster, Set<String> coordinatingVersions)
5354
{
5455
cluster.getManagedHistoricals().forEach(
5556
(tier, servers) -> tierToServers.put(tier, new CircularServerList(servers))
5657
);
58+
59+
if (!coordinatingVersions.isEmpty()) {
60+
cluster.getManagedHistoricals().keySet().forEach(tier -> {
61+
for (String group : coordinatingVersions) {
62+
final var groupServers = cluster.getManagedHistoricalsByTierAndGroup(tier, group);
63+
if (!groupServers.isEmpty()) {
64+
tierGroupToServers.computeIfAbsent(tier, t -> new HashMap<>())
65+
.put(group, new CircularServerList(groupServers));
66+
}
67+
}
68+
});
69+
}
5770
}
5871

5972
/**
@@ -70,6 +83,20 @@ public Iterator<ServerHolder> getServersInTierToLoadSegment(String tier, DataSeg
7083
return new EligibleServerIterator(segment, iterator);
7184
}
7285

86+
/**
87+
* Returns an iterator over servers in the given tier and deployment group that are eligible to
88+
* load the given segment.
89+
*/
90+
public Iterator<ServerHolder> getServersInTierAndGroupToLoadSegment(String tier, String group, DataSegment segment)
91+
{
92+
final Map<String, CircularServerList> groupMap = tierGroupToServers.get(tier);
93+
if (groupMap == null) {
94+
return Collections.emptyIterator();
95+
}
96+
final CircularServerList list = groupMap.get(group);
97+
return list == null ? Collections.emptyIterator() : new EligibleServerIterator(segment, list);
98+
}
99+
73100
/**
74101
* Iterator over servers in a tier that are eligible to load a given segment.
75102
*/

0 commit comments

Comments
 (0)