Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

### Major changes, deprecations and removals

* Disable CC network resource goals when resource capacities are not set.
Comment thread
kyguy marked this conversation as resolved.
Outdated
* Strimzi 0.47.0 (and any of its patch releases) is the last Strimzi version with support for Kubernetes 1.25 and 1.26.
From Strimzi 0.48.0 on, we will support only Kubernetes 1.27 and newer.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static io.strimzi.api.kafka.model.common.template.DeploymentStrategy.ROLLING_UPDATE;
import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS;
import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.CRUISE_CONTROL_GOALS;
import static io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration.generateCruiseControlDefaultPropertiesMap;
import static java.lang.String.format;

/**
Expand Down Expand Up @@ -244,7 +245,7 @@ public static CruiseControl fromCrd(
}

private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
Map<String, String> defaultCruiseControlProperties = generateCruiseControlDefaultPropertiesMap(capacityConfiguration);
if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) {
defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ public class CapacityConfiguration {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CapacityConfiguration.class.getName());

private final TreeMap<Integer, CapacityEntry> capacityEntries;
private boolean isInboundNetworkConfigured;
private boolean isOutboundNetworkConfigured;

/**
* Constructor
Expand Down Expand Up @@ -187,11 +189,11 @@ private static Map<Integer, BrokerCapacityOverride> processBrokerCapacityOverrid
return overrideMap;
}

private static TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconciliation reconciliation,
CruiseControlSpec spec,
Set<NodeRef> kafkaBrokerNodes,
Map<String, Storage> kafkaStorage,
Map<String, ResourceRequirements> kafkaBrokerResources) {
private TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconciliation reconciliation,
CruiseControlSpec spec,
Set<NodeRef> kafkaBrokerNodes,
Map<String, Storage> kafkaStorage,
Map<String, ResourceRequirements> kafkaBrokerResources) {
TreeMap<Integer, CapacityEntry> capacityEntries = new TreeMap<>();
BrokerCapacity commonBrokerCapacity = spec.getBrokerCapacity();
Map<Integer, BrokerCapacityOverride> brokerCapacityOverrideMap = processBrokerCapacityOverrides(reconciliation,
Expand All @@ -208,9 +210,30 @@ private static TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconcili
capacityEntries.put(node.nodeId(), new CapacityEntry(node.nodeId(), disk, cpu, inboundNetwork, outboundNetwork));
}

isInboundNetworkConfigured = InboundNetworkCapacity.isCapacityConfigured(commonBrokerCapacity);
isOutboundNetworkConfigured = OutboundNetworkCapacity.isCapacityConfigured(commonBrokerCapacity);
Comment thread
kyguy marked this conversation as resolved.
Outdated

return capacityEntries;
}

/**
* Indicates whether the inbound network capacity settings were explicitly configured by the user.
*
* @return {@code true} if inbound network capacity is user-configured; {@code false} otherwise.
*/
public boolean isInboundNetworkConfigured() {
return this.isInboundNetworkConfigured;
}

/**
* Indicates whether the outbound network capacity settings were explicitly configured by the user.
*
* @return {@code true} if outbound network capacity is user-configured; {@code false} otherwise.
*/
public boolean isOutboundNetworkConfigured() {
return this.isOutboundNetworkConfigured;
}

/**
* Generate a capacity configuration for cluster.
*
Expand All @@ -236,6 +259,18 @@ public String toJson() {
return new JsonObject().put("brokerCapacities", capacityList).encodePrettily();
}

/**
* Checks whether inbound network resource type has its capacityConfiguration homogeneously configured across all brokers.
* @return {@code true} if capacityConfiguration is homogeneously configured; {@code false} otherwise.
*/
public boolean isInboundCapacityHomogeneouslyConfigured() {
return this.capacityEntries.values().stream()
.map(entry -> entry.inboundNetwork.getJson())
.distinct()
.limit(2)
.count() == 1;
}

