@@ -48,43 +48,48 @@ class ReplicationThrottleHelper {
4848 private static final int DEFAULT_RETRY_BACKOFF_BASE = 2 ;
4949 private final AdminClient _adminClient ;
5050 private final Long _throttleRate ;
51+ private final boolean _skipThrottleRateSetting ;
5152 private final int _retries ;
5253 private long _maxDelayMs ;
5354 private final Set <Integer > _deadBrokers ;
5455
55- ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate ) {
56- this (adminClient , throttleRate , RETRIES , MAX_DELAY_MS );
56+ ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , boolean skipThrottleRateSetting ) {
57+ this (adminClient , throttleRate , skipThrottleRateSetting , RETRIES , MAX_DELAY_MS , new HashSet <>() );
5758 }
5859
59- ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , Set <Integer > deadBrokers ) {
60- this (adminClient , throttleRate , RETRIES , MAX_DELAY_MS , deadBrokers );
60+ ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , boolean skipThrottleRateSetting ,
61+ Set <Integer > deadBrokers ) {
62+ this (adminClient , throttleRate , skipThrottleRateSetting , RETRIES , MAX_DELAY_MS , deadBrokers );
6163 }
6264
6365 // for testing
64- ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , int retries , long maxDelayMs ) {
65- this ._adminClient = adminClient ;
66- this ._throttleRate = throttleRate ;
67- this ._retries = retries ;
68- this ._maxDelayMs = maxDelayMs ;
69- this ._deadBrokers = new HashSet <Integer >();
66+ ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , boolean skipThrottleRateSetting ,
67+ int retries , long maxDelayMs ) {
68+ this (adminClient , throttleRate , skipThrottleRateSetting , retries , maxDelayMs , new HashSet <>());
7069 }
7170
72- ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , int retries , long maxDelayMs , Set <Integer > deadBrokers ) {
71+ ReplicationThrottleHelper (AdminClient adminClient , Long throttleRate , boolean skipThrottleRateSetting ,
72+ int retries , long maxDelayMs , Set <Integer > deadBrokers ) {
7373 this ._adminClient = adminClient ;
7474 this ._throttleRate = throttleRate ;
75+ this ._skipThrottleRateSetting = skipThrottleRateSetting ;
7576 this ._retries = retries ;
7677 this ._maxDelayMs = maxDelayMs ;
7778 this ._deadBrokers = deadBrokers ;
7879 }
7980
8081 void setThrottles (List <ExecutionProposal > replicaMovementProposals )
8182 throws ExecutionException , InterruptedException , TimeoutException {
82- if (throttlingEnabled ()) {
83- LOG .info ("Setting a rebalance throttle of {} bytes/sec" , _throttleRate );
84- Set <Integer > participatingBrokers = getParticipatingBrokers (replicaMovementProposals );
83+ if (throttlingEnabled () || _skipThrottleRateSetting ) {
8584 Map <String , Set <String >> throttledReplicas = getThrottledReplicasByTopic (replicaMovementProposals );
86- for (int broker : participatingBrokers ) {
87- setThrottledRateIfNecessary (broker );
85+ if (throttlingEnabled () && !_skipThrottleRateSetting ) {
86+ LOG .info ("Setting a rebalance throttle of {} bytes/sec" , _throttleRate );
87+ Set <Integer > participatingBrokers = getParticipatingBrokers (replicaMovementProposals );
88+ for (int broker : participatingBrokers ) {
89+ setThrottledRateIfNecessary (broker );
90+ }
91+ } else {
92+ LOG .info ("Skipping throttle rate setting; throttled replicas will still be set" );
8893 }
8994 for (Map .Entry <String , Set <String >> entry : throttledReplicas .entrySet ()) {
9095 setThrottledReplicas (entry .getKey (), entry .getValue ());
@@ -112,7 +117,7 @@ boolean taskIsInProgress(ExecutionTask task) {
112117 // clear throttles for a specific list of execution tasks
113118 void clearThrottles (List <ExecutionTask > completedTasks , List <ExecutionTask > inProgressTasks )
114119 throws ExecutionException , InterruptedException , TimeoutException {
115- if (throttlingEnabled ()) {
120+ if (throttlingEnabled () || _skipThrottleRateSetting ) {
116121 List <ExecutionProposal > completedProposals =
117122 completedTasks
118123 .stream ()
@@ -121,29 +126,31 @@ void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inPr
121126 .map (ExecutionTask ::proposal )
122127 .collect (Collectors .toList ());
123128
124- // These are the brokers which have completed a task with
125- // inter-broker replica movement
126- Set <Integer > participatingBrokers = getParticipatingBrokers (completedProposals );
127-
128- List <ExecutionProposal > inProgressProposals =
129- inProgressTasks
130- .stream ()
131- .filter (this ::taskIsInProgress )
132- .map (ExecutionTask ::proposal )
133- .collect (Collectors .toList ());
134-
135- // These are the brokers which currently have in-progress
136- // inter-broker replica movement
137- Set <Integer > brokersWithInProgressTasks = getParticipatingBrokers (inProgressProposals );
138-
139- // Remove the brokers with in-progress replica moves from the brokers that have
140- // completed inter-broker replica moves
141- Set <Integer > brokersToRemoveThrottlesFrom = new TreeSet <>(participatingBrokers );
142- brokersToRemoveThrottlesFrom .removeAll (brokersWithInProgressTasks );
143-
144- LOG .info ("Removing replica movement throttles from brokers in the cluster: {}" , brokersToRemoveThrottlesFrom );
145- for (int broker : brokersToRemoveThrottlesFrom ) {
146- removeThrottledRateFromBroker (broker );
129+ if (throttlingEnabled () && !_skipThrottleRateSetting ) {
130+ // These are the brokers which have completed a task with
131+ // inter-broker replica movement
132+ Set <Integer > participatingBrokers = getParticipatingBrokers (completedProposals );
133+
134+ List <ExecutionProposal > inProgressProposals =
135+ inProgressTasks
136+ .stream ()
137+ .filter (this ::taskIsInProgress )
138+ .map (ExecutionTask ::proposal )
139+ .collect (Collectors .toList ());
140+
141+ // These are the brokers which currently have in-progress
142+ // inter-broker replica movement
143+ Set <Integer > brokersWithInProgressTasks = getParticipatingBrokers (inProgressProposals );
144+
145+ // Remove the brokers with in-progress replica moves from the brokers that have
146+ // completed inter-broker replica moves
147+ Set <Integer > brokersToRemoveThrottlesFrom = new TreeSet <>(participatingBrokers );
148+ brokersToRemoveThrottlesFrom .removeAll (brokersWithInProgressTasks );
149+
150+ LOG .info ("Removing replica movement throttles from brokers in the cluster: {}" , brokersToRemoveThrottlesFrom );
151+ for (int broker : brokersToRemoveThrottlesFrom ) {
152+ removeThrottledRateFromBroker (broker );
153+ }
147154 }
148155
149156 Map <String , Set <String >> throttledReplicas = getThrottledReplicasByTopic (completedProposals );
0 commit comments