Skip to content

Commit b67f360

Browse files
committed
[cruise-control] add flag to stop setting broker throttles
1 parent 66e625b commit b67f360

3 files changed

Lines changed: 63 additions & 41 deletions

File tree

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ public final class ExecutorConfig {
9393
public static final String DEFAULT_REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being "
9494
+ "moved, in bytes per second.";
9595

96+
/**
97+
* <code>skip.replication.throttle.rate.setting</code>
98+
*/
99+
public static final String SKIP_REPLICATION_THROTTLE_RATE_SETTING_CONFIG = "skip.replication.throttle.rate.setting";
100+
public static final boolean DEFAULT_SKIP_REPLICATION_THROTTLE_RATE_SETTING = false;
101+
public static final String SKIP_REPLICATION_THROTTLE_RATE_SETTING_DOC = "If true, Cruise Control will set throttled "
102+
+ "replicas on topics during rebalances but will not set or remove throttle rates on brokers. This allows an "
103+
+ "external system to manage per-broker throttle rates independently.";
104+
96105
/**
97106
* <code>replica.movement.strategies</code>
98107
*/
@@ -548,6 +557,11 @@ public static ConfigDef define(ConfigDef configDef) {
548557
DEFAULT_DEFAULT_REPLICATION_THROTTLE,
549558
ConfigDef.Importance.MEDIUM,
550559
DEFAULT_REPLICATION_THROTTLE_DOC)
560+
.define(SKIP_REPLICATION_THROTTLE_RATE_SETTING_CONFIG,
561+
ConfigDef.Type.BOOLEAN,
562+
DEFAULT_SKIP_REPLICATION_THROTTLE_RATE_SETTING,
563+
ConfigDef.Importance.MEDIUM,
564+
SKIP_REPLICATION_THROTTLE_RATE_SETTING_DOC)
551565
.define(REPLICA_MOVEMENT_STRATEGIES_CONFIG,
552566
ConfigDef.Type.LIST,
553567
DEFAULT_REPLICA_MOVEMENT_STRATEGIES,

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1606,8 +1606,9 @@ private void updateOngoingExecutionState() {
16061606

16071607
private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
16081608
Set<Integer> currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
1609+
boolean skipThrottleRateSetting = _config.getBoolean(ExecutorConfig.SKIP_REPLICATION_THROTTLE_RATE_SETTING_CONFIG);
16091610
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle,
1610-
currentDeadBrokersWithReplicas);
1611+
skipThrottleRateSetting, currentDeadBrokersWithReplicas);
16111612
int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
16121613
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
16131614
long startTime = System.currentTimeMillis();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -48,43 +48,48 @@ class ReplicationThrottleHelper {
4848
private static final int DEFAULT_RETRY_BACKOFF_BASE = 2;
4949
private final AdminClient _adminClient;
5050
private final Long _throttleRate;
51+
private final boolean _skipThrottleRateSetting;
5152
private final int _retries;
5253
private long _maxDelayMs;
5354
private final Set<Integer> _deadBrokers;
5455

55-
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) {
56-
this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS);
56+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, boolean skipThrottleRateSetting) {
57+
this(adminClient, throttleRate, skipThrottleRateSetting, RETRIES, MAX_DELAY_MS, new HashSet<>());
5758
}
5859

59-
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set<Integer> deadBrokers) {
60-
this(adminClient, throttleRate, RETRIES, MAX_DELAY_MS, deadBrokers);
60+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, boolean skipThrottleRateSetting,
61+
Set<Integer> deadBrokers) {
62+
this(adminClient, throttleRate, skipThrottleRateSetting, RETRIES, MAX_DELAY_MS, deadBrokers);
6163
}
6264

6365
// for testing
64-
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs) {
65-
this._adminClient = adminClient;
66-
this._throttleRate = throttleRate;
67-
this._retries = retries;
68-
this._maxDelayMs = maxDelayMs;
69-
this._deadBrokers = new HashSet<Integer>();
66+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, boolean skipThrottleRateSetting,
67+
int retries, long maxDelayMs) {
68+
this(adminClient, throttleRate, skipThrottleRateSetting, retries, maxDelayMs, new HashSet<>());
7069
}
7170

72-
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, long maxDelayMs, Set<Integer> deadBrokers) {
71+
ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, boolean skipThrottleRateSetting,
72+
int retries, long maxDelayMs, Set<Integer> deadBrokers) {
7373
this._adminClient = adminClient;
7474
this._throttleRate = throttleRate;
75+
this._skipThrottleRateSetting = skipThrottleRateSetting;
7576
this._retries = retries;
7677
this._maxDelayMs = maxDelayMs;
7778
this._deadBrokers = deadBrokers;
7879
}
7980