/**
* Represents a Cruise Control capacity entry configuration for a Kafka broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.operator.cluster.model.cruisecontrol;

import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec;
Expand All @@ -12,7 +11,7 @@
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlGoals;

import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -65,8 +64,6 @@ public class CruiseControlConfiguration extends AbstractConfiguration {
CruiseControlGoals.CPU_CAPACITY_GOAL.toString()
);

private static final String CRUISE_CONTROL_HARD_GOALS = String.join(",", CRUISE_CONTROL_HARD_GOALS_LIST);

protected static final List<String> CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST = List.of(
CruiseControlGoals.RACK_AWARENESS_GOAL.toString(),
CruiseControlGoals.MIN_TOPIC_LEADERS_PER_BROKER_GOAL.toString(),
Expand All @@ -81,26 +78,34 @@ public class CruiseControlConfiguration extends AbstractConfiguration {
String.join(",", CRUISE_CONTROL_DEFAULT_ANOMALY_DETECTION_GOALS_LIST);

/**
* Map containing default values for required configuration properties. The map needs to be sorted so that the order
* Generates map containing default values for required configuration properties. The map needs to be sorted so that the order
* of the entries in the Cruise Control configuration is deterministic and does not cause unnecessary rolling updates
* of Cruise Control deployment.
*
* @param capacityConfiguration The object containing Cruise Control capacity configuration information.
*
* @return Map containing default values for required configuration properties.
*/
private static final Map<String, String> CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP = Collections.unmodifiableSortedMap(new TreeMap<>(Map.ofEntries(
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)),
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1"),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000)),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20"),
Map.entry(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1))),
Map.entry(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS),
Map.entry(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS),
Map.entry(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_HARD_GOALS),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED)),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE),
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)),
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME),
Map.entry(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME)
)));
public static Map<String, String> generateCruiseControlDefaultPropertiesMap(CapacityConfiguration capacityConfiguration) {
Comment thread
kyguy marked this conversation as resolved.
Outdated
TreeMap<String, String> map = new TreeMap<>();
map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000));
map.put(CruiseControlConfigurationParameters.PARTITION_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "1");
map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_MS_CONFIG_KEY.getValue(), Integer.toString(300_000));
map.put(CruiseControlConfigurationParameters.BROKER_METRICS_WINDOW_NUM_CONFIG_KEY.getValue(), "20");
map.put(CruiseControlConfigurationParameters.COMPLETED_USER_TASK_RETENTION_MS_CONFIG_KEY.getValue(), Long.toString(TimeUnit.DAYS.toMillis(1)));
map.put(CruiseControlConfigurationParameters.GOALS_CONFIG_KEY.getValue(), CRUISE_CONTROL_GOALS);
map.put(CruiseControlConfigurationParameters.DEFAULT_GOALS_CONFIG_KEY.getValue(),
String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_GOALS_LIST, capacityConfiguration)));
map.put(CruiseControlConfigurationParameters.HARD_GOALS_CONFIG_KEY.getValue(),
String.join(",", filterResourceGoalsWithoutCapacityConfig(CRUISE_CONTROL_HARD_GOALS_LIST, capacityConfiguration)));
map.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SECURITY_ENABLED));
map.put(CruiseControlConfigurationParameters.WEBSERVER_AUTH_CREDENTIALS_FILE.getValue(), CruiseControl.API_AUTH_CREDENTIALS_FILE);
map.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED));
map.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME);
map.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME);
map.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME);
return map;
}

