diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index da8a28e28..4f5e6dee1 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -9,7 +9,9 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +51,7 @@ class ReplicationThrottleHelper { private final Set _deadBrokers; ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) { - this(adminClient, throttleRate, RETRIES); + this(adminClient, throttleRate, RETRIES, new HashSet<>()); } ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set deadBrokers) { @@ -58,10 +60,7 @@ class ReplicationThrottleHelper { // for testing ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries) { - this._adminClient = adminClient; - this._throttleRate = throttleRate; - this._retries = retries; - this._deadBrokers = new HashSet(); + this(adminClient, throttleRate, retries, new HashSet<>()); } ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, Set deadBrokers) { @@ -77,12 +76,9 @@ void setThrottles(List replicaMovementProposals) LOG.info("Setting a rebalance throttle of {} bytes/sec", _throttleRate); Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); Map> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals); - for (int broker : participatingBrokers) { - setThrottledRateIfNecessary(broker); - } - for (Map.Entry> entry : throttledReplicas.entrySet()) { - setThrottledReplicas(entry.getKey(), entry.getValue()); - } + + setThrottledRateIfNecessary(participatingBrokers); + setThrottledReplicas(throttledReplicas); } } @@ -136,14 +132,10 @@ void clearThrottles(List completedTasks, List inPr brokersToRemoveThrottlesFrom.removeAll(brokersWithInProgressTasks); LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom); - for (int broker : brokersToRemoveThrottlesFrom) { - removeThrottledRateFromBroker(broker); - } + removeThrottledRateFromBrokers(brokersToRemoveThrottlesFrom); Map> throttledReplicas = getThrottledReplicasByTopic(completedProposals); - for (Map.Entry> entry : throttledReplicas.entrySet()) { - removeThrottledReplicasFromTopic(entry.getKey(), entry.getValue()); - } + removeThrottledReplicasFromTopics(throttledReplicas); } } @@ -176,24 +168,6 @@ private Map> getThrottledReplicasByTopic(List ops = new ArrayList<>(); - for (String replicaThrottleRateConfigKey : Arrays.asList(LEADER_REPLICATION_THROTTLED_RATE_CONFIG, FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG)) { - ConfigEntry currThrottleRate = brokerConfigs.get(replicaThrottleRateConfigKey); - if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(_throttleRate))) { - LOG.debug("Setting {} to {} bytes/second for broker {}", replicaThrottleRateConfigKey, _throttleRate, brokerId); - ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleRateConfigKey, String.valueOf(_throttleRate)), AlterConfigOp.OpType.SET)); - } - } - if (!ops.isEmpty()) { - changeBrokerConfigs(brokerId, ops); - } - } - private Config getTopicConfigs(String topic) throws ExecutionException, InterruptedException, TimeoutException { try { return getEntityConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic)); @@ -205,8 +179,18 @@ private Config getTopicConfigs(String topic) throws ExecutionException, Interrup } } - private Config getBrokerConfigs(int brokerId) throws ExecutionException, InterruptedException, TimeoutException { - return getEntityConfigs(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId))); + private Map getTopicConfigs(Set topics) + throws ExecutionException, InterruptedException, TimeoutException { + Map result = new HashMap<>(); + if (topics == null || topics.isEmpty()) { + return result; + } + for (String topic : topics) { + ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Config cfg = getTopicConfigs(topic); + result.put(cf, cfg == null ? new Config(Collections.emptyList()) : cfg); + } + return result; } private Config getEntityConfigs(ConfigResource cf) throws ExecutionException, InterruptedException, TimeoutException { @@ -215,54 +199,69 @@ private Config getEntityConfigs(ConfigResource cf) throws ExecutionException, In return configs.get(cf); } - private void setThrottledReplicas(String topic, Set replicas) - throws ExecutionException, InterruptedException, TimeoutException { - Config topicConfigs = getTopicConfigs(topic); - List ops = new ArrayList<>(); - for (String replicaThrottleConfigKey : Arrays.asList(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, - FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)) { - ConfigEntry currThrottledReplicas = topicConfigs.get(replicaThrottleConfigKey); - if (currThrottledReplicas != null && currThrottledReplicas.value().trim().equals(WILDCARD_ASTERISK)) { - // The existing setup throttles all replica. So, nothing needs to be changed. - continue; + private void setThrottledRateIfNecessary(Set brokerIds) throws ExecutionException, InterruptedException, TimeoutException { + if (brokerIds == null || brokerIds.isEmpty()) { + LOG.debug("Skipping setting replication throttled rate; no target brokers to update. Throttle rate: {}", _throttleRate); + return; + } + Map> bulkOps = new HashMap<>(); + for (Map.Entry entry : getBrokerConfigs(brokerIds).entrySet()) { + ConfigResource cf = entry.getKey(); + Config brokerConfigs = entry.getValue(); + if (brokerConfigs == null) { + brokerConfigs = new Config(Collections.emptyList()); } - - // Merge new throttled replicas with existing configuration values. - Set newThrottledReplicas = new TreeSet<>(replicas); - if (currThrottledReplicas != null && !currThrottledReplicas.value().equals("")) { - newThrottledReplicas.addAll(Arrays.asList(currThrottledReplicas.value().split(","))); + List ops = new ArrayList<>(); + for (String replicaThrottleRateConfigKey : List.of(LEADER_REPLICATION_THROTTLED_RATE_CONFIG, FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG)) { + ConfigEntry currThrottleRate = brokerConfigs.get(replicaThrottleRateConfigKey); + if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(_throttleRate))) { + LOG.debug("Setting {} to {} bytes/second for broker {}", replicaThrottleRateConfigKey, _throttleRate, cf.name()); + ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleRateConfigKey, String.valueOf(_throttleRate)), AlterConfigOp.OpType.SET)); + } + } + if (!ops.isEmpty()) { + bulkOps.put(cf, ops); } - ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleConfigKey, String.join(",", newThrottledReplicas)), AlterConfigOp.OpType.SET)); - } - if (!ops.isEmpty()) { - changeTopicConfigs(topic, ops); } + changeBrokerConfigs(bulkOps); } - void changeTopicConfigs(String topic, Collection ops) - throws ExecutionException, InterruptedException, TimeoutException { - ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); - Map> configs = Collections.singletonMap(cf, ops); - try { - _adminClient.incrementalAlterConfigs(configs).all() - .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - waitForConfigs(cf, ops); - } catch (Exception e) { - if (!topicExists(topic)) { - LOG.debug("Failed to change configs for topic {} since it does not exist", topic); - return; + private void setThrottledReplicas(Map> replicasByTopic) + throws ExecutionException, InterruptedException, TimeoutException { + if (replicasByTopic == null || replicasByTopic.isEmpty()) { + return; + } + Map> bulkOps = new HashMap<>(); + Map topicConfigs = getTopicConfigs(replicasByTopic.keySet()); + for (Map.Entry cfgEntry : topicConfigs.entrySet()) { + ConfigResource cf = cfgEntry.getKey(); + String topic = cf.name(); + Set replicas = replicasByTopic.get(topic); + if (replicas == null || replicas.isEmpty()) { + continue; + } + Config topicConfig = cfgEntry.getValue(); + if (topicConfig == null) { + topicConfig = new Config(Collections.emptyList()); + } + List ops = new ArrayList<>(); + for (String replicaThrottleConfigKey : List.of(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)) { + ConfigEntry currThrottledReplicas = topicConfig.get(replicaThrottleConfigKey); + if (currThrottledReplicas != null && currThrottledReplicas.value().trim().equals(WILDCARD_ASTERISK)) { + continue; + } + Set newThrottledReplicas = new TreeSet<>(replicas); + if (currThrottledReplicas != null && !"".equals(currThrottledReplicas.value())) { + newThrottledReplicas.addAll(Arrays.asList(currThrottledReplicas.value().split(","))); + } + ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleConfigKey, String.join(",", newThrottledReplicas)), + AlterConfigOp.OpType.SET)); + } + if (!ops.isEmpty()) { + bulkOps.put(cf, ops); } - throw e; } - } - - void changeBrokerConfigs(int brokerId, Collection ops) - throws ExecutionException, InterruptedException, TimeoutException { - ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); - Map> configs = Collections.singletonMap(cf, ops); - _adminClient.incrementalAlterConfigs(configs).all() - .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - waitForConfigs(cf, ops); + changeTopicConfigs(bulkOps); } boolean topicExists(String topic) throws InterruptedException, TimeoutException, ExecutionException { @@ -280,97 +279,185 @@ static String removeReplicasFromConfig(String throttleConfig, Set replic return String.join(",", throttles); } + private void maybeRemoveThrottledReplicas(String topic, + Set replicas, + ConfigEntry throttledReplicasConfig, + String throttledReplicasConfigKey, + String replicaType, + List ops) { + if (throttledReplicasConfig == null) { + return; + } + String configValue = throttledReplicasConfig.value(); + if (configValue == null) { + return; + } + if (WILDCARD_ASTERISK.equals(configValue)) { + LOG.debug("Existing config throttles all {} replicas. So, do not remove any {} replica throttle", replicaType, replicaType); + return; + } + replicas.forEach(r -> LOG.debug("Removing {} throttles for topic {} and replica {}", replicaType, topic, r)); + String newThrottledReplicas = removeReplicasFromConfig(configValue, replicas); + if (newThrottledReplicas.isEmpty()) { + ops.add(new AlterConfigOp(new ConfigEntry(throttledReplicasConfigKey, null), AlterConfigOp.OpType.DELETE)); + } else { + ops.add(new AlterConfigOp(new ConfigEntry(throttledReplicasConfigKey, newThrottledReplicas), AlterConfigOp.OpType.SET)); + } + } + /** - * It gets whether there is any throttled replica specified in the configuration property. If there is and the - * specified throttled replica does not equal to "*", it modifies the configuration property by removing a - * given set of replicas from the set of throttled replicas + * Removes the provided replicas from topic-level throttled replica configs whenever they exist and are not wildcarded. * - * @param topic name of topic which contains replicas - * @param replicas replicas to remove from the configuration properties + * @param replicasByTopic topics mapped to the replicas that should be removed from their throttled replica configuration */ - private void removeThrottledReplicasFromTopic(String topic, Set replicas) - throws ExecutionException, InterruptedException, TimeoutException { - Config topicConfigs = getTopicConfigs(topic); - if (topicConfigs == null) { - LOG.debug("Skip removing throttled replicas {} from topic {} since no configs can be read", String.join(",", replicas), topic); + private void removeThrottledReplicasFromTopics(Map> replicasByTopic) + throws ExecutionException, InterruptedException, TimeoutException { + if (replicasByTopic == null || replicasByTopic.isEmpty()) { return; } - List ops = new ArrayList<>(); - - ConfigEntry currentLeaderThrottledReplicas = topicConfigs.get(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG); - if (currentLeaderThrottledReplicas != null) { - if (currentLeaderThrottledReplicas.value().equals(WILDCARD_ASTERISK)) { - LOG.debug("Existing config throttles all leader replicas. So, do not remove any leader replica throttle"); - } else { - replicas.forEach(r -> LOG.debug("Removing leader throttles for topic {} and replica {}", topic, r)); - String newThrottledReplicas = removeReplicasFromConfig(currentLeaderThrottledReplicas.value(), replicas); - if (newThrottledReplicas.isEmpty()) { - ops.add(new AlterConfigOp(new ConfigEntry(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, null), AlterConfigOp.OpType.DELETE)); + Map> bulkOps = new HashMap<>(); + Map topicConfigs = getTopicConfigs(replicasByTopic.keySet()); + for (Map.Entry> entry : replicasByTopic.entrySet()) { + String topic = entry.getKey(); + Set replicas = entry.getValue(); + ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Config topicConfig = topicConfigs.get(cf); + if (topicConfig == null) { + LOG.debug("Skip removing throttled replicas {} from topic {} since no configs can be read", + String.join(",", replicas), topic); + continue; + } + List ops = new ArrayList<>(); + ConfigEntry currentLeaderThrottledReplicas = topicConfig.get(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG); + maybeRemoveThrottledReplicas( + topic, + replicas, + currentLeaderThrottledReplicas, + LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, + "leader", + ops); + ConfigEntry currentFollowerThrottledReplicas = topicConfig.get(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG); + maybeRemoveThrottledReplicas( + topic, + replicas, + currentFollowerThrottledReplicas, + FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, + "follower", + ops); + if (!ops.isEmpty()) { + bulkOps.put(cf, ops); + } + } + changeTopicConfigs(bulkOps); + } + + private void removeThrottledRateFromBrokers(Set brokerIds) + throws ExecutionException, InterruptedException, TimeoutException { + Map> bulkOps = new HashMap<>(); + Map brokerConfigsByResource = getBrokerConfigs(brokerIds); + for (Map.Entry entry : brokerConfigsByResource.entrySet()) { + ConfigResource cf = entry.getKey(); + String brokerId = cf.name(); + Config brokerConfigs = entry.getValue(); + ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_REPLICATION_THROTTLED_RATE_CONFIG); + ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG); + List ops = new ArrayList<>(); + if (currLeaderThrottle != null) { + if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { + LOG.debug("Skipping removal for static leader throttle rate: {}", currLeaderThrottle); } else { - ops.add(new AlterConfigOp(new ConfigEntry(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, newThrottledReplicas), AlterConfigOp.OpType.SET)); + LOG.debug("Removing leader throttle rate: {} on broker {}", currLeaderThrottle, brokerId); + ops.add(new AlterConfigOp(new ConfigEntry(LEADER_REPLICATION_THROTTLED_RATE_CONFIG, null), AlterConfigOp.OpType.DELETE)); } } - } - ConfigEntry currentFollowerThrottledReplicas = topicConfigs.get(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG); - if (currentFollowerThrottledReplicas != null) { - if (currentFollowerThrottledReplicas.value().equals(WILDCARD_ASTERISK)) { - LOG.debug("Existing config throttles all follower replicas. So, do not remove any follower replica throttle"); - } else { - replicas.forEach(r -> LOG.debug("Removing follower throttles for topic {} and replica {}", topic, r)); - String newThrottledReplicas = removeReplicasFromConfig(currentFollowerThrottledReplicas.value(), replicas); - if (newThrottledReplicas.isEmpty()) { - ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, null), AlterConfigOp.OpType.DELETE)); + if (currFollowerThrottle != null) { + if (currFollowerThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { + LOG.debug("Skipping removal for static follower throttle rate: {}", currFollowerThrottle); } else { - ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, newThrottledReplicas), AlterConfigOp.OpType.SET)); + LOG.debug("Removing follower throttle rate: {} on broker {}", currFollowerThrottle, brokerId); + ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, null), AlterConfigOp.OpType.DELETE)); } } + if (!ops.isEmpty()) { + bulkOps.put(cf, ops); + } } - if (!ops.isEmpty()) { - changeTopicConfigs(topic, ops); + changeBrokerConfigs(bulkOps); + } + + private void changeBrokerConfigs(Map> bulkOps) + throws ExecutionException, InterruptedException, TimeoutException { + if (bulkOps == null || bulkOps.isEmpty()) { + return; } + _adminClient.incrementalAlterConfigs(bulkOps).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + // Verify all broker configs in a single bulk describe + waitForConfigs(bulkOps); } - private void removeThrottledRateFromBroker(Integer brokerId) - throws ExecutionException, InterruptedException, TimeoutException { - Config brokerConfigs = getBrokerConfigs(brokerId); - ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_REPLICATION_THROTTLED_RATE_CONFIG); - ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG); - List ops = new ArrayList<>(); - if (currLeaderThrottle != null) { - if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { - LOG.debug("Skipping removal for static leader throttle rate: {}", currFollowerThrottle); - } else { - LOG.debug("Removing leader throttle rate: {} on broker {}", currLeaderThrottle, brokerId); - ops.add(new AlterConfigOp(new ConfigEntry(LEADER_REPLICATION_THROTTLED_RATE_CONFIG, null), AlterConfigOp.OpType.DELETE)); - } + private void changeTopicConfigs(Map> bulkOps) + throws ExecutionException, InterruptedException, TimeoutException { + if (bulkOps == null || bulkOps.isEmpty()) { + return; } - if (currFollowerThrottle != null) { - if (currFollowerThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { - LOG.debug("Skipping removal for static follower throttle rate: {}", currFollowerThrottle); - } else { - LOG.debug("Removing follower throttle rate: {} on broker {}", currFollowerThrottle, brokerId); - ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, null), AlterConfigOp.OpType.DELETE)); + AlterConfigsResult result = _adminClient.incrementalAlterConfigs(bulkOps); + Map> futures = result.values(); + Map> succeeded = new HashMap<>(); + for (Map.Entry> entry : futures.entrySet()) { + ConfigResource cf = entry.getKey(); + KafkaFuture future = entry.getValue(); + try { + future.get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + succeeded.put(cf, bulkOps.get(cf)); + } catch (Exception e) { + if (!topicExists(cf.name())) { + LOG.debug("Skipping configs for non-existent topic {} (confirmed via listTopics)", cf.name()); + continue; + } + throw e; } } - if (!ops.isEmpty()) { - changeBrokerConfigs(brokerId, ops); + if (!succeeded.isEmpty()) { + waitForConfigs(succeeded); } } + private Map getBrokerConfigs(Set brokerIds) + throws ExecutionException, InterruptedException, TimeoutException { + if (brokerIds == null || brokerIds.isEmpty()) { + return Collections.emptyMap(); + } + List resources = brokerIds.stream() + .map(id -> new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(id))) + .collect(Collectors.toList()); + return _adminClient.describeConfigs(resources).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + // Retries until we can read the configs changes we just wrote - void waitForConfigs(ConfigResource cf, Collection ops) { - // Use HashMap::new instead of Collectors.toMap to allow inserting null values - Map expectedConfigs = ops.stream() - .collect(HashMap::new, (m, o) -> m.put(o.configEntry().name(), o.configEntry().value()), HashMap::putAll); + void waitForConfigs(Map> opsByResource) { + Map> expectedByResource = new HashMap<>(); + for (Map.Entry> entry : opsByResource.entrySet()) { + Map expected = new HashMap<>(); + for (AlterConfigOp op : entry.getValue()) { + expected.put(op.configEntry().name(), op.configEntry().value()); + } + expectedByResource.put(entry.getKey(), expected); + } + + List resources = new ArrayList<>(opsByResource.keySet()); boolean retryResponse = CruiseControlMetricsUtils.retry(() -> { try { - return !configsEqual(getEntityConfigs(cf), expectedConfigs); - } catch (ExecutionException | InterruptedException | TimeoutException e) { + Map actual = _adminClient.describeConfigs(resources).all() + .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return !configsEqualByResource(actual, expectedByResource); + } catch (Exception ex) { + LOG.debug("Failed while verifying configs in bulk: {}", ex.getMessage()); return false; } }, _retries); if (!retryResponse) { - throw new IllegalStateException("The following configs " + ops + " were not applied to " + cf + " within the time limit"); + throw new IllegalStateException("The following configs " + opsByResource + + " were not applied within the time limit"); } } @@ -389,4 +476,18 @@ static boolean configsEqual(Config configs, Map expectedValues) } return true; } + + static boolean configsEqualByResource(Map actualByResource, Map> expectedByResource) { + for (Map.Entry> entry : expectedByResource.entrySet()) { + ConfigResource cf = entry.getKey(); + Config actual = actualByResource.get(cf); + if (actual == null) { + actual = new Config(Collections.emptyList()); + } + if (!configsEqual(actual, entry.getValue())) { + return false; + } + } + return true; + } } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index 23a455b1b..e99e9807a 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0; @@ -88,16 +89,32 @@ private void createTopics() throws ExecutionException, InterruptedException { )).all().get(); } - private static void setWildcardThrottleReplicaForTopic(ReplicationThrottleHelper helper, String topicName) throws Exception { + private void setWildcardThrottleReplicaForTopic(String topicName) throws Exception { for (String replicaThrottleProp : Arrays.asList(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)) { Collection configs = Collections.singletonList( new AlterConfigOp(new ConfigEntry(replicaThrottleProp, ReplicationThrottleHelper.WILDCARD_ASTERISK), AlterConfigOp.OpType.SET) ); - helper.changeTopicConfigs(topicName, configs); + applyTopicConfigs(topicName, configs); } } + private void applyBrokerConfigs(int brokerId, Collection ops) + throws ExecutionException, InterruptedException, TimeoutException { + ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); + Map> configs = Collections.singletonMap(cf, ops); + _adminClient.incrementalAlterConfigs(configs).all() + .get(ReplicationThrottleHelper.CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + + private void applyTopicConfigs(String topic, Collection ops) + throws ExecutionException, InterruptedException, TimeoutException { + ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Map> configs = Collections.singletonMap(cf, ops); + _adminClient.incrementalAlterConfigs(configs).all() + .get(ReplicationThrottleHelper.CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + private ExecutionTask inProgressTaskForProposal(long id, ExecutionProposal proposal) { ExecutionTask task = new ExecutionTask(id, proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION, EXECUTION_ALERTING_THRESHOLD_MS); task.inProgress(0); @@ -166,7 +183,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { ); // Expect that only the dynamic throttle rate configs are removed when clearing throttles expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfig); - expectIncrementalBrokerConfigs(mockAdminClient, brokers); + expectIncrementalBrokerConfigs(mockAdminClient); expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfig2); expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, false); expectListTopics(mockAdminClient, Collections.emptySet()); @@ -178,8 +195,8 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { // Case 2: a situation where Topic0 gets deleted after its configs were read. EasyMock.reset(mockAdminClient); - expectDescribeBrokerConfigs(mockAdminClient, brokers); - expectIncrementalBrokerConfigs(mockAdminClient, brokers); + expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfig); + expectIncrementalBrokerConfigs(mockAdminClient); expectDescribeBrokerConfigs(mockAdminClient, brokers, EMPTY_CONFIG); String throttledReplicas = brokerId0 + "," + brokerId1; Config topicConfigProps = new Config(Arrays.asList( @@ -216,6 +233,12 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { // Case 1: a situation where Topic0 does not exist. Hence no property is returned upon read. expectDescribeBrokerConfigs(mockAdminClient, brokers); + // incremental alter for brokers, then verification describe with expected throttle values + expectIncrementalBrokerConfigs(mockAdminClient); + Config brokerConfigAfter = new Config(Arrays.asList( + new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, String.valueOf(throttleRate)), + new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, String.valueOf(throttleRate)))); + expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfigAfter); expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, false); expectListTopics(mockAdminClient, Collections.emptySet()); expectIncrementalTopicConfigs(mockAdminClient, TOPIC0, false); @@ -228,6 +251,8 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { // Case 2: a situation where Topic0 gets deleted after its configs were read. Change configs should not fail. EasyMock.reset(mockAdminClient); expectDescribeBrokerConfigs(mockAdminClient, brokers); + expectIncrementalBrokerConfigs(mockAdminClient); + expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfigAfter); String throttledReplicas = brokerId0 + "," + brokerId1; Config topicConfigs = new Config(Arrays.asList( new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, throttledReplicas), @@ -299,27 +324,25 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET) ); - throttleHelper.changeBrokerConfigs(0, broker0Configs); + applyBrokerConfigs(0, broker0Configs); // Partition 1 (which is not involved in any execution proposal) has pre-existing throttled // replicas (on both leaders and followers); we expect these configurations to be merged // with our new throttled replicas. List topic0Configs = Arrays.asList( - new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:0,1:1"), - AlterConfigOp.OpType.SET), - new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:0,1:1"), - AlterConfigOp.OpType.SET) - ); - throttleHelper.changeTopicConfigs(TOPIC0, topic0Configs); + new AlterConfigOp( + new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:0,1:1"), + AlterConfigOp.OpType.SET), + new AlterConfigOp( + new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:0,1:1"), + AlterConfigOp.OpType.SET)); + applyTopicConfigs(TOPIC0, topic0Configs); // Topic 1 is not involved in any execution proposal. It has pre-existing throttled replicas. List topic1Config = Arrays.asList( - new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1"), - AlterConfigOp.OpType.SET), - new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1"), - AlterConfigOp.OpType.SET) - ); - throttleHelper.changeTopicConfigs(TOPIC1, topic1Config); + new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, "1:1"), AlterConfigOp.OpType.SET)); + applyTopicConfigs(TOPIC1, topic1Config); throttleHelper.setThrottles(Collections.singletonList(proposal)); @@ -355,8 +378,8 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, throttleRate); // Set replica throttle config values for both topics - setWildcardThrottleReplicaForTopic(throttleHelper, TOPIC0); - setWildcardThrottleReplicaForTopic(throttleHelper, TOPIC1); + setWildcardThrottleReplicaForTopic(TOPIC0); + setWildcardThrottleReplicaForTopic(TOPIC1); ExecutionProposal proposal = new ExecutionProposal(new TopicPartition(TOPIC0, 0), 100, new ReplicaPlacementInfo(0), @@ -498,15 +521,15 @@ public void testWaitForConfigs() throws Exception { EasyMock.replay(mockAdminClient); ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); - assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(cf, Collections.singletonList( - new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET) - ))); + Map> opsByResource = Collections.singletonMap(cf, + Collections.singletonList(new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET))); + assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(opsByResource)); // Case 2: queue a single result and call checkConfigs with matching configs, so it succeeds EasyMock.reset(mockAdminClient); expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, true); EasyMock.replay(mockAdminClient); - throttleHelper.waitForConfigs(cf, Collections.emptyList()); + throttleHelper.waitForConfigs(Collections.singletonMap(cf, Collections.emptyList())); } @Test @@ -582,14 +605,15 @@ private void expectIncrementalTopicConfigs(AdminClient adminClient, String topic ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); AlterConfigsResult mockAlterConfigsResult = EasyMock.mock(AlterConfigsResult.class); KafkaFuture mockFuture = EasyMock.mock(KafkaFuture.class); - EasyMock.expect(mockAlterConfigsResult.all()).andReturn(mockFuture); + Map> futures = Collections.singletonMap(cf, mockFuture); + EasyMock.expect(mockAlterConfigsResult.values()).andReturn(futures); if (topicExists) { EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(null); } else { EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())) .andThrow(new ExecutionException(new UnknownTopicOrPartitionException())); } - EasyMock.expect(adminClient.incrementalAlterConfigs(Collections.singletonMap(cf, EasyMock.anyObject()))).andReturn(mockAlterConfigsResult); + EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.anyObject())).andReturn(mockAlterConfigsResult); EasyMock.replay(mockAlterConfigsResult, mockFuture); } @@ -606,37 +630,237 @@ private void expectListTopics(AdminClient adminClient, Set topics) private void expectDescribeBrokerConfigs(AdminClient adminClient, List brokers) throws ExecutionException, InterruptedException, TimeoutException { // All participating brokers have throttled rate set already - Config brokerConfig = new Config(Arrays.asList( - new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "100"), - new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "100"))); - expectDescribeBrokerConfigs(adminClient, brokers, brokerConfig); + expectDescribeBrokerConfigs(adminClient, brokers, EMPTY_CONFIG); } private void expectDescribeBrokerConfigs(AdminClient adminClient, List brokers, Config brokerConfig) throws ExecutionException, InterruptedException, TimeoutException { + Map brokerConfigs = new HashMap<>(); for (int i : brokers) { ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(i)); - Map brokerConfigs = Collections.singletonMap(cf, brokerConfig); + brokerConfigs.put(cf, brokerConfig); + } + DescribeConfigsResult mockDescribeConfigsResult = EasyMock.mock(DescribeConfigsResult.class); + KafkaFuture> mockFuture = EasyMock.mock(KafkaFuture.class); + EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(brokerConfigs); + EasyMock.expect(mockDescribeConfigsResult.all()).andReturn(mockFuture); + EasyMock.expect(adminClient.describeConfigs(EasyMock.anyObject())).andReturn(mockDescribeConfigsResult); + EasyMock.replay(mockDescribeConfigsResult, mockFuture); + } + + private void expectIncrementalBrokerConfigs(AdminClient adminClient) + throws ExecutionException, InterruptedException, TimeoutException { + AlterConfigsResult mockAlterConfigsResult = EasyMock.mock(AlterConfigsResult.class); + KafkaFuture mockFuture = EasyMock.mock(KafkaFuture.class); + EasyMock.expect(mockAlterConfigsResult.all()).andReturn(mockFuture); + EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(null); + EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.anyObject())).andReturn(mockAlterConfigsResult); + EasyMock.replay(mockAlterConfigsResult, mockFuture); + } + + /** + * With no participating brokers, no broker config alter should be issued. + */ + @Test + public void setThrottlesWhenNoParticipatingBrokersThenNoBrokerAlter() throws Exception { + AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, java.util.Set.of()); + EasyMock.replay(mockAdminClient); + throttleHelper.setThrottles(Collections.emptyList()); + EasyMock.verify(mockAdminClient); + } + + /** + * If wildcard throttles are already set on the topic, operations should be a no-op for topic configuration updates. + */ + @Test + public void setThrottledReplicasWhenWildcardPresentThenNoOp() throws Exception { + final long throttleRate = 100L; + final int brokerId0 = 0; + final int brokerId1 = 1; + final int brokerId2 = 2; + final List brokers = Arrays.asList(brokerId0, brokerId1, brokerId2); + + ExecutionProposal proposal = new ExecutionProposal(new TopicPartition(TOPIC0, 0), + 100, + new ReplicaPlacementInfo(brokerId0), + Arrays.asList(new ReplicaPlacementInfo(brokerId0), new ReplicaPlacementInfo(brokerId1)), + Arrays.asList(new ReplicaPlacementInfo(brokerId0), new ReplicaPlacementInfo(brokerId2))); + + AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, throttleRate, java.util.Set.of()); + + Config brokerConfigAlready = new Config(Arrays.asList( + new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, String.valueOf(throttleRate)), + new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, String.valueOf(throttleRate)))); + expectDescribeBrokerConfigs(mockAdminClient, brokers, brokerConfigAlready); + + Config wildcardTopicConfig = new Config(Arrays.asList( + new ConfigEntry(ReplicationThrottleHelper.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, ReplicationThrottleHelper.WILDCARD_ASTERISK), + new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, ReplicationThrottleHelper.WILDCARD_ASTERISK))); + expectDescribeTopicConfigs(mockAdminClient, TOPIC0, wildcardTopicConfig, true); + + EasyMock.replay(mockAdminClient); + throttleHelper.setThrottles(Collections.singletonList(proposal)); + EasyMock.verify(mockAdminClient); + } + + /** + * waitForConfigs should throw when verification exceeds the maximum number of retries. + */ + @Test + public void waitForConfigsWhenExceedsRetriesThenThrow() throws Exception { + AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); + int retries = 2; + + ConfigResource topicCf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + ConfigResource brokerCf = new ConfigResource(ConfigResource.Type.BROKER, "1"); + Map> opsByResource = new HashMap<>(); + opsByResource.put(topicCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET))); + opsByResource.put(brokerCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("a", "b"), AlterConfigOp.OpType.SET))); + + for (int i = 0; i < retries + 1; i++) { DescribeConfigsResult mockDescribeConfigsResult = EasyMock.mock(DescribeConfigsResult.class); KafkaFuture> mockFuture = EasyMock.mock(KafkaFuture.class); - EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(brokerConfigs); + Map returned = new HashMap<>(); + returned.put(topicCf, new Config(Collections.emptyList())); + returned.put(brokerCf, new Config(Collections.emptyList())); + EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(returned); EasyMock.expect(mockDescribeConfigsResult.all()).andReturn(mockFuture); - EasyMock.expect(adminClient.describeConfigs(Collections.singletonList(cf))).andReturn(mockDescribeConfigsResult); + EasyMock.expect(mockAdminClient.describeConfigs(EasyMock.anyObject())).andReturn(mockDescribeConfigsResult); EasyMock.replay(mockDescribeConfigsResult, mockFuture); } + + EasyMock.replay(mockAdminClient); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); + assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(opsByResource)); } - private void expectIncrementalBrokerConfigs(AdminClient adminClient, List brokers) - throws ExecutionException, InterruptedException, TimeoutException { - for (int brokerId : brokers) { - ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); - AlterConfigsResult mockAlterConfigsResult = EasyMock.mock(AlterConfigsResult.class); - KafkaFuture mockFuture = EasyMock.mock(KafkaFuture.class); - EasyMock.expect(mockAlterConfigsResult.all()).andReturn(mockFuture); - EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(null); - EasyMock.expect(adminClient.incrementalAlterConfigs(Collections.singletonMap(cf, EasyMock.anyObject()))).andReturn(mockAlterConfigsResult); - EasyMock.replay(mockAlterConfigsResult, mockFuture); - } + /** + * waitForConfigs should succeed when actual configs match the expected + * values for all resources. + */ + @Test + public void waitForConfigsWhenConfigsMatchThenSuccess() throws Exception { + AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); + int retries = 2; + + ConfigResource topicCf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + ConfigResource brokerCf = new ConfigResource(ConfigResource.Type.BROKER, "1"); + Map> opsByResource = new HashMap<>(); + opsByResource.put(topicCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET))); + opsByResource.put(brokerCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("a", "b"), AlterConfigOp.OpType.SET))); + + Map returned = new HashMap<>(); + returned.put(topicCf, new Config(Collections.singletonList(new ConfigEntry("k", "v")))); + returned.put(brokerCf, new Config(Collections.singletonList(new ConfigEntry("a", "b")))); + + DescribeConfigsResult mockDescribeConfigsResult = EasyMock.mock(DescribeConfigsResult.class); + KafkaFuture> mockFuture = EasyMock.mock(KafkaFuture.class); + EasyMock.expect(mockFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(returned); + EasyMock.expect(mockDescribeConfigsResult.all()).andReturn(mockFuture); + EasyMock.expect(mockAdminClient.describeConfigs(EasyMock.anyObject())).andReturn(mockDescribeConfigsResult); + EasyMock.replay(mockDescribeConfigsResult, mockFuture); + + EasyMock.replay(mockAdminClient); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); + throttleHelper.waitForConfigs(opsByResource); + EasyMock.verify(mockAdminClient); + } + + /** + * configsEqualByResource returns true when all resources match expected values. + */ + @Test + public void configsEqualByResourceWhenAllResourcesMatchThenTrue() { + Map> expectedByResource = new HashMap<>(); + ConfigResource cf1 = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + ConfigResource cf2 = new ConfigResource(ConfigResource.Type.BROKER, "1"); + expectedByResource.put(cf1, Collections.singletonMap("k1", "v1")); + expectedByResource.put(cf2, Collections.singletonMap("k2", "v2")); + + Map actualByResource = new HashMap<>(); + actualByResource.put(cf1, new Config(Collections.singletonList(new ConfigEntry("k1", "v1")))); + actualByResource.put(cf2, new Config(Collections.singletonList(new ConfigEntry("k2", "v2")))); + + boolean actual = ReplicationThrottleHelper.configsEqualByResource(actualByResource, expectedByResource); + assertTrue(actual); + } + + /** + * configsEqualByResource returns false when any resource mismatches. + */ + @Test + public void configsEqualByResourceWhenAnyResourceMismatchesThenFalse() { + Map> expectedByResource = new HashMap<>(); + ConfigResource cf1 = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + expectedByResource.put(cf1, Collections.singletonMap("k1", "v1")); + + Map actualByResource = new HashMap<>(); + actualByResource.put(cf1, new Config(Collections.singletonList(new ConfigEntry("k1", "x")))); + + boolean actual = ReplicationThrottleHelper.configsEqualByResource(actualByResource, expectedByResource); + assertFalse(actual); + } + + /** + * Should retry on partial mismatch and eventually succeed when subsequent + * describes match the expected values. + */ + @Test + public void waitForConfigsWhenPartialMismatchThenRetryAndEventuallySuccess() throws Exception { + AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); + int retries = 2; + + ConfigResource topicCf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + ConfigResource brokerCf = new ConfigResource(ConfigResource.Type.BROKER, "1"); + Map> opsByResource = new HashMap<>(); + opsByResource.put(topicCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET))); + opsByResource.put(brokerCf, Collections.singletonList(new AlterConfigOp(new ConfigEntry("a", "b"), AlterConfigOp.OpType.SET))); + + DescribeConfigsResult firstDescribe = EasyMock.mock(DescribeConfigsResult.class); + KafkaFuture> firstFuture = EasyMock.mock(KafkaFuture.class); + Map firstReturned = new HashMap<>(); + firstReturned.put(topicCf, new Config(Collections.emptyList())); + firstReturned.put(brokerCf, new Config(Collections.singletonList(new ConfigEntry("a", "b")))); + EasyMock.expect(firstFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(firstReturned); + EasyMock.expect(firstDescribe.all()).andReturn(firstFuture); + EasyMock.expect(mockAdminClient.describeConfigs(EasyMock.anyObject())).andReturn(firstDescribe); + EasyMock.replay(firstDescribe, firstFuture); + + DescribeConfigsResult secondDescribe = EasyMock.mock(DescribeConfigsResult.class); + KafkaFuture> secondFuture = EasyMock.mock(KafkaFuture.class); + Map secondReturned = new HashMap<>(); + secondReturned.put(topicCf, new Config(Collections.singletonList(new ConfigEntry("k", "v")))); + secondReturned.put(brokerCf, new Config(Collections.singletonList(new ConfigEntry("a", "b")))); + EasyMock.expect(secondFuture.get(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(secondReturned); + EasyMock.expect(secondDescribe.all()).andReturn(secondFuture); + EasyMock.expect(mockAdminClient.describeConfigs(EasyMock.anyObject())).andReturn(secondDescribe); + EasyMock.replay(secondDescribe, secondFuture); + + EasyMock.replay(mockAdminClient); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); + throttleHelper.waitForConfigs(opsByResource); + EasyMock.verify(mockAdminClient); + } + + /** + * configsEqualByResource returns false when an expected resource is missing in + * the actual result set. + */ + @Test + public void configsEqualByResourceWhenMissingResourceInActualThenFalse() { + Map> expectedByResource = new HashMap<>(); + ConfigResource cf1 = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); + ConfigResource cf2 = new ConfigResource(ConfigResource.Type.BROKER, "1"); + expectedByResource.put(cf1, Collections.singletonMap("k1", "v1")); + expectedByResource.put(cf2, Collections.singletonMap("k2", "v2")); + + Map actualByResource = new HashMap<>(); + actualByResource.put(cf1, new Config(Collections.singletonList(new ConfigEntry("k1", "v1")))); + + boolean actual = ReplicationThrottleHelper.configsEqualByResource(actualByResource, expectedByResource); + assertFalse(actual); } private void assertExpectedThrottledRateForBroker(int brokerId, Long expectedRate) throws ExecutionException, InterruptedException {