Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### Changes, deprecations and removals

* Disable CC network resource goals when resource capacities are not set.
Comment thread
kyguy marked this conversation as resolved.
Outdated
* **From Strimzi 0.48.0 on, we support only Kubernetes 1.27 and newer.**
Kubernetes 1.25 and 1.26 are not supported anymore.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static CruiseControl fromCrd(
}

private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
Map<String, String> defaultCruiseControlProperties = CruiseControlConfiguration.generateDefaultPropertiesMap(capacityConfiguration);
if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) {
defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,38 @@ private static TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconcili
return capacityEntries;
}

/**
* Indicates whether the inbound network capacity settings were explicitly configured by the user.
*
* @return {@code true} if inbound network capacity is user-configured; {@code false} otherwise.
*/
public boolean isInboundNetworkConfigured() {
return capacityEntries.values().stream().allMatch(entry -> entry.inboundNetwork.isUserConfigured());
}

/**
* Indicates whether the outbound network capacity settings were explicitly configured by the user.
*
* @return {@code true} if outbound network capacity is user-configured; {@code false} otherwise.
*/
public boolean isOutboundNetworkConfigured() {
return capacityEntries.values().stream().allMatch(entry -> entry.outboundNetwork.isUserConfigured());
}

/**
* Checks whether the inbound network capacity is configured identically across all brokers entries in
* the capacity configuration.
*
* @return {@code true} if all broker entries have the same inbound network capacity configuration; {@code false} otherwise.
*/
public boolean isInboundCapacityHomogeneouslyConfigured() {
return this.capacityEntries.values().stream()
.map(entry -> entry.inboundNetwork.getJson())
.distinct()
.limit(2)
.count() == 1;
}

/**
* Generate a capacity configuration for cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.operator.cluster.model.cruisecontrol;

import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec;
Expand All @@ -12,6 +11,7 @@
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -65,8 +65,6 @@ public class CruiseControlConfiguration extends AbstractConfiguration {
CruiseControlGoals.CPU_CAPACITY_GOAL.toString()
);

private static final String CRUISE_CONTROL_HARD_GOALS = String.join(",", CRUISE_CONTROL_HARD_GOALS_LIST);

protected static final List<String> CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST = List.of(
CruiseControlGoals.RACK_AWARENESS_GOAL.toString(),
CruiseControlGoals.MIN_TOPIC_LEADERS_PER_BROKER_GOAL.toString(),
Expand All @@ -85,15 +83,13 @@ public class CruiseControlConfiguration extends AbstractConfiguration {
* of the entries in the Cruise Control configuration is deterministic and does not cause unnecessary rolling updates
* of Cruise Control deployment.
*/
private static final Map<String, String> CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP = Collections.unmodifiableSortedMap(new TreeMap<>(Map.ofEntries(
private static final Map<String, String> STATIC_DEFAULT_PROPERTIES_MAP = Collections.unmodifiableSortedMap(new TreeMap<>(Map.ofEntries(
Comment thread
ppatierno marked this conversation as resolved.
Outdated
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)),
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"),
Map.entry(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))),
Map.entry(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS),
Map.entry(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS),
Map.entry(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_HARD_GOALS),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)),
Expand All @@ -118,10 +114,60 @@ public CruiseControlConfiguration(Reconciliation reconciliation, Iterable<Map.En
}

/**
* @return Map Cruise Control's default configuration properties
* Generates map containing default values for required configuration properties. The map needs to be sorted so that the order
* of the entries in the Cruise Control configuration is deterministic and does not cause unnecessary rolling updates
* of Cruise Control deployment.
*
* @param capacityConfiguration The object containing Cruise Control capacity configuration information.
*
* @return Map containing default values for required configuration properties.
*/
public static Map<String, String> generateDefaultPropertiesMap(CapacityConfiguration capacityConfiguration) {
Comment thread
kyguy marked this conversation as resolved.
TreeMap<String, String> map = new TreeMap<>(STATIC_DEFAULT_PROPERTIES_MAP);
map.put(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(),
String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_GOALS_LIST, capacityConfiguration)));
map.put(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(),
String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_HARD_GOALS_LIST, capacityConfiguration)));
return map;
}