private static final List<String> FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIXES);
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(CruiseControlSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
Expand All @@ -118,10 +123,38 @@ public CruiseControlConfiguration(Reconciliation reconciliation, Iterable<Map.En
}

/**
* @return Map Cruise Control's default configuration properties
* Filters out Cruise Control resource goals if the necessary resource capacityConfiguration settings are not defined in
* the user's Kafka custom resource.
*
* @param goalList The initial list of supported Cruise Control goals.
* @param capacityConfiguration The object containing Cruise Control capacity configuration information.
* @return A new list containing only the goals that are safe to enable given the configured resource capacities.
*/
public static Map<String, String> getCruiseControlDefaultPropertiesMap() {
return CRUISE_CONTROL_DEFAULT_PROPERTIES_MAP;
/* test */ static List<String> filterResourceGoalsWithoutCapacityConfig(List<String> goalList,
CapacityConfiguration capacityConfiguration) {
List<String> filteredGoalList = new ArrayList<>(goalList);
if (!capacityConfiguration.isInboundNetworkConfigured()) {
filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_USAGE_DISTRIBUTION_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.NETWORK_INBOUND_CAPACITY_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString());
} else {
if (!capacityConfiguration.isInboundCapacityHomogeneouslyConfigured()) {
/*
* The LeaderBytesInDistributionGoal does not account for heterogeneous inbound network capacities.
* See: https://github.com/linkedin/cruise-control/blob/main/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/LeaderBytesInDistributionGoal.java
* Therefore, if the INBOUND_NETWORK capacityConfiguration settings are not homogeneously configured,
* we remove this goal from the list.
*/
filteredGoalList.remove(CruiseControlGoals.LEADER_BYTES_IN_DISTRIBUTION_GOAL.toString());
}
}
if (!capacityConfiguration.isOutboundNetworkConfigured()) {
Comment thread
kyguy marked this conversation as resolved.
filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_USAGE_DISTRIBUTION_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.NETWORK_OUTBOUND_CAPACITY_GOAL.toString());
filteredGoalList.remove(CruiseControlGoals.POTENTIAL_NETWORK_OUTAGE_GOAL.toString());
}

return filteredGoalList;
}
Comment thread
kyguy marked this conversation as resolved.

private boolean isEnabledInConfiguration(String s1, String s2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,30 @@ private static String processResourceCapacity(BrokerCapacity brokerCapacity,
return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND;
}
}

/**
* Checks whether this resource type has its capacity properly configured for Cruise Control goals that are
* dependent on that resource usage.
* <p>
* The configuration can come from:
* <ul>
* <li>The default capacity defined in the {@link BrokerCapacity} section of the Cruise Control spec</li>
* <li>Broker-specific overrides, where each override must define a value for this resource type</li>
* </ul>
*
* @param brokerCapacity The brokerCapacity section of the Cruise Control specification from the Kafka custom resource.
* @return {@code true} if the capacity for this resource type is considered configured, otherwise {@code false.}
*/
public static boolean isCapacityConfigured(BrokerCapacity brokerCapacity) {
if (brokerCapacity == null) {
return false;
}

if (brokerCapacity.getInboundNetwork() != null) {
return true;
}
Comment thread
kyguy marked this conversation as resolved.

return brokerCapacity.getOverrides() != null
&& brokerCapacity.getOverrides().stream().allMatch(o -> o.getInboundNetwork() != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,30 @@ private static String processResourceCapacity(BrokerCapacity brokerCapacity,
return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND;
}
}

/**
* Checks whether this resource type has its capacity properly configured for Cruise Control goals that are
* dependent on that resource usage.
* <p>
* The configuration can come from:
* <ul>
* <li>The default capacity defined in the {@link BrokerCapacity} section of the Cruise Control spec</li>
* <li>Broker-specific overrides, where each override must define a value for this resource type</li>
* </ul>
*
* @param brokerCapacity The brokerCapacity section of the Cruise Control specification from the Kafka custom resource.
* @return {@code true} if the capacity for this resource type is considered configured, otherwise {@code false.}
*/
public static boolean isCapacityConfigured(BrokerCapacity brokerCapacity) {
if (brokerCapacity == null) {
return false;
}

if (brokerCapacity.getOutboundNetwork() != null) {
return true;
}
Comment thread
kyguy marked this conversation as resolved.

return brokerCapacity.getOverrides() != null
&& brokerCapacity.getOverrides().stream().allMatch(o -> o.getOutboundNetwork() != null);
}
}
Loading
Loading