Skip to content

Commit 059de00

Browse files
committed
Allow coordinator to move segments cognizant of deploymentGroup
1 parent 7d9c627 commit 059de00

22 files changed

Lines changed: 1314 additions & 86 deletions

docs/api-reference/dynamic-configuration-api.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
110110
"cloneServers": {},
111111
"historicalTierAliases": {
112112
"hot": ["hot_1", "hot_2"]
113-
}
113+
},
114+
"coordinatingVersions": []
114115

115116
}
116117
```

docs/configuration/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,7 @@ The following table shows the dynamic configuration properties for the Coordinat
813813
|`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none|
814814
|`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, the target remains in the last known state of the source server until removed from the configuration. <br/>Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|
815815
|`historicalTierAliases`|Map from a virtual tier name to the set of real Historical tier names it expands to. When a load/drop rule references a virtual alias tier, the Coordinator replaces it with its real tiers — each receiving the full replica count independently. The alias key itself is never loaded to directly. For example, `{"hot": ["hot_1", "hot_2"]}` causes a rule of `{"hot": 2}` to load 2 replicas on each of `hot_1` and `hot_2`; `hot` receives no direct assignment. An alias value tier with no servers raises the normal invalid-tier alert. If a rule already specifies an explicit replica count for a tier that also appears as an alias value, the explicit count takes precedence. Duplicate tier names within a set are ignored. A virtual alias tier cannot also be a physical tier.|none|
816+
|`coordinatingVersions`|List of deployment groups for which the Coordinator enforces per-group Historical replication and handoff. When set, load/drop rules are applied independently to each listed group that has active servers in a tier, so each group receives the rule's required replicas. Servers outside the listed groups are not assigned new replicas and may have surplus replicas dropped. Empty disables per-group coordination.|none|
816817

817818
##### Smart segment loading
818819

server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ public class CoordinatorDynamicConfig
8484
*/
8585
private final Map<String, Set<String>> historicalTierAliases;
8686

87+
/**
88+
* Deployment groups the coordinator enforces per-group replication and handoff for.
89+
* When non-empty, the coordinator ensures {@code requiredReplicas} per listed group rather than
90+
* per tier, and the handoff endpoint requires at least one server in each listed group (that has
91+
* online servers) to serve the segment before declaring handoff complete.
92+
* An empty set disables this behavior and restores default tier-wide replication.
93+
*/
94+
private final Set<String> coordinatingVersions;
95+
8796
/**
8897
* Stale pending segments belonging to the data sources in this list are not killed by {@code
8998
* KillStalePendingSegments}. In other words, segments in these data sources are "protected".
@@ -135,7 +144,8 @@ public CoordinatorDynamicConfig(
135144
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
136145
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
137146
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
138-
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases
147+
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases,
148+
@JsonProperty("coordinatingVersions") @Nullable Set<String> coordinatingVersions
139149
)
140150
{
141151
this.markSegmentAsUnusedDelayMillis =
@@ -183,6 +193,7 @@ public CoordinatorDynamicConfig(
183193
this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
184194

185195
this.historicalTierAliases = Configs.valueOrDefault(historicalTierAliases, Map.of());
196+
this.coordinatingVersions = Configs.valueOrDefault(coordinatingVersions, Set.of());
186197
final Set<String> aliasKeys = this.historicalTierAliases.keySet();
187198
for (Set<String> mappedTiers : this.historicalTierAliases.values()) {
188199
if (!Sets.intersection(mappedTiers, aliasKeys).isEmpty()) {
@@ -364,6 +375,12 @@ public Map<String, Set<String>> getHistoricalTierAliases()
364375
return historicalTierAliases;
365376
}
366377

378+
@JsonProperty
379+
public Set<String> getCoordinatingVersions()
380+
{
381+
return coordinatingVersions;
382+
}
383+
367384
/**
368385
* List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
369386
* segments. This decreases the average time taken to load segments. However, this also means fewer resources
@@ -398,6 +415,7 @@ public String toString()
398415
", turboLoadingNodes=" + turboLoadingNodes +
399416
", cloneServers=" + cloneServers +
400417
", historicalTierAliases=" + historicalTierAliases +
418+
", coordinatingVersions=" + coordinatingVersions +
401419
'}';
402420
}
403421

@@ -435,7 +453,8 @@ public boolean equals(Object o)
435453
&& Objects.equals(turboLoadingNodes, that.turboLoadingNodes)
436454
&& Objects.equals(debugDimensions, that.debugDimensions)
437455
&& Objects.equals(cloneServers, that.cloneServers)
438-
&& Objects.equals(historicalTierAliases, that.historicalTierAliases);
456+
&& Objects.equals(historicalTierAliases, that.historicalTierAliases)
457+
&& Objects.equals(coordinatingVersions, that.coordinatingVersions);
439458
}
440459

441460
@Override
@@ -460,7 +479,8 @@ public int hashCode()
460479
debugDimensions,
461480
turboLoadingNodes,
462481
cloneServers,
463-
historicalTierAliases
482+
historicalTierAliases,
483+
coordinatingVersions
464484
);
465485
}
466486

@@ -518,6 +538,7 @@ public static class Builder
518538
private Set<String> turboLoadingNodes;
519539
private Map<String, String> cloneServers;
520540
private Map<String, Set<String>> historicalTierAliases;
541+
private Set<String> coordinatingVersions;
521542

522543
public Builder()
523544
{
@@ -543,7 +564,8 @@ public Builder(
543564
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
544565
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
545566
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
546-
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases
567+
@JsonProperty("historicalTierAliases") @Nullable Map<String, Set<String>> historicalTierAliases,
568+
@JsonProperty("coordinatingVersions") @Nullable Set<String> coordinatingVersions
547569
)
548570
{
549571
this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -565,6 +587,7 @@ public Builder(
565587
this.turboLoadingNodes = turboLoadingNodes;
566588
this.cloneServers = cloneServers;
567589
this.historicalTierAliases = historicalTierAliases;
590+
this.coordinatingVersions = coordinatingVersions;
568591
}
569592

570593
public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -669,6 +692,12 @@ public Builder withHistoricalTierAliases(Map<String, Set<String>> historicalTier
669692
return this;
670693
}
671694

695+
public Builder withCoordinatingVersions(Set<String> coordinatingVersions)
696+
{
697+
this.coordinatingVersions = coordinatingVersions;
698+
return this;
699+
}
700+
672701
/**
673702
* Builds a CoordinatorDynamicConfig using either the configured values, or
674703
* the default value if not configured.
@@ -697,7 +726,8 @@ public CoordinatorDynamicConfig build()
697726
debugDimensions,
698727
turboLoadingNodes,
699728
cloneServers,
700-
historicalTierAliases
729+
historicalTierAliases,
730+
coordinatingVersions
701731
);
702732
}
703733

@@ -730,7 +760,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
730760
valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
731761
valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
732762
valueOrDefault(cloneServers, defaults.getCloneServers()),
733-
valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases())
763+
valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases()),
764+
valueOrDefault(coordinatingVersions, defaults.getCoordinatingVersions())
734765
);
735766
}
736767
}

server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.NavigableSet;
3535
import java.util.Set;
36+
import java.util.TreeSet;
3637
import java.util.stream.Collectors;
3738

3839
/**
@@ -46,6 +47,11 @@ public class DruidCluster
4647
private final Set<ServerHolder> realtimes;
4748
private final Map<String, NavigableSet<ServerHolder>> historicals;
4849
private final Map<String, NavigableSet<ServerHolder>> managedHistoricals;
50+
/**
51+
* Managed historicals indexed by tier and then by deployment group. Only servers with a non-null
52+
* deploymentGroup appear here. Used to support per-group replication and handoff enforcement.
53+
*/
54+
private final Map<String, Map<String, NavigableSet<ServerHolder>>> managedHistoricalsByTierAndGroup;
4955
private final Set<ServerHolder> brokers;
5056
private final List<ServerHolder> allManagedServers;
5157

@@ -70,6 +76,7 @@ private DruidCluster(
7076
return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers);
7177
}
7278
);
79+
this.managedHistoricalsByTierAndGroup = initManagedHistoricalsByTierAndGroup();
7380
this.brokers = Collections.unmodifiableSet(brokers);
7481
this.allManagedServers = initAllManagedServers();
7582
}
@@ -111,6 +118,73 @@ public NavigableSet<ServerHolder> getManagedHistoricalsByTier(String tier)
111118
return managedHistoricals.get(tier);
112119
}
113120

121+
/**
122+
* Returns the distinct non-null deployment groups present among managed historicals in the given tier.
123+
*/
124+
public Set<String> getDeploymentGroupsForTier(String tier)
125+
{
126+
final Map<String, NavigableSet<ServerHolder>> groupMap = managedHistoricalsByTierAndGroup.get(tier);
127+
return groupMap == null ? Collections.emptySet() : groupMap.keySet();
128+
}
129+
130+
/**
131+
* Returns managed historicals in the given tier that belong to the given deployment group.
132+
* Returns an empty set if no servers for that (tier, group) pair exist.
133+
*/
134+
public NavigableSet<ServerHolder> getManagedHistoricalsByTierAndGroup(String tier, String group)
135+
{
136+
final Map<String, NavigableSet<ServerHolder>> groupMap = managedHistoricalsByTierAndGroup.get(tier);
137+
if (groupMap == null) {
138+
return Collections.emptyNavigableSet();
139+
}
140+
final NavigableSet<ServerHolder> servers = groupMap.get(group);
141+
return servers == null ? Collections.emptyNavigableSet() : servers;
142+
}
143+
144+
/**
145+
* Returns managed historicals in the given tier whose {@code deploymentGroup} is null or is not
146+
* present in {@code coordinatingVersions}. These are the "uncoordinated" servers whose replica
147+
* counts roll up to the tier-wide {@link ReplicaCountKey}; they must still be visited by drop /
148+
* cancellation passes when the rest of the tier is operating in per-group mode.
149+
*/
150+
public NavigableSet<ServerHolder> getUncoordinatedManagedHistoricalsByTier(
151+
String tier,
152+
Set<String> coordinatingVersions
153+
)
154+
{
155+
final NavigableSet<ServerHolder> all = managedHistoricals.get(tier);
156+
if (all == null || all.isEmpty()) {
157+
return Collections.emptyNavigableSet();
158+
}
159+
if (coordinatingVersions == null || coordinatingVersions.isEmpty()) {
160+
return all;
161+
}
162+
final NavigableSet<ServerHolder> filtered = new TreeSet<>(Comparator.naturalOrder());
163+
for (ServerHolder server : all) {
164+
final String group = server.getServer().getMetadata().getDeploymentGroup();
165+
if (group == null || !coordinatingVersions.contains(group)) {
166+
filtered.add(server);
167+
}
168+
}
169+
return filtered;
170+
}
171+
172+
private Map<String, Map<String, NavigableSet<ServerHolder>>> initManagedHistoricalsByTierAndGroup()
173+
{
174+
final Map<String, Map<String, NavigableSet<ServerHolder>>> result = new HashMap<>();
175+
managedHistoricals.forEach((tier, servers) -> {
176+
for (ServerHolder server : servers) {
177+
final String group = server.getServer().getMetadata().getDeploymentGroup();
178+
if (group != null) {
179+
result.computeIfAbsent(tier, t -> new HashMap<>())
180+
.computeIfAbsent(group, g -> new TreeSet<>(Comparator.naturalOrder()))
181+
.add(server);
182+
}
183+
}
184+
});
185+
return Collections.unmodifiableMap(result);
186+
}
187+
114188
public List<ServerHolder> getAllManagedServers()
115189
{
116190
return allManagedServers;

server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import org.apache.druid.server.coordinator.stats.Stats;
3232
import org.joda.time.Duration;
3333

34+
import java.util.LinkedHashMap;
35+
import java.util.LinkedHashSet;
36+
import java.util.Map;
3437
import java.util.Set;
3538

3639
/**
@@ -60,9 +63,27 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
6063
return params;
6164
}
6265

63-
params.getDruidCluster().getManagedHistoricals().forEach(
64-
(tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
65-
);
66+
final Set<String> coordinatingVersions = params.getCoordinatorDynamicConfig().getCoordinatingVersions();
67+
params.getDruidCluster().getManagedHistoricals().forEach((tier, servers) -> {
68+
if (coordinatingVersions.isEmpty()) {
69+
new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run();
70+
} else {
71+
// Partition tier servers by deployment group so segments never move across groups.
72+
// Servers with no deploymentGroup form their own partition keyed under the empty string.
73+
final Map<String, Set<ServerHolder>> serversByGroup = partitionByDeploymentGroup(servers);
74+
int remainingGroups = serversByGroup.size();
75+
int remainingSegmentsToMove = maxSegmentsToMove;
76+
for (final Set<ServerHolder> groupServers : serversByGroup.values()) {
77+
final int groupMaxSegmentsToMove =
78+
(remainingSegmentsToMove + remainingGroups - 1) / remainingGroups;
79+
if (groupMaxSegmentsToMove > 0) {
80+
new TierSegmentBalancer(tier, groupServers, groupMaxSegmentsToMove, params).run();
81+
remainingSegmentsToMove -= groupMaxSegmentsToMove;
82+
}
83+
--remainingGroups;
84+
}
85+
}
86+
});
6687

6788
CoordinatorRunStats runStats = params.getCoordinatorStats();
6889
params.getBalancerStrategy()
@@ -123,4 +144,13 @@ private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster cluster
123144
return Pair.of(numHistoricals, numSegments);
124145
}
125146

147+
private static Map<String, Set<ServerHolder>> partitionByDeploymentGroup(Set<ServerHolder> servers)
148+
{
149+
final Map<String, Set<ServerHolder>> byGroup = new LinkedHashMap<>();
150+
for (final ServerHolder server : servers) {
151+
final String group = server.getServer().getMetadata().getDeploymentGroup();
152+
byGroup.computeIfAbsent(group == null ? "" : group, g -> new LinkedHashSet<>()).add(server);
153+
}
154+
return byGroup;
155+
}
126156
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.coordinator.loading;
21+
22+
import javax.annotation.Nullable;
23+
import java.util.Set;
24+
25+
/**
26+
* Map key used by {@link SegmentReplicaCountMap}. A null {@code group} represents tier-wide
27+
* tracking; a non-null {@code group} represents a specific deployment group within the tier
28+
* (used when the coordinator is enforcing per-group replication via {@code coordinatingVersions}).
29+
*/
30+
public record ReplicaCountKey(String tier, @Nullable String group)
31+
{
32+
public static ReplicaCountKey forTier(String tier)
33+
{
34+
return new ReplicaCountKey(tier, null);
35+
}
36+
37+
/**
38+
* Returns a (tier, group) key when {@code group} is non-null and present in
39+
* {@code coordinatingVersions}; otherwise a plain tier-wide key.
40+
*/
41+
public static ReplicaCountKey from(String tier, @Nullable String group, Set<String> coordinatingVersions)
42+
{
43+
if (group != null && coordinatingVersions.contains(group)) {
44+
return new ReplicaCountKey(tier, group);
45+
}
46+
return new ReplicaCountKey(tier, null);
47+
}
48+
}

0 commit comments

Comments
 (0)