/**
* Filters out Cruise Control resource goals if the necessary resource capacityConfiguration settings are not defined in
* the user's Kafka custom resource.
*
* @param goalList The initial list of supported Cruise Control goals.
* @param capacityConfiguration The object containing Cruise Control capacity configuration information.
* @return A new list containing only the goals that are safe to enable given the configured resource capacities.
*/
public static Map<String, String> getCruiseControlDefaultPropertiesMap() {
return CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP;
/* test */ static List<String> filterResourceGoalsWithoutCapacityConfig(List<String> goalList,
CapacityConfiguration capacityConfiguration) {
List<String> filteredGoalList = new ArrayList<>(goalList);

if (!capacityConfiguration.isInboundNetworkConfigured()) {
filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString());
} else {

if (!capacityConfiguration.isInboundCapacityHomogeneouslyConfigured()) {
/*
* The LeaderBytesInDistributionGoal does not account for heterogeneous inbound network capacities.
* See: https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LeaderBytesInDistributionGoal.java
* Therefore, if the INBOUND_NETWORK capacityConfiguration settings are not homogeneously configured,
* we remove this goal from the list.
*/
filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString());
}

}

if (!capacityConfiguration.isOutboundNetworkConfigured()) {
Comment thread
kyguy marked this conversation as resolved.
filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL.toString());
}

return filteredGoalList;
}
Comment thread
kyguy marked this conversation as resolved.

private boolean isEnabledInConfiguration(String s1, String s2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class InboundNetworkCapacity extends NetworkCapacity {
* @param brokerCapacityOverride The brokerCapacityOverride for specific broker.
*/
protected InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) {
super(getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride)));
this.config = getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride));
}

/**
Expand All @@ -43,13 +43,16 @@ protected InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOv
*
* @return The capacity of resource represented as a String.
*/
private static String processResourceCapacity(BrokerCapacity brokerCapacity,
BrokerCapacityOverride brokerCapacityOverride) {
private String processResourceCapacity(BrokerCapacity brokerCapacity,
BrokerCapacityOverride brokerCapacityOverride) {
if (brokerCapacityOverride != null && brokerCapacityOverride.getInboundNetwork() != null) {
this.isUserConfigured = true;
return brokerCapacityOverride.getInboundNetwork();
} else if (brokerCapacity != null && brokerCapacity.getInboundNetwork() != null) {
this.isUserConfigured = true;
return brokerCapacity.getInboundNetwork();
} else {
this.isUserConfigured = false;
return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND;
}
Comment thread
kyguy marked this conversation as resolved.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ public abstract class NetworkCapacity {
*/
protected static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s";
protected String config;

protected NetworkCapacity(String config) {
this.config = config;
}
protected boolean isUserConfigured;

/**
* Parse Strimzi representation of throughput, such as {@code 10000KB/s},
Expand All @@ -40,4 +37,13 @@ protected static String getThroughputInKiB(String throughput) {
protected String getJson() {
return config;
}

/**
* Returns whether the capacity was explicitly configured by the user.
*
* @return true if user provided configuration, false otherwise.
*/
protected boolean isUserConfigured() {
return isUserConfigured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class OutboundNetworkCapacity extends NetworkCapacity {
* @param brokerCapacityOverride The brokerCapacityOverride for specific broker.
*/
protected OutboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) {
super(getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride)));
this.config = getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride));
}

/**
Expand All @@ -43,13 +43,16 @@ protected OutboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityO
*
* @return The capacity of resource represented as a String.
*/
private static String processResourceCapacity(BrokerCapacity brokerCapacity,
BrokerCapacityOverride brokerCapacityOverride) {
private String processResourceCapacity(BrokerCapacity brokerCapacity,
BrokerCapacityOverride brokerCapacityOverride) {
if (brokerCapacityOverride != null && brokerCapacityOverride.getOutboundNetwork() != null) {
this.isUserConfigured = true;
return brokerCapacityOverride.getOutboundNetwork();
} else if (brokerCapacity != null && brokerCapacity.getOutboundNetwork() != null) {
this.isUserConfigured = true;
return brokerCapacity.getOutboundNetwork();
} else {
this.isUserConfigured = false;
return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND;
}
Comment thread
kyguy marked this conversation as resolved.
}
Expand Down
Loading
Loading