8081
void setThrottles(List<ExecutionProposal> replicaMovementProposals)
8182
throws ExecutionException, InterruptedException, TimeoutException {
82-
if (throttlingEnabled()) {
83-
LOG.info("Setting a rebalance throttle of {} bytes/sec", _throttleRate);
84-
Set<Integer> participatingBrokers = getParticipatingBrokers(replicaMovementProposals);
83+
if (throttlingEnabled() || _skipThrottleRateSetting) {
8584
Map<String, Set<String>> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals);
86-
for (int broker : participatingBrokers) {
87-
setThrottledRateIfNecessary(broker);
85+
if (throttlingEnabled() && !_skipThrottleRateSetting) {
86+
LOG.info("Setting a rebalance throttle of {} bytes/sec", _throttleRate);
87+
Set<Integer> participatingBrokers = getParticipatingBrokers(replicaMovementProposals);
88+
for (int broker : participatingBrokers) {
89+
setThrottledRateIfNecessary(broker);
90+
}
91+
} else {
92+
LOG.info("Skipping throttle rate setting; throttled replicas will still be set");
8893
}
8994
for (Map.Entry<String, Set<String>> entry : throttledReplicas.entrySet()) {
9095
setThrottledReplicas(entry.getKey(), entry.getValue());
@@ -112,7 +117,7 @@ boolean taskIsInProgress(ExecutionTask task) {
112117
// clear throttles for a specific list of execution tasks
113118
void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inProgressTasks)
114119
throws ExecutionException, InterruptedException, TimeoutException {
115-
if (throttlingEnabled()) {
120+
if (throttlingEnabled() || _skipThrottleRateSetting) {
116121
List<ExecutionProposal> completedProposals =
117122
completedTasks
118123
.stream()
@@ -121,29 +126,31 @@ void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inPr
121126
.map(ExecutionTask::proposal)
122127
.collect(Collectors.toList());
123128

124-
// These are the brokers which have completed a task with
125-
// inter-broker replica movement
126-
Set<Integer> participatingBrokers = getParticipatingBrokers(completedProposals);
127-
128-
List<ExecutionProposal> inProgressProposals =
129-
inProgressTasks
130-
.stream()
131-
.filter(this::taskIsInProgress)
132-
.map(ExecutionTask::proposal)
133-
.collect(Collectors.toList());
134-
135-
// These are the brokers which currently have in-progress
136-
// inter-broker replica movement
137-
Set<Integer> brokersWithInProgressTasks = getParticipatingBrokers(inProgressProposals);
138-
139-
// Remove the brokers with in-progress replica moves from the brokers that have
140-
// completed inter-broker replica moves
141-
Set<Integer> brokersToRemoveThrottlesFrom = new TreeSet<>(participatingBrokers);
142-
brokersToRemoveThrottlesFrom.removeAll(brokersWithInProgressTasks);
143-
144-
LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom);
145-
for (int broker : brokersToRemoveThrottlesFrom) {
146-
removeThrottledRateFromBroker(broker);
129+
if (throttlingEnabled() && !_skipThrottleRateSetting) {
130+
// These are the brokers which have completed a task with
131+
// inter-broker replica movement
132+
Set<Integer> participatingBrokers = getParticipatingBrokers(completedProposals);
133+
134+
List<ExecutionProposal> inProgressProposals =
135+
inProgressTasks
136+
.stream()
137+
.filter(this::taskIsInProgress)
138+
.map(ExecutionTask::proposal)
139+
.collect(Collectors.toList());
140+
141+
// These are the brokers which currently have in-progress
142+
// inter-broker replica movement
143+
Set<Integer> brokersWithInProgressTasks = getParticipatingBrokers(inProgressProposals);
144+
145+
// Remove the brokers with in-progress replica moves from the brokers that have
146+
// completed inter-broker replica moves
147+
Set<Integer> brokersToRemoveThrottlesFrom = new TreeSet<>(participatingBrokers);
148+
brokersToRemoveThrottlesFrom.removeAll(brokersWithInProgressTasks);
149+
150+
LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom);
151+
for (int broker : brokersToRemoveThrottlesFrom) {
152+
removeThrottledRateFromBroker(broker);
153+
}
147154
}
148155

149156
Map<String, Set<String>> throttledReplicas = getThrottledReplicasByTopic(completedProposals);

0 commit comments

Comments
 (0)